1 /*****************************************************************************
2 
3 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file clone/clone0clone.cc
28  Innodb Clone System
29 
30  *******************************************************/
31 
32 #include "clone0clone.h"
33 #include <string>
34 #ifdef UNIV_DEBUG
35 #include "current_thd.h" /* current_thd */
36 #include "debug_sync.h"  /* DBUG_SIGNAL_WAIT_FOR */
37 #endif                   /* UNIV_DEBUG */
38 
39 /** Global Clone System */
40 Clone_Sys *clone_sys = nullptr;
41 
42 /** Clone System state */
43 Clone_Sys_State Clone_Sys::s_clone_sys_state = {CLONE_SYS_INACTIVE};
44 
45 /** Number of active abort requests */
46 uint Clone_Sys::s_clone_abort_count = 0;
47 
48 /** Number of active wait requests */
49 uint Clone_Sys::s_clone_wait_count = 0;
50 
Clone_Sys()51 Clone_Sys::Clone_Sys()
52     : m_clone_arr(),
53       m_num_clones(),
54       m_num_apply_clones(),
55       m_snapshot_arr(),
56       m_num_snapshots(),
57       m_num_apply_snapshots(),
58       m_clone_id_generator() {
59   mutex_create(LATCH_ID_CLONE_SYS, &m_clone_sys_mutex);
60 }
61 
~Clone_Sys()62 Clone_Sys::~Clone_Sys() {
63   mutex_free(&m_clone_sys_mutex);
64 
65 #ifdef UNIV_DEBUG
66   /* Verify that no active clone is present */
67   int idx;
68   for (idx = 0; idx < CLONE_ARR_SIZE; idx++) {
69     ut_ad(m_clone_arr[idx] == nullptr);
70   }
71   ut_ad(m_num_clones == 0);
72   ut_ad(m_num_apply_clones == 0);
73 
74   for (idx = 0; idx < SNAPSHOT_ARR_SIZE; idx++) {
75     ut_ad(m_snapshot_arr[idx] == nullptr);
76   }
77   ut_ad(m_num_snapshots == 0);
78   ut_ad(m_num_apply_snapshots == 0);
79 
80 #endif /* UNIV_DEBUG */
81 }
82 
find_clone(const byte * ref_loc,uint loc_len,Clone_Handle_Type hdl_type)83 Clone_Handle *Clone_Sys::find_clone(const byte *ref_loc, uint loc_len,
84                                     Clone_Handle_Type hdl_type) {
85   int idx;
86   bool match_found;
87 
88   Clone_Desc_Locator loc_desc;
89   Clone_Desc_Locator ref_desc;
90   Clone_Handle *clone_hdl;
91 
92   ut_ad(mutex_own(&m_clone_sys_mutex));
93 
94   if (ref_loc == nullptr) {
95     return (nullptr);
96   }
97 
98   ref_desc.deserialize(ref_loc, loc_len, nullptr);
99 
100   match_found = false;
101   clone_hdl = nullptr;
102 
103   for (idx = 0; idx < CLONE_ARR_SIZE; idx++) {
104     clone_hdl = m_clone_arr[idx];
105 
106     if (clone_hdl == nullptr || clone_hdl->is_init()) {
107       continue;
108     }
109 
110     if (clone_hdl->match_hdl_type(hdl_type)) {
111       clone_hdl->build_descriptor(&loc_desc);
112 
113       if (loc_desc.match(&ref_desc)) {
114         match_found = true;
115         break;
116       }
117     }
118   }
119 
120   if (match_found) {
121     clone_hdl->attach();
122     return (clone_hdl);
123   }
124 
125   return (nullptr);
126 }
127 
find_free_index(Clone_Handle_Type hdl_type,uint & free_index)128 int Clone_Sys::find_free_index(Clone_Handle_Type hdl_type, uint &free_index) {
129   free_index = CLONE_ARR_SIZE;
130 
131   uint target_index = CLONE_ARR_SIZE;
132   Clone_Handle *target_clone = nullptr;
133 
134   for (uint idx = 0; idx < CLONE_ARR_SIZE; idx++) {
135     auto clone_hdl = m_clone_arr[idx];
136 
137     if (clone_hdl == nullptr) {
138       free_index = idx;
139       break;
140     }
141 
142     /* If existing clone has some error, it is on its way to exit. */
143     auto err = clone_hdl->check_error(nullptr);
144     if (hdl_type == CLONE_HDL_COPY && (clone_hdl->is_idle() || err != 0)) {
145       target_clone = clone_hdl;
146       target_index = idx;
147     }
148   }
149 
150   if (free_index == CLONE_ARR_SIZE ||
151       (hdl_type == CLONE_HDL_COPY && m_num_clones == MAX_CLONES) ||
152       (hdl_type == CLONE_HDL_APPLY && m_num_apply_clones == MAX_CLONES)) {
153     if (target_clone == nullptr) {
154       my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_CLONES);
155       return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
156     }
157   } else {
158     return (0);
159   }
160 
161   /* We can abort idle clone and use the index. */
162   ut_ad(target_clone != nullptr);
163   ut_ad(mutex_own(&m_clone_sys_mutex));
164   ut_ad(hdl_type == CLONE_HDL_COPY);
165 
166   target_clone->set_state(CLONE_STATE_ABORT);
167 
168   free_index = target_index;
169 
170   /* Sleep for 100 milliseconds. */
171   Clone_Msec sleep_time(100);
172   /* Generate alert message every second. */
173   Clone_Sec alert_interval(1);
174   /* Wait for 5 seconds for idle client to abort. */
175   Clone_Sec time_out(5);
176 
177   bool is_timeout = false;
178   auto err = Clone_Sys::wait(
179       sleep_time, time_out, alert_interval,
180       [&](bool alert, bool &result) {
181         ut_ad(mutex_own(clone_sys->get_mutex()));
182         auto current_clone = m_clone_arr[target_index];
183         result = (current_clone != nullptr);
184 
185         if (thd_killed(nullptr)) {
186           ib::info(ER_IB_CLONE_START_STOP)
187               << "Clone Begin Master wait for abort interrupted";
188           my_error(ER_QUERY_INTERRUPTED, MYF(0));
189           return (ER_QUERY_INTERRUPTED);
190 
191         } else if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
192           ib::info(ER_IB_CLONE_START_STOP)
193               << "Clone Begin Master wait for abort interrupted by DDL";
194           my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
195           return (ER_CLONE_DDL_IN_PROGRESS);
196 
197         } else if (result) {
198           ut_ad(current_clone->is_abort());
199         }
200 
201         if (!result) {
202           ib::info(ER_IB_CLONE_START_STOP) << "Clone Master aborted idle task";
203 
204         } else if (alert) {
205           ib::info(ER_IB_CLONE_TIMEOUT)
206               << "Clone Master waiting for idle task abort";
207         }
208         return (0);
209       },
210       clone_sys->get_mutex(), is_timeout);
211 
212   if (err == 0 && is_timeout) {
213     ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master wait for abort timed out";
214     my_error(ER_INTERNAL_ERROR, MYF(0),
215              "Innodb Clone Copy failed to abort idle clone [timeout]");
216     err = ER_INTERNAL_ERROR;
217   }
218   return (err);
219 }
220 
add_clone(const byte * loc,Clone_Handle_Type hdl_type,Clone_Handle * & clone_hdl)221 int Clone_Sys::add_clone(const byte *loc, Clone_Handle_Type hdl_type,
222                          Clone_Handle *&clone_hdl) {
223   ut_ad(mutex_own(&m_clone_sys_mutex));
224   ut_ad(m_num_clones <= MAX_CLONES);
225   ut_ad(m_num_apply_clones <= MAX_CLONES);
226 
227   auto version = choose_desc_version(loc);
228 
229   /* Find a free index to allocate new clone. */
230   uint free_idx;
231   auto err = find_free_index(hdl_type, free_idx);
232   if (err != 0) {
233     return (err);
234   }
235 
236   /* Create a new clone. */
237   clone_hdl = UT_NEW(Clone_Handle(hdl_type, version, free_idx), mem_key_clone);
238 
239   if (clone_hdl == nullptr) {
240     my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Clone_Handle));
241     return (ER_OUTOFMEMORY);
242   }
243 
244   m_clone_arr[free_idx] = clone_hdl;
245 
246   if (hdl_type == CLONE_HDL_COPY) {
247     ++m_num_clones;
248   } else {
249     ut_ad(hdl_type == CLONE_HDL_APPLY);
250     ++m_num_apply_clones;
251   }
252 
253   clone_hdl->attach();
254 
255   return (0);
256 }
257 
drop_clone(Clone_Handle * clone_handle)258 void Clone_Sys::drop_clone(Clone_Handle *clone_handle) {
259   ut_ad(mutex_own(&m_clone_sys_mutex));
260 
261   if (clone_handle->detach() > 0) {
262     return;
263   }
264 
265   auto index = clone_handle->get_index();
266 
267   ut_ad(m_clone_arr[index] == clone_handle);
268 
269   m_clone_arr[index] = nullptr;
270 
271   if (clone_handle->is_copy_clone()) {
272     ut_ad(m_num_clones > 0);
273     --m_num_clones;
274 
275   } else {
276     ut_ad(m_num_apply_clones > 0);
277     --m_num_apply_clones;
278   }
279 
280   UT_DELETE(clone_handle);
281 }
282 
get_clone_by_index(const byte * loc,uint loc_len)283 Clone_Handle *Clone_Sys::get_clone_by_index(const byte *loc, uint loc_len) {
284   Clone_Desc_Locator loc_desc;
285   Clone_Handle *clone_hdl;
286 
287   loc_desc.deserialize(loc, loc_len, nullptr);
288 
289 #ifdef UNIV_DEBUG
290   Clone_Desc_Header *header = &loc_desc.m_header;
291   ut_ad(header->m_type == CLONE_DESC_LOCATOR);
292 #endif
293   clone_hdl = m_clone_arr[loc_desc.m_clone_index];
294 
295   ut_ad(clone_hdl != nullptr);
296 
297   return (clone_hdl);
298 }
299 
attach_snapshot(Clone_Handle_Type hdl_type,Ha_clone_type clone_type,ib_uint64_t snapshot_id,bool is_pfs_monitor,Clone_Snapshot * & snapshot)300 int Clone_Sys::attach_snapshot(Clone_Handle_Type hdl_type,
301                                Ha_clone_type clone_type,
302                                ib_uint64_t snapshot_id, bool is_pfs_monitor,
303                                Clone_Snapshot *&snapshot) {
304   uint idx;
305   uint free_idx = SNAPSHOT_ARR_SIZE;
306 
307   ut_ad(mutex_own(&m_clone_sys_mutex));
308 
309   /* Try to attach to an existing snapshot. */
310   for (idx = 0; idx < SNAPSHOT_ARR_SIZE; idx++) {
311     snapshot = m_snapshot_arr[idx];
312 
313     if (snapshot != nullptr) {
314       if (snapshot->attach(hdl_type, is_pfs_monitor)) {
315         return (0);
316       }
317     } else if (free_idx == SNAPSHOT_ARR_SIZE) {
318       free_idx = idx;
319     }
320   }
321 
322   if (free_idx == SNAPSHOT_ARR_SIZE ||
323       (hdl_type == CLONE_HDL_COPY && m_num_snapshots == MAX_SNAPSHOTS) ||
324       (hdl_type == CLONE_HDL_APPLY && m_num_apply_snapshots == MAX_SNAPSHOTS)) {
325     my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_SNAPSHOTS);
326     return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
327   }
328 
329   /* Create a new snapshot. */
330   snapshot = UT_NEW(Clone_Snapshot(hdl_type, clone_type, free_idx, snapshot_id),
331                     mem_key_clone);
332 
333   if (snapshot == nullptr) {
334     my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Clone_Snapshot));
335     return (ER_OUTOFMEMORY);
336   }
337 
338   m_snapshot_arr[free_idx] = snapshot;
339 
340   if (hdl_type == CLONE_HDL_COPY) {
341     ++m_num_snapshots;
342   } else {
343     ut_ad(hdl_type == CLONE_HDL_APPLY);
344     ++m_num_apply_snapshots;
345   }
346 
347   snapshot->attach(hdl_type, is_pfs_monitor);
348 
349   return (0);
350 }
351 
detach_snapshot(Clone_Snapshot * snapshot,Clone_Handle_Type hdl_type)352 void Clone_Sys::detach_snapshot(Clone_Snapshot *snapshot,
353                                 Clone_Handle_Type hdl_type) {
354   uint num_clones;
355 
356   ut_ad(mutex_own(&m_clone_sys_mutex));
357   num_clones = snapshot->detach();
358 
359   if (num_clones != 0) {
360     return;
361   }
362 
363   /* All clones are detached. Drop the snapshot. */
364   uint index;
365 
366   index = snapshot->get_index();
367   ut_ad(m_snapshot_arr[index] == snapshot);
368 
369   UT_DELETE(snapshot);
370 
371   m_snapshot_arr[index] = nullptr;
372 
373   if (hdl_type == CLONE_HDL_COPY) {
374     ut_ad(m_num_snapshots > 0);
375     --m_num_snapshots;
376 
377   } else {
378     ut_ad(hdl_type == CLONE_HDL_APPLY);
379     ut_ad(m_num_apply_snapshots > 0);
380     --m_num_apply_snapshots;
381   }
382 }
383 
check_active_clone(bool print_alert)384 bool Clone_Sys::check_active_clone(bool print_alert) {
385   ut_ad(mutex_own(&m_clone_sys_mutex));
386 
387   bool active_clone = false;
388   /* Check for active clone operations. */
389   for (int idx = 0; idx < CLONE_ARR_SIZE; idx++) {
390     auto clone_hdl = m_clone_arr[idx];
391 
392     if (clone_hdl != nullptr && clone_hdl->is_copy_clone()) {
393       active_clone = true;
394       break;
395     }
396   }
397 
398   if (active_clone && print_alert) {
399     ib::info(ER_IB_CLONE_TIMEOUT) << "DDL waiting for CLONE to abort";
400   }
401   return (active_clone);
402 }
403 
mark_abort(bool force)404 bool Clone_Sys::mark_abort(bool force) {
405   ut_ad(mutex_own(&m_clone_sys_mutex));
406 
407   /* Check for active clone operations. */
408   auto active_clone = check_active_clone(false);
409 
410   /* If active clone is running and force is not set then
411   return without setting abort state. */
412   if (active_clone && !force) {
413     return (false);
414   }
415 
416   ++s_clone_abort_count;
417 
418   if (s_clone_sys_state != CLONE_SYS_ABORT) {
419     ut_ad(s_clone_abort_count == 1);
420     s_clone_sys_state = CLONE_SYS_ABORT;
421 
422     DEBUG_SYNC_C("clone_marked_abort");
423   }
424 
425   if (active_clone) {
426     ut_ad(force);
427 
428     /* Sleep for 1 second */
429     Clone_Msec sleep_time(Clone_Sec(1));
430     /* Generate alert message every minute. */
431     Clone_Sec alert_time(Clone_Min(1));
432     /* Timeout in 15 minutes - safeguard against hang, should not happen */
433     Clone_Sec time_out(Clone_Min(15));
434 
435     bool is_timeout = false;
436 
437     wait(
438         sleep_time, time_out, alert_time,
439         [&](bool alert, bool &result) {
440           ut_ad(mutex_own(&m_clone_sys_mutex));
441           result = check_active_clone(alert);
442 
443           return (0);
444         },
445         &m_clone_sys_mutex, is_timeout);
446 
447     if (is_timeout) {
448       ut_ad(false);
449       ib::warn(ER_IB_CLONE_TIMEOUT) << "DDL wait for CLONE abort timed out"
450                                        ", Continuing DDL.";
451     }
452   }
453   return (true);
454 }
455 
mark_active()456 void Clone_Sys::mark_active() {
457   ut_ad(mutex_own(&m_clone_sys_mutex));
458 
459   ut_ad(s_clone_abort_count > 0);
460   --s_clone_abort_count;
461 
462   if (s_clone_abort_count == 0) {
463     s_clone_sys_state = CLONE_SYS_ACTIVE;
464   }
465 }
466 
mark_wait()467 bool Clone_Sys::mark_wait() {
468   ut_ad(mutex_own(&m_clone_sys_mutex));
469 
470   /* Check for active clone operations. */
471   auto active_clone = check_active_clone(false);
472 
473   /* If active clone is running return. */
474   if (active_clone) {
475     return (false);
476   }
477 
478   /* Let any new clone operation wait till mark_free is called. */
479   ++s_clone_wait_count;
480   return (true);
481 }
482 
mark_free()483 void Clone_Sys::mark_free() {
484   ut_ad(mutex_own(&m_clone_sys_mutex));
485   ut_ad(s_clone_wait_count > 0);
486   --s_clone_wait_count;
487 }
488 
wait_for_free(THD * thd)489 int Clone_Sys::wait_for_free(THD *thd) {
490   ut_ad(mutex_own(&m_clone_sys_mutex));
491 
492   if (s_clone_wait_count == 0) {
493     return (0);
494   }
495 
496   auto wait_condition = [&](bool alert, bool &result) {
497     ut_ad(mutex_own(&m_clone_sys_mutex));
498     result = (s_clone_wait_count == 0);
499     if (alert) {
500       ib::info(ER_IB_CLONE_OPERATION)
501           << "CLONE waiting for redo/undo encryption";
502     }
503     if (thd_killed(thd)) {
504       my_error(ER_QUERY_INTERRUPTED, MYF(0));
505       return (ER_QUERY_INTERRUPTED);
506     }
507     return (0);
508   };
509 
510   /* Sleep for 100 milliseconds */
511   Clone_Msec sleep_time(100);
512   /* Generate alert message 5 second. */
513   Clone_Sec alert_time(5);
514   /* Timeout in 5 minutes - safeguard against hang, should not happen */
515   Clone_Sec time_out(Clone_Min(5));
516 
517   bool is_timeout = false;
518   auto err = wait(sleep_time, time_out, alert_time, wait_condition,
519                   &m_clone_sys_mutex, is_timeout);
520 
521   if (err != 0) {
522     return (err);
523   }
524 
525   if (is_timeout) {
526     ut_ad(false);
527     my_error(ER_INTERNAL_ERROR, MYF(0),
528              "Innodb Clone timeout waiting for background");
529     return (ER_INTERNAL_ERROR);
530   }
531 
532   return (0);
533 }
534 
get_next_id()535 ib_uint64_t Clone_Sys::get_next_id() {
536   ut_ad(mutex_own(&m_clone_sys_mutex));
537 
538   return (++m_clone_id_generator);
539 }
540 
541 #ifdef UNIV_DEBUG
debug_wait(uint chunk_num,Clone_Task * task)542 void Clone_Task_Manager::debug_wait(uint chunk_num, Clone_Task *task) {
543   auto state = m_clone_snapshot->get_state();
544   auto nchunks = m_clone_snapshot->get_num_chunks();
545 
546   /* Stop somewhere in the middle of current stage */
547   if (!task->m_is_master || task->m_ignore_sync ||
548       (chunk_num != 0 && chunk_num < (nchunks / 2 + 1))) {
549     return;
550   }
551 
552   if (state == CLONE_SNAPSHOT_FILE_COPY) {
553     DBUG_SIGNAL_WAIT_FOR(current_thd, "gr_clone_wait", "gr_clone_paused",
554                          "gr_clone_continue");
555 
556     DEBUG_SYNC_C("clone_file_copy");
557 
558   } else if (state == CLONE_SNAPSHOT_PAGE_COPY) {
559     DEBUG_SYNC_C("clone_page_copy");
560 
561   } else if (state == CLONE_SNAPSHOT_REDO_COPY) {
562     DEBUG_SYNC_C("clone_redo_copy");
563   }
564 
565   task->m_ignore_sync = true;
566 }
567 
debug_restart(Clone_Task * task,int in_err,int restart_count)568 int Clone_Task_Manager::debug_restart(Clone_Task *task, int in_err,
569                                       int restart_count) {
570   auto err = in_err;
571 
572   if (err != 0 || restart_count < task->m_debug_counter || !task->m_is_master) {
573     return (err);
574   }
575 
576   /* Restart somewhere in the middle of all chunks */
577   if (restart_count == 1) {
578     auto nchunks = m_clone_snapshot->get_num_chunks();
579     auto cur_chunk = task->m_task_meta.m_chunk_num;
580 
581     if (cur_chunk != 0 && cur_chunk < (nchunks / 2 + 1)) {
582       return (err);
583     }
584   }
585 
586   DBUG_EXECUTE_IF("clone_restart_apply", err = ER_NET_READ_ERROR;);
587 
588   if (err != 0) {
589     my_error(err, MYF(0));
590   }
591 
592   /* Allow restart from next point */
593   task->m_debug_counter = restart_count + 1;
594 
595   return (err);
596 }
597 #endif /* UNIV_DEBUG */
598 
init(Clone_Snapshot * snapshot)599 void Clone_Task_Manager::init(Clone_Snapshot *snapshot) {
600   uint idx;
601 
602   m_clone_snapshot = snapshot;
603 
604   m_current_state = snapshot->get_state();
605 
606   /* ACK state is the previous state of current state */
607   if (m_current_state == CLONE_SNAPSHOT_INIT) {
608     m_ack_state = CLONE_SNAPSHOT_NONE;
609   } else {
610     /* If clone is attaching to active snapshot with
611     other concurrent clone */
612     ut_ad(m_current_state == CLONE_SNAPSHOT_FILE_COPY);
613     m_ack_state = CLONE_SNAPSHOT_INIT;
614   }
615 
616   m_chunk_info.m_total_chunks = 0;
617 
618   m_chunk_info.m_min_unres_chunk = 1;
619   m_chunk_info.m_max_res_chunk = 0;
620 
621   /* Initialize all tasks in inactive state. */
622   for (idx = 0; idx < CLONE_MAX_TASKS; idx++) {
623     Clone_Task *task;
624 
625     task = m_clone_tasks + idx;
626     task->m_task_state = CLONE_TASK_INACTIVE;
627 
628     task->m_serial_desc = nullptr;
629     task->m_alloc_len = 0;
630 
631     task->m_current_file_des.m_file = OS_FILE_CLOSED;
632     task->m_current_file_index = 0;
633     task->m_file_cache = true;
634 
635     task->m_current_buffer = nullptr;
636     task->m_buffer_alloc_len = 0;
637     task->m_is_master = false;
638     task->m_has_thd = false;
639     task->m_data_size = 0;
640     ut_d(task->m_ignore_sync = false);
641     ut_d(task->m_debug_counter = 2);
642   }
643 
644   m_num_tasks = 0;
645   m_num_tasks_finished = 0;
646   m_num_tasks_transit = 0;
647   m_restart_count = 0;
648 
649   m_next_state = CLONE_SNAPSHOT_NONE;
650   m_send_state_meta = false;
651   m_transferred_file_meta = false;
652   m_saved_error = 0;
653 
654   /* Allocate error file name */
655   auto heap = m_clone_snapshot->lock_heap();
656 
657   m_err_file_name = static_cast<char *>(mem_heap_alloc(heap, FN_REFLEN_SE));
658 
659   m_err_file_len = FN_REFLEN_SE;
660 
661   m_clone_snapshot->release_heap(heap);
662 
663   /* Initialize error file name */
664   memset(m_err_file_name, 0, m_err_file_len);
665 
666   strncpy(m_err_file_name, "Clone File", m_err_file_len);
667 }
668 
reserve_task(THD * thd,uint & task_id)669 void Clone_Task_Manager::reserve_task(THD *thd, uint &task_id) {
670   ut_ad(mutex_own(&m_state_mutex));
671 
672   Clone_Task *task = nullptr;
673 
674   task_id = 0;
675 
676   /* Find inactive task in the array. */
677   for (; task_id < CLONE_MAX_TASKS; task_id++) {
678     task = m_clone_tasks + task_id;
679     auto task_meta = &task->m_task_meta;
680 
681     if (task->m_task_state == CLONE_TASK_INACTIVE) {
682       task->m_task_state = CLONE_TASK_ACTIVE;
683 
684       task_meta->m_task_index = task_id;
685       task_meta->m_chunk_num = 0;
686       task_meta->m_block_num = 0;
687 
688       /* Set first task as master task */
689       if (task_id == 0) {
690         ut_ad(thd != nullptr);
691         task->m_is_master = true;
692       }
693 
694       /* Whether the task has an associated user session */
695       task->m_has_thd = (thd != nullptr);
696 
697       break;
698     }
699 
700     task = nullptr;
701   }
702 
703   ut_ad(task != nullptr);
704 }
705 
alloc_buffer(Clone_Task * task)706 int Clone_Task_Manager::alloc_buffer(Clone_Task *task) {
707   if (task->m_alloc_len != 0) {
708     /* Task buffers are already allocated in case
709     clone operation is restarted. */
710 
711     ut_ad(task->m_buffer_alloc_len != 0);
712     ut_ad(task->m_serial_desc != nullptr);
713     ut_ad(task->m_current_buffer != nullptr);
714 
715     return (0);
716   }
717 
718   /* Allocate task descriptor. */
719   auto heap = m_clone_snapshot->lock_heap();
720 
721   /* Maximum variable length of descriptor. */
722   auto alloc_len =
723       static_cast<uint>(m_clone_snapshot->get_max_file_name_length());
724 
725   /* Check with maximum path name length. */
726   if (alloc_len < FN_REFLEN_SE) {
727     alloc_len = FN_REFLEN_SE;
728   }
729 
730   /* Maximum fixed length of descriptor */
731   alloc_len += CLONE_DESC_MAX_BASE_LEN;
732 
733   /* Add some buffer. */
734   alloc_len += CLONE_DESC_MAX_BASE_LEN;
735 
736   ut_ad(task->m_alloc_len == 0);
737   ut_ad(task->m_buffer_alloc_len == 0);
738 
739   task->m_alloc_len = alloc_len;
740   task->m_buffer_alloc_len = m_clone_snapshot->get_dyn_buffer_length();
741 
742   alloc_len += task->m_buffer_alloc_len;
743 
744   alloc_len += CLONE_ALIGN_DIRECT_IO;
745 
746   ut_ad(task->m_serial_desc == nullptr);
747 
748   task->m_serial_desc = static_cast<byte *>(mem_heap_alloc(heap, alloc_len));
749 
750   m_clone_snapshot->release_heap(heap);
751 
752   if (task->m_serial_desc == nullptr) {
753     my_error(ER_OUTOFMEMORY, MYF(0), alloc_len);
754     return (ER_OUTOFMEMORY);
755   }
756 
757   if (task->m_buffer_alloc_len > 0) {
758     task->m_current_buffer = static_cast<byte *>(ut_align(
759         task->m_serial_desc + task->m_alloc_len, CLONE_ALIGN_DIRECT_IO));
760   }
761 
762   return (0);
763 }
764 
handle_error_other_task(bool set_error)765 int Clone_Task_Manager::handle_error_other_task(bool set_error) {
766   char errbuf[MYSYS_STRERROR_SIZE];
767 
768   if (set_error && m_saved_error != 0) {
769     ib::info(ER_IB_CLONE_OPERATION)
770         << "Clone error from other task code: " << m_saved_error;
771   }
772 
773   if (!set_error) {
774     return (m_saved_error);
775   }
776 
777   /* Handle shutdown and KILL */
778   if (thd_killed(nullptr)) {
779     my_error(ER_QUERY_INTERRUPTED, MYF(0));
780     return (ER_QUERY_INTERRUPTED);
781   }
782 
783   /* Check if DDL has marked for abort. Ignore for client apply. */
784   if ((m_clone_snapshot == nullptr || m_clone_snapshot->is_copy()) &&
785       Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
786     my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
787     return (ER_CLONE_DDL_IN_PROGRESS);
788   }
789 
790   switch (m_saved_error) {
791     case ER_CLONE_DDL_IN_PROGRESS:
792     case ER_QUERY_INTERRUPTED:
793       my_error(m_saved_error, MYF(0));
794       break;
795 
796     /* Network errors */
797     case ER_NET_PACKET_TOO_LARGE:
798     case ER_NET_PACKETS_OUT_OF_ORDER:
799     case ER_NET_UNCOMPRESS_ERROR:
800     case ER_NET_READ_ERROR:
801     case ER_NET_READ_INTERRUPTED:
802     case ER_NET_ERROR_ON_WRITE:
803     case ER_NET_WRITE_INTERRUPTED:
804     case ER_NET_WAIT_ERROR:
805       my_error(m_saved_error, MYF(0));
806       break;
807 
808     /* IO Errors */
809     case ER_CANT_OPEN_FILE:
810     case ER_CANT_CREATE_FILE:
811     case ER_ERROR_ON_READ:
812     case ER_ERROR_ON_WRITE:
813       ut_ad(m_err_file_name != nullptr);
814       ut_ad(m_err_file_len != 0);
815       my_error(m_saved_error, MYF(0), m_err_file_name, errno,
816                my_strerror(errbuf, sizeof(errbuf), errno));
817       break;
818 
819     case ER_FILE_EXISTS_ERROR:
820       ut_ad(m_err_file_name != nullptr);
821       ut_ad(m_err_file_len != 0);
822       my_error(m_saved_error, MYF(0), m_err_file_name);
823       break;
824 
825     case ER_WRONG_VALUE:
826       ut_ad(m_err_file_name != nullptr);
827       ut_ad(m_err_file_len != 0);
828       my_error(m_saved_error, MYF(0), "file path", m_err_file_name);
829       break;
830 
831     case ER_CLONE_DONOR:
832       /* Will get the error message from remote */
833       break;
834 
835     case 0:
836       break;
837 
838     default:
839       my_error(ER_INTERNAL_ERROR, MYF(0),
840                "Innodb Clone error in concurrent task");
841   }
842 
843   return (m_saved_error);
844 }
845 
wait_before_add(const byte * ref_loc,uint loc_len)846 bool Clone_Task_Manager::wait_before_add(const byte *ref_loc, uint loc_len) {
847   ut_ad(mutex_own(&m_state_mutex));
848 
849   /* 1. Don't wait if master task. */
850   if (m_num_tasks == 0) {
851     return (false);
852   }
853 
854   /* 2. Wait for state transition to get over */
855   if (in_transit_state()) {
856     return (true);
857   }
858 
859   /* 3. For copy state(donor), wait for the state to reach file copy. */
860   ut_ad(m_current_state != CLONE_SNAPSHOT_NONE);
861   if (ref_loc == nullptr) {
862     return (m_current_state == CLONE_SNAPSHOT_INIT);
863   }
864 
865   Clone_Desc_Locator ref_desc;
866   ref_desc.deserialize(ref_loc, loc_len, nullptr);
867 
868   ut_ad(m_current_state <= ref_desc.m_state);
869 
870   /* 4. For apply state (recipient), wait for apply state to reach
871   the copy state in reference locator. */
872   if (m_current_state != ref_desc.m_state) {
873     return (true);
874   }
875 
876   /* 4A. For file copy state, wait for all metadata to be transferred. */
877   if (m_current_state == CLONE_SNAPSHOT_FILE_COPY &&
878       !is_file_metadata_transferred()) {
879     return (true);
880   }
881   return (false);
882 }
883 
add_task(THD * thd,const byte * ref_loc,uint loc_len,uint & task_id)884 int Clone_Task_Manager::add_task(THD *thd, const byte *ref_loc, uint loc_len,
885                                  uint &task_id) {
886   mutex_enter(&m_state_mutex);
887 
888   /* Check for error from other tasks */
889   bool raise_error = (thd != nullptr);
890 
891   auto err = handle_error_other_task(raise_error);
892 
893   if (err != 0) {
894     mutex_exit(&m_state_mutex);
895     return (err);
896   }
897 
898   if (wait_before_add(ref_loc, loc_len)) {
899     bool is_timeout = false;
900     int alert_count = 0;
901     err = Clone_Sys::wait_default(
902         [&](bool alert, bool &result) {
903           ut_ad(mutex_own(&m_state_mutex));
904           result = wait_before_add(ref_loc, loc_len);
905 
906           /* Check for error from other tasks */
907           err = handle_error_other_task(raise_error);
908 
909           if (err == 0 && result && alert) {
910             /* Print messages every 1 minute - default is 5 seconds. */
911             if (++alert_count == 12) {
912               alert_count = 0;
913               ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Add task waiting "
914                                                "for state change";
915             }
916           }
917           return (err);
918         },
919         &m_state_mutex, is_timeout);
920 
921     if (err != 0) {
922       mutex_exit(&m_state_mutex);
923       return (err);
924 
925     } else if (is_timeout) {
926       ut_ad(false);
927       mutex_exit(&m_state_mutex);
928 
929       ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Add task timed out";
930 
931       my_error(ER_INTERNAL_ERROR, MYF(0),
932                "Clone Add task failed: "
933                "Wait too long for state transition");
934       return (ER_INTERNAL_ERROR);
935     }
936   }
937 
938   /* We wait for state transition before adding new task. */
939   ut_ad(!in_transit_state());
940 
941   if (m_num_tasks == CLONE_MAX_TASKS) {
942     err = ER_CLONE_TOO_MANY_CONCURRENT_CLONES;
943     my_error(err, MYF(0), CLONE_MAX_TASKS);
944 
945     mutex_exit(&m_state_mutex);
946     return (err);
947   }
948 
949   reserve_task(thd, task_id);
950   ut_ad(task_id <= m_num_tasks);
951 
952   ++m_num_tasks;
953 
954   mutex_exit(&m_state_mutex);
955   return (0);
956 }
957 
drop_task(THD * thd,uint task_id,bool & is_master)958 bool Clone_Task_Manager::drop_task(THD *thd, uint task_id, bool &is_master) {
959   mutex_enter(&m_state_mutex);
960 
961   if (in_transit_state()) {
962     ut_ad(m_num_tasks_transit > 0);
963     --m_num_tasks_transit;
964   }
965 
966   ut_ad(m_num_tasks > 0);
967   --m_num_tasks;
968 
969   auto task = get_task_by_index(task_id);
970 
971   add_incomplete_chunk(task);
972 
973   reset_chunk(task);
974 
975   ut_ad(task->m_task_state == CLONE_TASK_ACTIVE);
976   task->m_task_state = CLONE_TASK_INACTIVE;
977 
978   is_master = task->m_is_master;
979 
980   if (!is_master) {
981     mutex_exit(&m_state_mutex);
982     return (false);
983   }
984 
985   /* Master needs to wait for other tasks to get dropped */
986   if (m_num_tasks > 0) {
987     bool is_timeout = false;
988     int alert_count = 0;
989     auto err = Clone_Sys::wait_default(
990         [&](bool alert, bool &result) {
991           ut_ad(mutex_own(&m_state_mutex));
992           result = (m_num_tasks > 0);
993 
994           if (thd_killed(thd)) {
995             return (ER_QUERY_INTERRUPTED);
996 
997           } else if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
998             return (ER_CLONE_DDL_IN_PROGRESS);
999           }
1000           if (alert && result) {
1001             /* Print messages every 1 minute - default is 5 seconds. */
1002             if (++alert_count == 12) {
1003               alert_count = 0;
1004               ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master drop task waiting "
1005                                                "for other tasks";
1006             }
1007           }
1008           return (0);
1009         },
1010         &m_state_mutex, is_timeout);
1011 
1012     if (err != 0) {
1013       mutex_exit(&m_state_mutex);
1014       return (false);
1015 
1016     } else if (is_timeout) {
1017       ut_ad(false);
1018       ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master drop task timed out";
1019 
1020       mutex_exit(&m_state_mutex);
1021       return (false);
1022     }
1023   }
1024 
1025   mutex_exit(&m_state_mutex);
1026 
1027   /* Restart after network error */
1028   auto current_err = handle_error_other_task(false);
1029   if (is_network_error(current_err)) {
1030     return (true);
1031   }
1032   return (false);
1033 }
1034 
get_next_chunk()1035 uint32_t Clone_Task_Manager::get_next_chunk() {
1036   auto &max_chunk = m_chunk_info.m_max_res_chunk;
1037   auto &min_chunk = m_chunk_info.m_min_unres_chunk;
1038 
1039   ut_ad(max_chunk <= m_chunk_info.m_total_chunks);
1040 
1041   if (min_chunk > m_chunk_info.m_total_chunks) {
1042     /* No more chunks left for current state. */
1043     return (0);
1044   }
1045 
1046   /* Return the minimum unreserved chunk */
1047   auto ret_chunk = min_chunk;
1048 
1049   /* Mark the chunk reserved. The chunk must be unreserved. */
1050   ut_ad(!m_chunk_info.m_reserved_chunks[min_chunk]);
1051   m_chunk_info.m_reserved_chunks[min_chunk] = true;
1052 
1053   /* Increase max reserved chunk if needed */
1054   if (max_chunk < min_chunk) {
1055     max_chunk = min_chunk;
1056   }
1057 
1058   ut_ad(max_chunk == m_chunk_info.m_reserved_chunks.get_max_set_bit());
1059 
1060   /* Set the next unreserved chunk */
1061   while (m_chunk_info.m_reserved_chunks[min_chunk]) {
1062     ++min_chunk;
1063 
1064     /* Exit if all chunks are over */
1065     if (min_chunk > max_chunk || min_chunk > m_chunk_info.m_total_chunks) {
1066       ut_ad(min_chunk > m_chunk_info.m_total_chunks ||
1067             !m_chunk_info.m_reserved_chunks[min_chunk]);
1068 
1069       break;
1070     }
1071   }
1072 
1073   return (ret_chunk);
1074 }
1075 
get_next_incomplete_chunk(uint32 & block_num)1076 uint32_t Clone_Task_Manager::get_next_incomplete_chunk(uint32 &block_num) {
1077   block_num = 0;
1078 
1079   auto &chunks = m_chunk_info.m_incomplete_chunks;
1080 
1081   if (chunks.empty()) {
1082     return (0);
1083   }
1084 
1085   auto it = chunks.begin();
1086 
1087   auto chunk_num = it->first;
1088 
1089   block_num = it->second;
1090 
1091   chunks.erase(it);
1092 
1093   return (chunk_num);
1094 }
1095 
reserve_next_chunk(Clone_Task * task,uint32_t & ret_chunk,uint32_t & ret_block)1096 int Clone_Task_Manager::reserve_next_chunk(Clone_Task *task,
1097                                            uint32_t &ret_chunk,
1098                                            uint32_t &ret_block) {
1099   mutex_enter(&m_state_mutex);
1100   ret_chunk = 0;
1101 
1102   /* Check for error from other tasks */
1103   auto err = handle_error_other_task(task->m_has_thd);
1104   if (err != 0) {
1105     mutex_exit(&m_state_mutex);
1106     return (err);
1107   }
1108 
1109   if (process_inclomplete_chunk()) {
1110     /* Get next incomplete chunk. */
1111     ret_chunk = get_next_incomplete_chunk(ret_block);
1112     ut_ad(ret_chunk != 0);
1113 
1114   } else {
1115     /* Get next unreserved chunk. */
1116     ret_block = 0;
1117     ret_chunk = get_next_chunk();
1118   }
1119 
1120   reset_chunk(task);
1121   mutex_exit(&m_state_mutex);
1122   return (0);
1123 }
1124 
set_chunk(Clone_Task * task,Clone_Task_Meta * new_meta)1125 int Clone_Task_Manager::set_chunk(Clone_Task *task, Clone_Task_Meta *new_meta) {
1126   auto cur_meta = &task->m_task_meta;
1127   int err = 0;
1128 
1129   ut_ad(cur_meta->m_task_index == new_meta->m_task_index);
1130   cur_meta->m_task_index = new_meta->m_task_index;
1131 
1132   /* Check if this is a new chunk */
1133   if (cur_meta->m_chunk_num != new_meta->m_chunk_num) {
1134     mutex_enter(&m_state_mutex);
1135 
1136     /* Mark the current chunk reserved */
1137     m_chunk_info.m_reserved_chunks[new_meta->m_chunk_num] = true;
1138 
1139     /* Check and remove the chunk from incomplete chunk list. */
1140     auto &chunks = m_chunk_info.m_incomplete_chunks;
1141 
1142     auto key_value = chunks.find(new_meta->m_chunk_num);
1143 
1144     if (key_value != chunks.end()) {
1145       ut_ad(key_value->second < new_meta->m_block_num);
1146       chunks.erase(key_value);
1147     }
1148 
1149     reset_chunk(task);
1150 
1151     /* Check for error from other tasks */
1152     err = handle_error_other_task(task->m_has_thd);
1153 
1154     mutex_exit(&m_state_mutex);
1155 
1156     cur_meta->m_chunk_num = new_meta->m_chunk_num;
1157 
1158 #ifdef UNIV_DEBUG
1159     /* Network failure in the middle of a state */
1160     err = debug_restart(task, err, 1);
1161 
1162     /* Wait in the middle of state */
1163     debug_wait(cur_meta->m_chunk_num, task);
1164 #endif /* UNIV_DEBUG */
1165   }
1166 
1167   cur_meta->m_block_num = new_meta->m_block_num;
1168 
1169   return (err);
1170 }
1171 
add_incomplete_chunk(Clone_Task * task)1172 void Clone_Task_Manager::add_incomplete_chunk(Clone_Task *task) {
1173   /* Track incomplete chunks during apply */
1174   if (m_clone_snapshot->is_copy()) {
1175     return;
1176   }
1177 
1178   auto &task_meta = task->m_task_meta;
1179 
1180   /* The task doesn't have any incomplete chunks */
1181   if (task_meta.m_chunk_num == 0) {
1182     return;
1183   }
1184 
1185   auto &chunks = m_chunk_info.m_incomplete_chunks;
1186 
1187   chunks[task_meta.m_chunk_num] = task_meta.m_block_num;
1188 
1189   ib::info(ER_IB_CLONE_RESTART)
1190       << "Clone Apply add incomplete Chunk = " << task_meta.m_chunk_num
1191       << " Block = " << task_meta.m_block_num
1192       << " Task = " << task_meta.m_task_index;
1193 }
1194 
1195 /** Print completed chunk information
1196 @param[in]	chunk_info	chunk information */
print_chunk_info(Chunk_Info * chunk_info)1197 static void print_chunk_info(Chunk_Info *chunk_info) {
1198   for (auto &chunk : chunk_info->m_incomplete_chunks) {
1199     ib::info(ER_IB_CLONE_RESTART)
1200         << "Incomplete: Chunk = " << chunk.first << " Block = " << chunk.second;
1201   }
1202 
1203   auto min = chunk_info->m_reserved_chunks.get_min_unset_bit();
1204   auto max = chunk_info->m_reserved_chunks.get_max_set_bit();
1205 
1206   auto size = chunk_info->m_reserved_chunks.size_bits();
1207 
1208   ib::info(ER_IB_CLONE_RESTART)
1209       << "Number of Chunks: " << size << " Min = " << min << " Max = " << max;
1210 
1211   ut_ad(min != max);
1212 
1213   if (max > min) {
1214     ib::info(ER_IB_CLONE_RESTART)
1215         << "Reserved Chunk Information : " << min << " - " << max
1216         << " Chunks: " << max - min + 1;
1217 
1218     for (uint32_t index = min; index <= max;) {
1219       uint32_t ind = 0;
1220 
1221       const int STR_SIZE = 64;
1222       char str[STR_SIZE + 1];
1223 
1224       while (index <= max && ind < STR_SIZE) {
1225         str[ind] = chunk_info->m_reserved_chunks[index] ? '1' : '0';
1226         ++index;
1227         ++ind;
1228       }
1229 
1230       ut_ad(ind <= STR_SIZE);
1231       str[ind] = '\0';
1232 
1233       ib::info(ER_IB_CLONE_RESTART) << str;
1234     }
1235   }
1236 }
1237 
/** Re-initialize apply-side state for a clone restart and build the
locator to send back, carrying the current state and chunk progress.
For INIT, DONE and NONE states no locator is built (new_loc = nullptr,
new_len = 0).
@param[in]	ref_loc		reference locator, deserialized as a template
@param[in]	ref_len		length of ref_loc
@param[in,out]	new_loc		output locator buffer; reallocated from the
                                snapshot heap when alloc_len is too small
                                (presumably otherwise the caller passes the
                                previously allocated buffer - confirm)
@param[out]	new_len		length available for the serialized locator
@param[in,out]	alloc_len	allocated length of new_loc */
void Clone_Task_Manager::reinit_apply_state(const byte *ref_loc, uint ref_len,
                                            byte *&new_loc, uint &new_len,
                                            uint &alloc_len) {
  ut_ad(m_current_state != CLONE_SNAPSHOT_NONE);
  ut_ad(!m_clone_snapshot->is_copy());

  /* Only master task should be present */
  ut_ad(m_num_tasks == 1);

  /* NOTE(review): the fields below are updated without m_state_mutex
  (unlike reinit_copy_state); presumably safe because only the master task
  exists at this point - confirm. */

  /* Reset State transition information */
  reset_transition();

  /* Reset Error information */
  reset_error();

  /* Check if current state is finished and acknowledged */
  ut_ad(m_ack_state <= m_current_state);

  /* An acknowledged state counts the master as already finished;
  finish_state() relies on this after a restart. */
  if (m_ack_state == m_current_state) {
    ++m_num_tasks_finished;
  }

  ++m_restart_count;

  /* NOTE(review): INIT logs with ER_IB_CLONE_RESTART while the other states
  use ER_IB_CLONE_OPERATION; reinit_copy_state uses ER_IB_CLONE_RESTART for
  all states. Confirm whether the difference is intended. */
  switch (m_current_state) {
    case CLONE_SNAPSHOT_INIT:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Apply Restarting State: INIT";
      break;

    case CLONE_SNAPSHOT_FILE_COPY:
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Restarting State: FILE COPY";
      break;

    case CLONE_SNAPSHOT_PAGE_COPY:
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Restarting State: PAGE COPY";
      break;

    case CLONE_SNAPSHOT_REDO_COPY:
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Restarting State: REDO COPY";
      break;

    case CLONE_SNAPSHOT_DONE:
      ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply Restarting State: DONE";
      break;

    case CLONE_SNAPSHOT_NONE:
      /* fall through */

    default:
      ut_ad(false);
  }

  /* These states carry no chunk progress: no locator is returned. */
  if (m_current_state == CLONE_SNAPSHOT_INIT ||
      m_current_state == CLONE_SNAPSHOT_DONE ||
      m_current_state == CLONE_SNAPSHOT_NONE) {
    new_loc = nullptr;
    new_len = 0;
    return;
  }

  /* Add incomplete chunks from master task */
  auto task = get_task_by_index(0);

  add_incomplete_chunk(task);

  /* Reset task information */
  mutex_enter(&m_state_mutex);
  reset_chunk(task);
  mutex_exit(&m_state_mutex);

  /* Build the locator from the reference locator, then overwrite the
  state-specific fields with local progress. */
  Clone_Desc_Locator temp_locator;

  temp_locator.deserialize(ref_loc, ref_len, nullptr);

  /* Update current state information */
  temp_locator.m_state = m_current_state;

  /* Update sub-state information */
  temp_locator.m_metadata_transferred = m_transferred_file_meta;

  /* Length needed: locator header plus serialized chunk information. */
  auto len = temp_locator.m_header.m_length;
  len += static_cast<uint>(m_chunk_info.get_serialized_length(0));

  if (len > alloc_len) {
    /* Allocate more than currently needed so the buffer can be reused:
    maximum base length plus chunk information sized for CLONE_MAX_TASKS. */
    len = CLONE_DESC_MAX_BASE_LEN;
    ut_ad(len >= temp_locator.m_header.m_length);

    len += static_cast<uint>(m_chunk_info.get_serialized_length(
        static_cast<uint32_t>(CLONE_MAX_TASKS)));

    auto heap = m_clone_snapshot->lock_heap();

    new_loc = static_cast<byte *>(mem_heap_zalloc(heap, len));
    alloc_len = len;

    m_clone_snapshot->release_heap(heap);
  }

  new_len = alloc_len;

  /* Serialize locator together with current chunk progress. */
  temp_locator.serialize(new_loc, new_len, &m_chunk_info, nullptr);

  print_chunk_info(&m_chunk_info);
}
1347 
/** Re-initialize copy-side state for a clone restart, using the locator
received from the apply side to decide where to resume.
If the remote (apply) state differs from the local state, the copy resumes
from the beginning of the current state and re-sends state metadata.
Otherwise chunk progress is restored from the locator.
@param[in]	loc	locator received from the apply side
@param[in]	loc_len	length of loc */
void Clone_Task_Manager::reinit_copy_state(const byte *loc, uint loc_len) {
  ut_ad(m_clone_snapshot->is_copy());
  ut_ad(m_num_tasks == 0);

  mutex_enter(&m_state_mutex);

  /* Reset State transition information */
  reset_transition();

  /* Reset Error information */
  reset_error();

  ++m_restart_count;

  switch (m_current_state) {
    case CLONE_SNAPSHOT_INIT:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: INIT";
      break;

    case CLONE_SNAPSHOT_FILE_COPY:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: FILE COPY";
      break;

    case CLONE_SNAPSHOT_PAGE_COPY:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: PAGE COPY";
      break;

    case CLONE_SNAPSHOT_REDO_COPY:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: REDO COPY";
      break;

    case CLONE_SNAPSHOT_DONE:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: DONE";
      break;

    case CLONE_SNAPSHOT_NONE:
      /* fall through */

    default:
      ut_ad(false);
  }

  /* Unexpected in practice (asserted above); bail out in release builds. */
  if (m_current_state == CLONE_SNAPSHOT_NONE) {
    ut_ad(false);
    mutex_exit(&m_state_mutex);
    return;
  }

  /* Reset to beginning of current state */
  init_state();

  /* Compare local and remote state */
  Clone_Desc_Locator temp_locator;

  temp_locator.deserialize(loc, loc_len, nullptr);

  /* If Local state is ahead, we must have finished the
  previous state confirmed by ACK. It is enough to
  start from current state. */
  if (temp_locator.m_state != m_current_state) {
#ifdef UNIV_DEBUG
    /* Current state could be just one state ahead */
    if (temp_locator.m_state == CLONE_SNAPSHOT_INIT) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_FILE_COPY);

    } else if (temp_locator.m_state == CLONE_SNAPSHOT_FILE_COPY) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_PAGE_COPY);

    } else if (temp_locator.m_state == CLONE_SNAPSHOT_PAGE_COPY) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_REDO_COPY);

    } else if (temp_locator.m_state == CLONE_SNAPSHOT_REDO_COPY) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_DONE);

    } else {
      ut_ad(false);
    }
#endif /* UNIV_DEBUG */

    /* Apply state is behind. Need to send state metadata */
    m_send_state_meta = true;

    mutex_exit(&m_state_mutex);
    return;
  }

  /* Same state on both sides: resume with the remote progress. */
  m_send_state_meta = false;
  m_transferred_file_meta = temp_locator.m_metadata_transferred;

  /* Set progress information for current state: the second deserialize
  also fills m_chunk_info from the locator. */
  temp_locator.deserialize(loc, loc_len, &m_chunk_info);

  m_chunk_info.init_chunk_nums();

  mutex_exit(&m_state_mutex);

  print_chunk_info(&m_chunk_info);
}
1446 
init_state()1447 void Clone_Task_Manager::init_state() {
1448   ut_ad(mutex_own(&m_state_mutex));
1449 
1450   auto num_chunks = m_clone_snapshot->get_num_chunks();
1451 
1452   auto heap = m_clone_snapshot->lock_heap();
1453 
1454   m_chunk_info.m_reserved_chunks.reset(num_chunks, heap);
1455 
1456   m_clone_snapshot->release_heap(heap);
1457 
1458   m_chunk_info.m_incomplete_chunks.clear();
1459 
1460   m_chunk_info.m_min_unres_chunk = 1;
1461   ut_ad(m_chunk_info.m_reserved_chunks.get_min_unset_bit() == 1);
1462 
1463   m_chunk_info.m_max_res_chunk = 0;
1464   ut_ad(m_chunk_info.m_reserved_chunks.get_max_set_bit() == 0);
1465 
1466   m_chunk_info.m_total_chunks = num_chunks;
1467 }
1468 
ack_state(const Clone_Desc_State * state_desc)1469 void Clone_Task_Manager::ack_state(const Clone_Desc_State *state_desc) {
1470   mutex_enter(&m_state_mutex);
1471 
1472   m_ack_state = state_desc->m_state;
1473   ut_ad(m_current_state == m_ack_state);
1474   ib::info(ER_IB_CLONE_OPERATION)
1475       << "Clone set state change ACK: " << m_ack_state;
1476 
1477   mutex_exit(&m_state_mutex);
1478 }
1479 
wait_ack(Clone_Handle * clone,Clone_Task * task,Ha_clone_cbk * callback)1480 int Clone_Task_Manager::wait_ack(Clone_Handle *clone, Clone_Task *task,
1481                                  Ha_clone_cbk *callback) {
1482   mutex_enter(&m_state_mutex);
1483 
1484   ++m_num_tasks_finished;
1485 
1486   /* All chunks are finished */
1487   reset_chunk(task);
1488 
1489   if (!task->m_is_master) {
1490     mutex_exit(&m_state_mutex);
1491     return (0);
1492   }
1493 
1494   int err = 0;
1495 
1496   if (m_current_state != m_ack_state) {
1497     bool is_timeout = false;
1498     int alert_count = 0;
1499     err = Clone_Sys::wait_default(
1500         [&](bool alert, bool &result) {
1501           ut_ad(mutex_own(&m_state_mutex));
1502           result = (m_current_state != m_ack_state);
1503 
1504           /* Check for error from other tasks */
1505           err = handle_error_other_task(task->m_has_thd);
1506 
1507           if (err == 0 && result && alert) {
1508             /* Print messages every 1 minute - default is 5 seconds. */
1509             if (++alert_count == 12) {
1510               alert_count = 0;
1511               ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master waiting "
1512                                                "for state change ACK ";
1513             }
1514             err = clone->send_keep_alive(task, callback);
1515           }
1516           return (err);
1517         },
1518         &m_state_mutex, is_timeout);
1519 
1520     /* Wait too long */
1521     if (err == 0 && is_timeout) {
1522       ut_ad(false);
1523       ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master wait for state change ACK"
1524                                        " timed out";
1525 
1526       my_error(ER_INTERNAL_ERROR, MYF(0),
1527                "Innodb clone state ack wait too long");
1528 
1529       err = ER_INTERNAL_ERROR;
1530     }
1531   }
1532   mutex_exit(&m_state_mutex);
1533 
1534   if (err == 0) {
1535     ib::info(ER_IB_CLONE_OPERATION) << "Clone Master received state change ACK";
1536   }
1537 
1538   return (err);
1539 }
1540 
/** Mark the calling task as finished with the current state (apply side).
A worker task only records completion. The master also records the current
state as acknowledged, then waits until all tasks have finished.
@param[in,out]	task	task finishing the current state
@return error code, 0 on success */
int Clone_Task_Manager::finish_state(Clone_Task *task) {
  mutex_enter(&m_state_mutex);

  if (task->m_is_master) {
    /* Check if ACK was sent before restart */
    if (m_ack_state != m_current_state) {
      ut_ad(m_ack_state < m_current_state);
      ++m_num_tasks_finished;
    } else {
      /* Already acknowledged: only expected after a restart, where
      reinit_apply_state() has already counted the master as finished. */
      ut_ad(m_restart_count > 0);
    }
    m_ack_state = m_current_state;

  } else {
    ++m_num_tasks_finished;
  }

  /* All chunks are finished */
  reset_chunk(task);

  /* Check for error from other tasks */
  auto err = handle_error_other_task(task->m_has_thd);

  /* Workers are done here; on error the master does not wait either. */
  if (!task->m_is_master || err != 0) {
    mutex_exit(&m_state_mutex);
    return (err);
  }

  ut_ad(task->m_is_master);

#ifdef UNIV_DEBUG
  /* Wait before ending state, if needed. The state mutex is released
  around the debug wait and re-acquired afterwards. */
  if (!task->m_ignore_sync) {
    mutex_exit(&m_state_mutex);
    debug_wait(0, task);
    mutex_enter(&m_state_mutex);
  }
#endif /* UNIV_DEBUG */

  /* Master waits for all workers to finish the state. */
  if (m_num_tasks_finished < m_num_tasks) {
    bool is_timeout = false;
    int alert_count = 0;
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          ut_ad(mutex_own(&m_state_mutex));
          result = (m_num_tasks_finished < m_num_tasks);

          /* Check for error from other tasks */
          err = handle_error_other_task(task->m_has_thd);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT)
                  << "Clone Apply Master waiting for "
                     "workers before sending ACK."
                  << " Total = " << m_num_tasks
                  << " Finished = " << m_num_tasks_finished;
            }
          }
          return (err);
        },
        &m_state_mutex, is_timeout);

    if (err == 0 && is_timeout) {
      ut_ad(false);
      ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Apply Master wait timed out";

      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Clone Apply Master wait timed out before sending ACK");

      err = ER_INTERNAL_ERROR;
    }
  }

  mutex_exit(&m_state_mutex);
  return (err);
}
1620 
/** Request transition to the next snapshot state.
The first requesting task starts the transition. While other tasks are still
in transit, the function returns 0 with num_wait > 0 and does not change the
state. This applies both to the master and to a worker that has just moved
over. When the master is the last task in transit, it changes the snapshot
state and waits for other clones attached to the same snapshot. It then
completes the transition and re-initializes chunk bookkeeping.
@param[in,out]	task		requesting task
@param[in,out]	state_desc	descriptor for the new state
@param[in]	new_state	state to move to
@param[in]	cbk		alert callback passed to the snapshot
@param[out]	num_wait	number of tasks still in transit, 0 if done
@return error code, 0 on success */
int Clone_Task_Manager::change_state(Clone_Task *task,
                                     Clone_Desc_State *state_desc,
                                     Snapshot_State new_state,
                                     Clone_Alert_Func cbk, uint &num_wait) {
  mutex_enter(&m_state_mutex);

  num_wait = 0;

  /* Check for error from other tasks */
  auto err = handle_error_other_task(task->m_has_thd);

  if (err != 0) {
    mutex_exit(&m_state_mutex);
    return (err);
  }

  /* First requesting task needs to initiate the state transition. */
  if (!in_transit_state()) {
    m_num_tasks_transit = m_num_tasks;
    m_next_state = new_state;
  }

  /* Master needs to wait for all other tasks. */
  if (task->m_is_master && m_num_tasks_transit > 1) {
    num_wait = m_num_tasks_transit;

    mutex_exit(&m_state_mutex);
    return (0);
  }

  /* Need to wait for transition to next state */
  if (!task->m_is_master) {
    /* Move the current task over to the next state */
    ut_ad(m_num_tasks_transit > 0);
    --m_num_tasks_transit;

    num_wait = m_num_tasks_transit;
    ut_ad(num_wait > 0);

    mutex_exit(&m_state_mutex);
    return (0);
  }

  /* Last task requesting the state change. All other tasks have
  already moved over to next state and waiting for the transition
  to complete. Now it is safe to do the snapshot state transition. */

  ut_ad(task->m_is_master);
  mutex_exit(&m_state_mutex);

  uint num_pending = 0;

  if (m_clone_snapshot->is_copy()) {
    ib::info(ER_IB_CLONE_OPERATION)
        << "Clone State Change : Number of tasks = " << m_num_tasks;
  } else {
    ib::info(ER_IB_CLONE_OPERATION)
        << "Clone Apply State Change : Number of tasks = " << m_num_tasks;
  }

  /* Snapshot level transition; num_pending reports other clones still to
  reach the new state. */
  err = m_clone_snapshot->change_state(
      state_desc, m_next_state, task->m_current_buffer,
      task->m_buffer_alloc_len, cbk, num_pending);
  if (err != 0) {
    return (err);
  }

  /* Need to wait for other concurrent clone attached to current snapshot. */
  if (num_pending > 0) {
    bool is_timeout = false;
    int alert_count = 0;
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          num_pending = m_clone_snapshot->check_state(m_next_state, false);
          result = (num_pending > 0);

          /* Check for possible shutdown/kill */
          mutex_enter(&m_state_mutex);
          err = handle_error_other_task(task->m_has_thd);
          mutex_exit(&m_state_mutex);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT)
                  << "Clone: master state change waiting for other clone";
            }
          }
          return (err);
        },
        nullptr, is_timeout);

    if (err != 0) {
      /* Exit from state transition */
      num_pending = m_clone_snapshot->check_state(m_next_state, true);
      return (err);

    } else if (is_timeout) {
      /* Exit from state transition */
      num_pending = m_clone_snapshot->check_state(m_next_state, true);
      if (num_pending != 0) {
        ut_ad(false);
        ib::info(ER_IB_CLONE_TIMEOUT) << "Clone: master state change timed out";
        my_error(ER_INTERNAL_ERROR, MYF(0),
                 "Clone: master state change wait for other clones timed out: "
                 "Wait too long for state transition");
        return (ER_INTERNAL_ERROR);
      }
    }
  }

  mutex_enter(&m_state_mutex);

  /* Check for error from other tasks. Must finish the state transition
  even in case of an error. */
  err = handle_error_other_task(task->m_has_thd);

  m_current_state = m_next_state;
  m_next_state = CLONE_SNAPSHOT_NONE;

  --m_num_tasks_transit;
  /* In case of error, the other tasks might have exited. */
  ut_ad(m_num_tasks_transit == 0 || err != 0);
  m_num_tasks_transit = 0;

  /* For restart, m_num_tasks_finished may not be up to date */
  ut_ad(m_num_tasks_finished == m_num_tasks || err != 0);
  m_num_tasks_finished = 0;

  ut_d(task->m_ignore_sync = false);
  ut_d(task->m_debug_counter = 0);

  /* Initialize next state after transition. */
  init_state();

  mutex_exit(&m_state_mutex);

  return (err);
}
1761 
check_state(Clone_Task * task,Snapshot_State new_state,bool exit_on_wait,int in_err,uint32_t & num_wait)1762 int Clone_Task_Manager::check_state(Clone_Task *task, Snapshot_State new_state,
1763                                     bool exit_on_wait, int in_err,
1764                                     uint32_t &num_wait) {
1765   mutex_enter(&m_state_mutex);
1766 
1767   num_wait = 0;
1768 
1769   if (in_err != 0) {
1770     /* Save error for other tasks */
1771     if (m_saved_error == 0) {
1772       m_saved_error = in_err;
1773     }
1774     /* Mark transit incomplete */
1775     if (in_transit_state()) {
1776       ++m_num_tasks_transit;
1777     }
1778     mutex_exit(&m_state_mutex);
1779     return (in_err);
1780   }
1781 
1782   /* Check for error from other tasks */
1783   auto err = handle_error_other_task(task->m_has_thd);
1784 
1785   if (err != 0) {
1786     mutex_exit(&m_state_mutex);
1787     return (err);
1788   }
1789 
1790   /* Check if current transition is still in progress. */
1791   if (in_transit_state() && new_state == m_next_state) {
1792     num_wait = m_num_tasks_transit;
1793 
1794     ut_ad(num_wait > 0);
1795 
1796     if (exit_on_wait) {
1797       /* Mark error for other tasks */
1798       m_saved_error = ER_INTERNAL_ERROR;
1799       /* Mark transit incomplete */
1800       ++m_num_tasks_transit;
1801     }
1802   }
1803 
1804   mutex_exit(&m_state_mutex);
1805 
1806   return (0);
1807 }
1808 
Clone_Handle(Clone_Handle_Type handle_type,uint clone_version,uint clone_index)1809 Clone_Handle::Clone_Handle(Clone_Handle_Type handle_type, uint clone_version,
1810                            uint clone_index)
1811     : m_clone_handle_type(handle_type),
1812       m_clone_handle_state(CLONE_STATE_INIT),
1813       m_clone_locator(),
1814       m_locator_length(),
1815       m_restart_loc(),
1816       m_restart_loc_len(),
1817       m_clone_desc_version(clone_version),
1818       m_clone_arr_index(clone_index),
1819       m_clone_id(),
1820       m_ref_count(),
1821       m_allow_restart(false),
1822       m_clone_dir(),
1823       m_clone_task_manager() {
1824   mutex_create(LATCH_ID_CLONE_TASK, m_clone_task_manager.get_mutex());
1825 
1826   Clone_Desc_Locator loc_desc;
1827   loc_desc.init(0, 0, CLONE_SNAPSHOT_NONE, clone_version, clone_index);
1828 
1829   auto loc = &m_version_locator[0];
1830   uint len = CLONE_DESC_MAX_BASE_LEN;
1831 
1832   memset(loc, 0, CLONE_DESC_MAX_BASE_LEN);
1833 
1834   loc_desc.serialize(loc, len, nullptr, nullptr);
1835 
1836   ut_ad(len <= CLONE_DESC_MAX_BASE_LEN);
1837 }
1838 
~Clone_Handle()1839 Clone_Handle::~Clone_Handle() {
1840   mutex_free(m_clone_task_manager.get_mutex());
1841 
1842   if (!is_init()) {
1843     clone_sys->detach_snapshot(m_clone_task_manager.get_snapshot(),
1844                                m_clone_handle_type);
1845   }
1846   ut_ad(m_ref_count == 0);
1847 }
1848 
create_clone_directory()1849 int Clone_Handle::create_clone_directory() {
1850   ut_ad(!is_copy_clone());
1851   dberr_t db_err = DB_SUCCESS;
1852   std::string file_name;
1853 
1854   if (!replace_datadir()) {
1855     /* Create data directory, if we not replacing the current one. */
1856     db_err = os_file_create_subdirs_if_needed(m_clone_dir);
1857     if (db_err == DB_SUCCESS) {
1858       auto status = os_file_create_directory(m_clone_dir, false);
1859       /* Create mysql schema directory. */
1860       file_name.assign(m_clone_dir);
1861       file_name.append(OS_PATH_SEPARATOR_STR);
1862       if (status) {
1863         file_name.append("mysql");
1864         status = os_file_create_directory(file_name.c_str(), true);
1865       }
1866       if (!status) {
1867         db_err = DB_ERROR;
1868       }
1869     }
1870     file_name.assign(m_clone_dir);
1871     file_name.append(OS_PATH_SEPARATOR_STR);
1872   }
1873 
1874   /* Create clone status directory. */
1875   if (db_err == DB_SUCCESS) {
1876     file_name.append(CLONE_FILES_DIR);
1877     auto status = os_file_create_directory(file_name.c_str(), false);
1878     if (!status) {
1879       db_err = DB_ERROR;
1880     }
1881   }
1882   /* Check and report error. */
1883   if (db_err != DB_SUCCESS) {
1884     char errbuf[MYSYS_STRERROR_SIZE];
1885     my_error(ER_CANT_CREATE_DB, MYF(0), m_clone_dir, errno,
1886              my_strerror(errbuf, sizeof(errbuf), errno));
1887 
1888     return (ER_CANT_CREATE_DB);
1889   }
1890   return (0);
1891 }
1892 
init(const byte * ref_loc,uint ref_len,Ha_clone_type type,const char * data_dir)1893 int Clone_Handle::init(const byte *ref_loc, uint ref_len, Ha_clone_type type,
1894                        const char *data_dir) {
1895   ib_uint64_t snapshot_id;
1896   Clone_Snapshot *snapshot;
1897 
1898   m_clone_dir = data_dir;
1899 
1900   bool enable_monitor = true;
1901 
1902   /* Generate unique clone identifiers for copy clone handle. */
1903   if (is_copy_clone()) {
1904     m_clone_id = clone_sys->get_next_id();
1905     snapshot_id = clone_sys->get_next_id();
1906 
1907     /* For local clone, monitor while applying data. */
1908     if (ref_loc == nullptr) {
1909       enable_monitor = false;
1910     }
1911 
1912   } else {
1913     /* We don't provision instance on which active clone is running. */
1914     if (replace_datadir() && clone_sys->check_active_clone(false)) {
1915       my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_CLONES);
1916       return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
1917     }
1918     /* Return keeping the clone in INIT state. The locator
1919     would only have the version information. */
1920     if (ref_loc == nullptr) {
1921       return (0);
1922     }
1923 
1924     auto err = create_clone_directory();
1925     if (err != 0) {
1926       return (err);
1927     }
1928 
1929     /* Set clone identifiers from reference locator for apply clone
1930     handle. The reference locator is from copy clone handle. */
1931     Clone_Desc_Locator loc_desc;
1932 
1933     loc_desc.deserialize(ref_loc, ref_len, nullptr);
1934 
1935     m_clone_id = loc_desc.m_clone_id;
1936     snapshot_id = loc_desc.m_snapshot_id;
1937 
1938     ut_ad(m_clone_id != CLONE_LOC_INVALID_ID);
1939     ut_ad(snapshot_id != CLONE_LOC_INVALID_ID);
1940   }
1941 
1942   /* Create and attach to snapshot. */
1943   auto err = clone_sys->attach_snapshot(m_clone_handle_type, type, snapshot_id,
1944                                         enable_monitor, snapshot);
1945 
1946   if (err != 0) {
1947     return (err);
1948   }
1949 
1950   /* Initialize clone task manager. */
1951   m_clone_task_manager.init(snapshot);
1952 
1953   m_clone_handle_state = CLONE_STATE_ACTIVE;
1954 
1955   return (0);
1956 }
1957 
get_locator(uint & loc_len)1958 byte *Clone_Handle::get_locator(uint &loc_len) {
1959   Clone_Desc_Locator loc_desc;
1960 
1961   /* Return version locator during initialization. */
1962   if (is_init()) {
1963     loc_len = CLONE_DESC_MAX_BASE_LEN;
1964     return (&m_version_locator[0]);
1965   }
1966 
1967   auto snapshot = m_clone_task_manager.get_snapshot();
1968 
1969   auto heap = snapshot->lock_heap();
1970 
1971   build_descriptor(&loc_desc);
1972 
1973   loc_desc.serialize(m_clone_locator, m_locator_length, nullptr, heap);
1974 
1975   loc_len = m_locator_length;
1976 
1977   snapshot->release_heap(heap);
1978 
1979   return (m_clone_locator);
1980 }
1981 
build_descriptor(Clone_Desc_Locator * loc_desc)1982 void Clone_Handle::build_descriptor(Clone_Desc_Locator *loc_desc) {
1983   Clone_Snapshot *snapshot;
1984   ib_uint64_t snapshot_id = CLONE_LOC_INVALID_ID;
1985   Snapshot_State state = CLONE_SNAPSHOT_NONE;
1986 
1987   snapshot = m_clone_task_manager.get_snapshot();
1988 
1989   if (snapshot) {
1990     state = snapshot->get_state();
1991     snapshot_id = snapshot->get_id();
1992   }
1993 
1994   loc_desc->init(m_clone_id, snapshot_id, state, m_clone_desc_version,
1995                  m_clone_arr_index);
1996 }
1997 
drop_task(THD * thd,uint task_id,int in_err,bool & is_master)1998 bool Clone_Handle::drop_task(THD *thd, uint task_id, int in_err,
1999                              bool &is_master) {
2000   /* No task is added in INIT state. The drop task is still called and
2001   should be ignored. */
2002   if (is_init()) {
2003     /* Only relevant for apply clone master */
2004     ut_ad(!is_copy_clone());
2005     ut_ad(task_id == 0);
2006     is_master = true;
2007     return (false);
2008   }
2009   /* Cannot be in IDLE state as master waits for tasks to drop before idling */
2010   ut_ad(!is_idle());
2011 
2012   /* Close and reset file related information */
2013   auto task = m_clone_task_manager.get_task_by_index(task_id);
2014 
2015   close_file(task);
2016 
2017   ut_ad(mutex_own(clone_sys->get_mutex()));
2018   mutex_exit(clone_sys->get_mutex());
2019 
2020   auto wait_restart = m_clone_task_manager.drop_task(thd, task_id, is_master);
2021   mutex_enter(clone_sys->get_mutex());
2022 
2023   /* Need to wait for restart, if network error */
2024   if (is_copy_clone() && m_allow_restart && wait_restart) {
2025     ut_ad(is_master);
2026     return (true);
2027   }
2028 
2029   return (false);
2030 }
2031 
move_to_next_state(Clone_Task * task,Ha_clone_cbk * callback,Clone_Desc_State * state_desc)2032 int Clone_Handle::move_to_next_state(Clone_Task *task, Ha_clone_cbk *callback,
2033                                      Clone_Desc_State *state_desc) {
2034   auto snapshot = m_clone_task_manager.get_snapshot();
2035   /* Use input state only for apply. */
2036   auto next_state =
2037       is_copy_clone() ? snapshot->get_next_state() : state_desc->m_state;
2038 
2039   Clone_Alert_Func alert_callback;
2040 
2041   if (is_copy_clone()) {
2042     /* Send Keep alive to recipient during long wait. */
2043     alert_callback = [&]() {
2044       auto err = send_keep_alive(task, callback);
2045       return (err);
2046     };
2047   }
2048 
2049   /* Move to new state */
2050   uint num_wait = 0;
2051   auto err = m_clone_task_manager.change_state(task, state_desc, next_state,
2052                                                alert_callback, num_wait);
2053 
2054   /* Need to wait for all other tasks to move over, if any. */
2055   if (num_wait > 0) {
2056     bool is_timeout = false;
2057     int alert_count = 0;
2058     err = Clone_Sys::wait_default(
2059         [&](bool alert, bool &result) {
2060           /* For multi threaded clone, master task does the state change. */
2061           if (task->m_is_master) {
2062             err = m_clone_task_manager.change_state(
2063                 task, state_desc, next_state, alert_callback, num_wait);
2064           } else {
2065             err = m_clone_task_manager.check_state(task, next_state, false, 0,
2066                                                    num_wait);
2067           }
2068           result = (num_wait > 0);
2069 
2070           if (err == 0 && result && alert) {
2071             /* Print messages every 1 minute - default is 5 seconds. */
2072             if (++alert_count == 12) {
2073               alert_count = 0;
2074               ib::info(ER_IB_CLONE_TIMEOUT) << "Clone: master state change "
2075                                                "waiting for workers";
2076             }
2077             if (is_copy_clone()) {
2078               err = send_keep_alive(task, callback);
2079             }
2080           }
2081           return (err);
2082         },
2083         nullptr, is_timeout);
2084 
2085     if (err == 0 && !is_timeout) {
2086       return (0);
2087     }
2088 
2089     if (!task->m_is_master) {
2090       /* Exit from state transition */
2091       err = m_clone_task_manager.check_state(task, next_state, is_timeout, err,
2092                                              num_wait);
2093       if (err != 0 || num_wait == 0) {
2094         return (err);
2095       }
2096     }
2097 
2098     if (err == 0 && is_timeout) {
2099       ut_ad(false);
2100       ib::info(ER_IB_CLONE_TIMEOUT) << "Clone: state change: "
2101                                        "wait for other tasks timed out";
2102 
2103       my_error(ER_INTERNAL_ERROR, MYF(0),
2104                "Clone: state change wait for other tasks timed out: "
2105                "Wait too long for state transition");
2106       return (ER_INTERNAL_ERROR);
2107     }
2108   }
2109   return (err);
2110 }
2111 
open_file(Clone_Task * task,Clone_File_Meta * file_meta,ulint file_type,bool create_file,bool set_and_close)2112 int Clone_Handle::open_file(Clone_Task *task, Clone_File_Meta *file_meta,
2113                             ulint file_type, bool create_file,
2114                             bool set_and_close) {
2115   pfs_os_file_t handle;
2116   os_file_type_t type;
2117   ulint option;
2118 
2119   bool success;
2120   bool exists;
2121   bool read_only;
2122 
2123   /* Check if file exists */
2124   auto status = os_file_status(file_meta->m_file_name, &exists, &type);
2125   if (!status) {
2126     return (0);
2127   }
2128 
2129   if (create_file) {
2130     option = exists ? OS_FILE_OPEN : OS_FILE_CREATE_PATH;
2131     read_only = false;
2132   } else {
2133     ut_ad(exists);
2134     option = OS_FILE_OPEN;
2135     read_only = true;
2136   }
2137 
2138   option |= OS_FILE_ON_ERROR_NO_EXIT;
2139 
2140   handle = os_file_create(innodb_clone_file_key, file_meta->m_file_name, option,
2141                           OS_FILE_NORMAL, file_type, read_only, &success);
2142 
2143   if (success && set_and_close) {
2144     ut_ad(create_file);
2145 
2146     os_file_close(handle);
2147 
2148     if (success) {
2149       return (0);
2150     }
2151   }
2152 
2153   if (!success) {
2154     char errbuf[MYSYS_STRERROR_SIZE];
2155 
2156     int err =
2157         (option == OS_FILE_OPEN) ? ER_CANT_OPEN_FILE : ER_CANT_CREATE_FILE;
2158 
2159     my_error(err, MYF(0), file_meta->m_file_name, errno,
2160              my_strerror(errbuf, sizeof(errbuf), errno));
2161 
2162     return (err);
2163   }
2164 
2165   if (task == nullptr) {
2166     os_file_close(handle);
2167     return (0);
2168   }
2169 
2170   /* Set file descriptor in task. */
2171   close_file(task);
2172   task->m_current_file_des = handle;
2173 
2174   ut_ad(handle.m_file != OS_FILE_CLOSED);
2175 
2176   task->m_file_cache = true;
2177 
2178   /* Set cache to false if direct IO(O_DIRECT) is used. */
2179   if (file_type == OS_CLONE_DATA_FILE) {
2180     task->m_file_cache = !srv_is_direct_io();
2181 
2182     DBUG_EXECUTE_IF("clone_no_zero_copy", task->m_file_cache = false;);
2183   }
2184 
2185   task->m_current_file_index = file_meta->m_file_index;
2186 
2187   return (0);
2188 }
2189 
close_file(Clone_Task * task)2190 int Clone_Handle::close_file(Clone_Task *task) {
2191   bool success = true;
2192 
2193   /* Close file, if opened. */
2194   if (task->m_current_file_des.m_file != OS_FILE_CLOSED) {
2195     success = os_file_close(task->m_current_file_des);
2196   }
2197 
2198   task->m_current_file_des.m_file = OS_FILE_CLOSED;
2199   task->m_current_file_index = 0;
2200   task->m_file_cache = true;
2201 
2202   if (!success) {
2203     my_error(ER_INTERNAL_ERROR, MYF(0), "Innodb error while closing file");
2204     return (ER_INTERNAL_ERROR);
2205   }
2206 
2207   return (0);
2208 }
2209 
file_callback(Ha_clone_cbk * cbk,Clone_Task * task,uint len,bool buf_cbk,uint64_t offset,const char * name,uint line)2210 int Clone_Handle::file_callback(Ha_clone_cbk *cbk, Clone_Task *task, uint len,
2211                                 bool buf_cbk, uint64_t offset
2212 #ifdef UNIV_PFS_IO
2213                                 ,
2214                                 const char *name, uint line
2215 #endif /* UNIV_PFS_IO */
2216 ) {
2217   int err;
2218   Ha_clone_file file;
2219 
2220   /* Platform specific code to set file handle */
2221 #ifdef _WIN32
2222   file.type = Ha_clone_file::FILE_HANDLE;
2223   file.file_handle = static_cast<void *>(task->m_current_file_des.m_file);
2224 #else
2225   file.type = Ha_clone_file::FILE_DESC;
2226   file.file_desc = task->m_current_file_des.m_file;
2227 #endif /* _WIN32 */
2228 
2229   /* Register for PFS IO */
2230 #ifdef UNIV_PFS_IO
2231   PSI_file_locker_state state;
2232   struct PSI_file_locker *locker;
2233   enum PSI_file_operation psi_op;
2234 
2235   locker = nullptr;
2236   psi_op = is_copy_clone() ? PSI_FILE_READ : PSI_FILE_WRITE;
2237 
2238   register_pfs_file_io_begin(&state, locker, task->m_current_file_des, len,
2239                              psi_op, name, line);
2240 #endif /* UNIV_PFS_IO */
2241 
2242   /* Call appropriate callback to transfer data. */
2243   if (is_copy_clone()) {
2244     /* Send data from file. */
2245     err = cbk->file_cbk(file, len);
2246 
2247   } else if (buf_cbk) {
2248     unsigned char *data_buf = nullptr;
2249     uint32_t data_len = 0;
2250     /* Get data buffer */
2251     err = cbk->apply_buffer_cbk(data_buf, data_len);
2252     if (err == 0) {
2253       /* Modify and write data buffer to file. */
2254       err = modify_and_write(task, offset, data_buf, data_len);
2255     }
2256   } else {
2257     /* Write directly to file. */
2258     err = cbk->apply_file_cbk(file);
2259   }
2260 
2261 #ifdef UNIV_PFS_IO
2262   register_pfs_file_io_end(locker, len);
2263 #endif /* UNIV_PFS_IO */
2264 
2265   return (err);
2266 }
2267