1 /*****************************************************************************
2
3 Copyright (c) 2017, 2020, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 /** @file clone/clone0clone.cc
28 Innodb Clone System
29
30 *******************************************************/
31
32 #include "clone0clone.h"
33 #include <string>
34 #ifdef UNIV_DEBUG
35 #include "current_thd.h" /* current_thd */
36 #include "debug_sync.h" /* DBUG_SIGNAL_WAIT_FOR */
37 #endif /* UNIV_DEBUG */
38
39 /** Global Clone System */
40 Clone_Sys *clone_sys = nullptr;
41
42 /** Clone System state */
43 Clone_Sys_State Clone_Sys::s_clone_sys_state = {CLONE_SYS_INACTIVE};
44
45 /** Number of active abort requests */
46 uint Clone_Sys::s_clone_abort_count = 0;
47
48 /** Number of active wait requests */
49 uint Clone_Sys::s_clone_wait_count = 0;
50
/** Construct the global clone system. All handle/snapshot slots and
counters are value-initialized (empty/zero); the only resource acquired
is the clone system mutex. */
Clone_Sys::Clone_Sys()
    : m_clone_arr(),
      m_num_clones(),
      m_num_apply_clones(),
      m_snapshot_arr(),
      m_num_snapshots(),
      m_num_apply_snapshots(),
      m_clone_id_generator() {
  /* Protects the clone/snapshot arrays and all counters above. */
  mutex_create(LATCH_ID_CLONE_SYS, &m_clone_sys_mutex);
}
61
~Clone_Sys()62 Clone_Sys::~Clone_Sys() {
63 mutex_free(&m_clone_sys_mutex);
64
65 #ifdef UNIV_DEBUG
66 /* Verify that no active clone is present */
67 int idx;
68 for (idx = 0; idx < CLONE_ARR_SIZE; idx++) {
69 ut_ad(m_clone_arr[idx] == nullptr);
70 }
71 ut_ad(m_num_clones == 0);
72 ut_ad(m_num_apply_clones == 0);
73
74 for (idx = 0; idx < SNAPSHOT_ARR_SIZE; idx++) {
75 ut_ad(m_snapshot_arr[idx] == nullptr);
76 }
77 ut_ad(m_num_snapshots == 0);
78 ut_ad(m_num_apply_snapshots == 0);
79
80 #endif /* UNIV_DEBUG */
81 }
82
find_clone(const byte * ref_loc,uint loc_len,Clone_Handle_Type hdl_type)83 Clone_Handle *Clone_Sys::find_clone(const byte *ref_loc, uint loc_len,
84 Clone_Handle_Type hdl_type) {
85 int idx;
86 bool match_found;
87
88 Clone_Desc_Locator loc_desc;
89 Clone_Desc_Locator ref_desc;
90 Clone_Handle *clone_hdl;
91
92 ut_ad(mutex_own(&m_clone_sys_mutex));
93
94 if (ref_loc == nullptr) {
95 return (nullptr);
96 }
97
98 ref_desc.deserialize(ref_loc, loc_len, nullptr);
99
100 match_found = false;
101 clone_hdl = nullptr;
102
103 for (idx = 0; idx < CLONE_ARR_SIZE; idx++) {
104 clone_hdl = m_clone_arr[idx];
105
106 if (clone_hdl == nullptr || clone_hdl->is_init()) {
107 continue;
108 }
109
110 if (clone_hdl->match_hdl_type(hdl_type)) {
111 clone_hdl->build_descriptor(&loc_desc);
112
113 if (loc_desc.match(&ref_desc)) {
114 match_found = true;
115 break;
116 }
117 }
118 }
119
120 if (match_found) {
121 clone_hdl->attach();
122 return (clone_hdl);
123 }
124
125 return (nullptr);
126 }
127
/** Find a free slot in the clone array for a new clone handle. If all
slots (or the per-type quota) are taken but an idle/errored COPY clone
exists, abort that clone and reuse its slot, waiting up to 5 seconds for
it to exit.
@param[in]   hdl_type    clone handle type (copy/apply)
@param[out]  free_index  slot index to use on success
@return error code; 0 on success. */
int Clone_Sys::find_free_index(Clone_Handle_Type hdl_type, uint &free_index) {
  free_index = CLONE_ARR_SIZE;

  /* Candidate idle/errored clone whose slot we could reclaim. */
  uint target_index = CLONE_ARR_SIZE;
  Clone_Handle *target_clone = nullptr;

  for (uint idx = 0; idx < CLONE_ARR_SIZE; idx++) {
    auto clone_hdl = m_clone_arr[idx];

    if (clone_hdl == nullptr) {
      free_index = idx;
      break;
    }

    /* If existing clone has some error, it is on its way to exit. */
    auto err = clone_hdl->check_error(nullptr);
    if (hdl_type == CLONE_HDL_COPY && (clone_hdl->is_idle() || err != 0)) {
      target_clone = clone_hdl;
      target_index = idx;
    }
  }

  /* No free slot, or the quota for this handle type is exhausted:
  either reclaim the target clone's slot (below) or fail. */
  if (free_index == CLONE_ARR_SIZE ||
      (hdl_type == CLONE_HDL_COPY && m_num_clones == MAX_CLONES) ||
      (hdl_type == CLONE_HDL_APPLY && m_num_apply_clones == MAX_CLONES)) {
    if (target_clone == nullptr) {
      my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_CLONES);
      return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
    }
  } else {
    return (0);
  }

  /* We can abort idle clone and use the index. */
  ut_ad(target_clone != nullptr);
  ut_ad(mutex_own(&m_clone_sys_mutex));
  ut_ad(hdl_type == CLONE_HDL_COPY);

  target_clone->set_state(CLONE_STATE_ABORT);

  free_index = target_index;

  /* Sleep for 100 milliseconds. */
  Clone_Msec sleep_time(100);
  /* Generate alert message every second. */
  Clone_Sec alert_interval(1);
  /* Wait for 5 seconds for idle client to abort. */
  Clone_Sec time_out(5);

  bool is_timeout = false;
  /* The wait releases the clone system mutex while sleeping; the lambda
  runs with the mutex re-acquired and keeps waiting (result = true)
  while the aborted clone still occupies its slot. */
  auto err = Clone_Sys::wait(
      sleep_time, time_out, alert_interval,
      [&](bool alert, bool &result) {
        ut_ad(mutex_own(clone_sys->get_mutex()));
        auto current_clone = m_clone_arr[target_index];
        result = (current_clone != nullptr);

        if (thd_killed(nullptr)) {
          ib::info(ER_IB_CLONE_START_STOP)
              << "Clone Begin Master wait for abort interrupted";
          my_error(ER_QUERY_INTERRUPTED, MYF(0));
          return (ER_QUERY_INTERRUPTED);

        } else if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
          /* A concurrent DDL aborted the whole clone system. */
          ib::info(ER_IB_CLONE_START_STOP)
              << "Clone Begin Master wait for abort interrupted by DDL";
          my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
          return (ER_CLONE_DDL_IN_PROGRESS);

        } else if (result) {
          ut_ad(current_clone->is_abort());
        }

        if (!result) {
          ib::info(ER_IB_CLONE_START_STOP) << "Clone Master aborted idle task";

        } else if (alert) {
          ib::info(ER_IB_CLONE_TIMEOUT)
              << "Clone Master waiting for idle task abort";
        }
        return (0);
      },
      clone_sys->get_mutex(), is_timeout);

  if (err == 0 && is_timeout) {
    ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master wait for abort timed out";
    my_error(ER_INTERNAL_ERROR, MYF(0),
             "Innodb Clone Copy failed to abort idle clone [timeout]");
    err = ER_INTERNAL_ERROR;
  }
  return (err);
}
220
add_clone(const byte * loc,Clone_Handle_Type hdl_type,Clone_Handle * & clone_hdl)221 int Clone_Sys::add_clone(const byte *loc, Clone_Handle_Type hdl_type,
222 Clone_Handle *&clone_hdl) {
223 ut_ad(mutex_own(&m_clone_sys_mutex));
224 ut_ad(m_num_clones <= MAX_CLONES);
225 ut_ad(m_num_apply_clones <= MAX_CLONES);
226
227 auto version = choose_desc_version(loc);
228
229 /* Find a free index to allocate new clone. */
230 uint free_idx;
231 auto err = find_free_index(hdl_type, free_idx);
232 if (err != 0) {
233 return (err);
234 }
235
236 /* Create a new clone. */
237 clone_hdl = UT_NEW(Clone_Handle(hdl_type, version, free_idx), mem_key_clone);
238
239 if (clone_hdl == nullptr) {
240 my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Clone_Handle));
241 return (ER_OUTOFMEMORY);
242 }
243
244 m_clone_arr[free_idx] = clone_hdl;
245
246 if (hdl_type == CLONE_HDL_COPY) {
247 ++m_num_clones;
248 } else {
249 ut_ad(hdl_type == CLONE_HDL_APPLY);
250 ++m_num_apply_clones;
251 }
252
253 clone_hdl->attach();
254
255 return (0);
256 }
257
drop_clone(Clone_Handle * clone_handle)258 void Clone_Sys::drop_clone(Clone_Handle *clone_handle) {
259 ut_ad(mutex_own(&m_clone_sys_mutex));
260
261 if (clone_handle->detach() > 0) {
262 return;
263 }
264
265 auto index = clone_handle->get_index();
266
267 ut_ad(m_clone_arr[index] == clone_handle);
268
269 m_clone_arr[index] = nullptr;
270
271 if (clone_handle->is_copy_clone()) {
272 ut_ad(m_num_clones > 0);
273 --m_num_clones;
274
275 } else {
276 ut_ad(m_num_apply_clones > 0);
277 --m_num_apply_clones;
278 }
279
280 UT_DELETE(clone_handle);
281 }
282
get_clone_by_index(const byte * loc,uint loc_len)283 Clone_Handle *Clone_Sys::get_clone_by_index(const byte *loc, uint loc_len) {
284 Clone_Desc_Locator loc_desc;
285 Clone_Handle *clone_hdl;
286
287 loc_desc.deserialize(loc, loc_len, nullptr);
288
289 #ifdef UNIV_DEBUG
290 Clone_Desc_Header *header = &loc_desc.m_header;
291 ut_ad(header->m_type == CLONE_DESC_LOCATOR);
292 #endif
293 clone_hdl = m_clone_arr[loc_desc.m_clone_index];
294
295 ut_ad(clone_hdl != nullptr);
296
297 return (clone_hdl);
298 }
299
attach_snapshot(Clone_Handle_Type hdl_type,Ha_clone_type clone_type,ib_uint64_t snapshot_id,bool is_pfs_monitor,Clone_Snapshot * & snapshot)300 int Clone_Sys::attach_snapshot(Clone_Handle_Type hdl_type,
301 Ha_clone_type clone_type,
302 ib_uint64_t snapshot_id, bool is_pfs_monitor,
303 Clone_Snapshot *&snapshot) {
304 uint idx;
305 uint free_idx = SNAPSHOT_ARR_SIZE;
306
307 ut_ad(mutex_own(&m_clone_sys_mutex));
308
309 /* Try to attach to an existing snapshot. */
310 for (idx = 0; idx < SNAPSHOT_ARR_SIZE; idx++) {
311 snapshot = m_snapshot_arr[idx];
312
313 if (snapshot != nullptr) {
314 if (snapshot->attach(hdl_type, is_pfs_monitor)) {
315 return (0);
316 }
317 } else if (free_idx == SNAPSHOT_ARR_SIZE) {
318 free_idx = idx;
319 }
320 }
321
322 if (free_idx == SNAPSHOT_ARR_SIZE ||
323 (hdl_type == CLONE_HDL_COPY && m_num_snapshots == MAX_SNAPSHOTS) ||
324 (hdl_type == CLONE_HDL_APPLY && m_num_apply_snapshots == MAX_SNAPSHOTS)) {
325 my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_SNAPSHOTS);
326 return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
327 }
328
329 /* Create a new snapshot. */
330 snapshot = UT_NEW(Clone_Snapshot(hdl_type, clone_type, free_idx, snapshot_id),
331 mem_key_clone);
332
333 if (snapshot == nullptr) {
334 my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Clone_Snapshot));
335 return (ER_OUTOFMEMORY);
336 }
337
338 m_snapshot_arr[free_idx] = snapshot;
339
340 if (hdl_type == CLONE_HDL_COPY) {
341 ++m_num_snapshots;
342 } else {
343 ut_ad(hdl_type == CLONE_HDL_APPLY);
344 ++m_num_apply_snapshots;
345 }
346
347 snapshot->attach(hdl_type, is_pfs_monitor);
348
349 return (0);
350 }
351
detach_snapshot(Clone_Snapshot * snapshot,Clone_Handle_Type hdl_type)352 void Clone_Sys::detach_snapshot(Clone_Snapshot *snapshot,
353 Clone_Handle_Type hdl_type) {
354 uint num_clones;
355
356 ut_ad(mutex_own(&m_clone_sys_mutex));
357 num_clones = snapshot->detach();
358
359 if (num_clones != 0) {
360 return;
361 }
362
363 /* All clones are detached. Drop the snapshot. */
364 uint index;
365
366 index = snapshot->get_index();
367 ut_ad(m_snapshot_arr[index] == snapshot);
368
369 UT_DELETE(snapshot);
370
371 m_snapshot_arr[index] = nullptr;
372
373 if (hdl_type == CLONE_HDL_COPY) {
374 ut_ad(m_num_snapshots > 0);
375 --m_num_snapshots;
376
377 } else {
378 ut_ad(hdl_type == CLONE_HDL_APPLY);
379 ut_ad(m_num_apply_snapshots > 0);
380 --m_num_apply_snapshots;
381 }
382 }
383
check_active_clone(bool print_alert)384 bool Clone_Sys::check_active_clone(bool print_alert) {
385 ut_ad(mutex_own(&m_clone_sys_mutex));
386
387 bool active_clone = false;
388 /* Check for active clone operations. */
389 for (int idx = 0; idx < CLONE_ARR_SIZE; idx++) {
390 auto clone_hdl = m_clone_arr[idx];
391
392 if (clone_hdl != nullptr && clone_hdl->is_copy_clone()) {
393 active_clone = true;
394 break;
395 }
396 }
397
398 if (active_clone && print_alert) {
399 ib::info(ER_IB_CLONE_TIMEOUT) << "DDL waiting for CLONE to abort";
400 }
401 return (active_clone);
402 }
403
/** Put the clone system into ABORT state on behalf of a DDL.
@param[in]  force  when true, abort even if a clone is active and wait
                   for it to exit; when false, back off instead
@return true if the abort state was set; false only when an active clone
blocked a non-forced request. */
bool Clone_Sys::mark_abort(bool force) {
  ut_ad(mutex_own(&m_clone_sys_mutex));

  /* Check for active clone operations. */
  auto active_clone = check_active_clone(false);

  /* If active clone is running and force is not set then
  return without setting abort state. */
  if (active_clone && !force) {
    return (false);
  }

  /* Several DDLs may request abort concurrently; mark_active() restores
  the ACTIVE state only when this count drops back to zero. */
  ++s_clone_abort_count;

  if (s_clone_sys_state != CLONE_SYS_ABORT) {
    ut_ad(s_clone_abort_count == 1);
    s_clone_sys_state = CLONE_SYS_ABORT;

    DEBUG_SYNC_C("clone_marked_abort");
  }

  if (active_clone) {
    ut_ad(force);

    /* Sleep for 1 second */
    Clone_Msec sleep_time(Clone_Sec(1));
    /* Generate alert message every minute. */
    Clone_Sec alert_time(Clone_Min(1));
    /* Timeout in 15 minutes - safeguard against hang, should not happen */
    Clone_Sec time_out(Clone_Min(15));

    bool is_timeout = false;

    /* Wait (the mutex is released while sleeping) until all active
    copy clones have observed the ABORT state and exited. */
    wait(
        sleep_time, time_out, alert_time,
        [&](bool alert, bool &result) {
          ut_ad(mutex_own(&m_clone_sys_mutex));
          result = check_active_clone(alert);

          return (0);
        },
        &m_clone_sys_mutex, is_timeout);

    if (is_timeout) {
      /* Should never happen; proceed with DDL anyway after warning. */
      ut_ad(false);
      ib::warn(ER_IB_CLONE_TIMEOUT) << "DDL wait for CLONE abort timed out"
                                       ", Continuing DDL.";
    }
  }
  return (true);
}
455
mark_active()456 void Clone_Sys::mark_active() {
457 ut_ad(mutex_own(&m_clone_sys_mutex));
458
459 ut_ad(s_clone_abort_count > 0);
460 --s_clone_abort_count;
461
462 if (s_clone_abort_count == 0) {
463 s_clone_sys_state = CLONE_SYS_ACTIVE;
464 }
465 }
466
mark_wait()467 bool Clone_Sys::mark_wait() {
468 ut_ad(mutex_own(&m_clone_sys_mutex));
469
470 /* Check for active clone operations. */
471 auto active_clone = check_active_clone(false);
472
473 /* If active clone is running return. */
474 if (active_clone) {
475 return (false);
476 }
477
478 /* Let any new clone operation wait till mark_free is called. */
479 ++s_clone_wait_count;
480 return (true);
481 }
482
/** Undo one mark_wait(); new clone operations may proceed once the wait
count drops back to zero (see wait_for_free()). */
void Clone_Sys::mark_free() {
  ut_ad(mutex_own(&m_clone_sys_mutex));
  ut_ad(s_clone_wait_count > 0);
  --s_clone_wait_count;
}
488
/** Block a new clone operation while mark_wait() holders are active
(e.g. redo/undo encryption setup), up to a 5 minute safeguard timeout.
@param[in]  thd  session thread, checked for KILL while waiting
@return error code; 0 when the system is free. */
int Clone_Sys::wait_for_free(THD *thd) {
  ut_ad(mutex_own(&m_clone_sys_mutex));

  /* Fast path: nobody asked clones to wait. */
  if (s_clone_wait_count == 0) {
    return (0);
  }

  /* Runs with the mutex held; result = false (stop waiting) once the
  wait count reaches zero. KILL aborts the wait with an error. */
  auto wait_condition = [&](bool alert, bool &result) {
    ut_ad(mutex_own(&m_clone_sys_mutex));
    result = (s_clone_wait_count == 0);
    if (alert) {
      ib::info(ER_IB_CLONE_OPERATION)
          << "CLONE waiting for redo/undo encryption";
    }
    if (thd_killed(thd)) {
      my_error(ER_QUERY_INTERRUPTED, MYF(0));
      return (ER_QUERY_INTERRUPTED);
    }
    return (0);
  };

  /* Sleep for 100 milliseconds */
  Clone_Msec sleep_time(100);
  /* Generate alert message 5 second. */
  Clone_Sec alert_time(5);
  /* Timeout in 5 minutes - safeguard against hang, should not happen */
  Clone_Sec time_out(Clone_Min(5));

  bool is_timeout = false;
  auto err = wait(sleep_time, time_out, alert_time, wait_condition,
                  &m_clone_sys_mutex, is_timeout);

  if (err != 0) {
    return (err);
  }

  if (is_timeout) {
    /* Should never happen in practice. */
    ut_ad(false);
    my_error(ER_INTERNAL_ERROR, MYF(0),
             "Innodb Clone timeout waiting for background");
    return (ER_INTERNAL_ERROR);
  }

  return (0);
}
534
get_next_id()535 ib_uint64_t Clone_Sys::get_next_id() {
536 ut_ad(mutex_own(&m_clone_sys_mutex));
537
538 return (++m_clone_id_generator);
539 }
540
541 #ifdef UNIV_DEBUG
/** Debug-only: pause the master task at test sync points, roughly in
the middle of the current snapshot stage.
@param[in]  chunk_num  chunk number being processed (0 = none)
@param[in]  task       clone task; only the master task syncs, and only
                       once per stage (m_ignore_sync latches it off) */
void Clone_Task_Manager::debug_wait(uint chunk_num, Clone_Task *task) {
  auto state = m_clone_snapshot->get_state();
  auto nchunks = m_clone_snapshot->get_num_chunks();

  /* Stop somewhere in the middle of current stage */
  if (!task->m_is_master || task->m_ignore_sync ||
      (chunk_num != 0 && chunk_num < (nchunks / 2 + 1))) {
    return;
  }

  /* Per-stage sync points used by clone/group-replication tests. */
  if (state == CLONE_SNAPSHOT_FILE_COPY) {
    DBUG_SIGNAL_WAIT_FOR(current_thd, "gr_clone_wait", "gr_clone_paused",
                         "gr_clone_continue");

    DEBUG_SYNC_C("clone_file_copy");

  } else if (state == CLONE_SNAPSHOT_PAGE_COPY) {
    DEBUG_SYNC_C("clone_page_copy");

  } else if (state == CLONE_SNAPSHOT_REDO_COPY) {
    DEBUG_SYNC_C("clone_redo_copy");
  }

  /* Do not sync again within this stage. */
  task->m_ignore_sync = true;
}
567
/** Debug-only: inject a network error into the master task to force a
clone restart, at most once per restart point.
@param[in]  in_err         current error from the caller
@param[in]  task           clone task; only the master injects
@param[in]  restart_count  which restart point the caller is at
@return in_err, or the injected ER_NET_READ_ERROR when the
"clone_restart_apply" debug keyword is set. */
int Clone_Task_Manager::debug_restart(Clone_Task *task, int in_err,
                                      int restart_count) {
  auto err = in_err;

  /* Skip when already failing, when this restart point was already
  used (m_debug_counter), or for non-master tasks. */
  if (err != 0 || restart_count < task->m_debug_counter || !task->m_is_master) {
    return (err);
  }

  /* Restart somewhere in the middle of all chunks */
  if (restart_count == 1) {
    auto nchunks = m_clone_snapshot->get_num_chunks();
    auto cur_chunk = task->m_task_meta.m_chunk_num;

    if (cur_chunk != 0 && cur_chunk < (nchunks / 2 + 1)) {
      return (err);
    }
  }

  /* Inject the error only when the debug keyword is active. */
  DBUG_EXECUTE_IF("clone_restart_apply", err = ER_NET_READ_ERROR;);

  if (err != 0) {
    my_error(err, MYF(0));
  }

  /* Allow restart from next point */
  task->m_debug_counter = restart_count + 1;

  return (err);
}
597 #endif /* UNIV_DEBUG */
598
init(Clone_Snapshot * snapshot)599 void Clone_Task_Manager::init(Clone_Snapshot *snapshot) {
600 uint idx;
601
602 m_clone_snapshot = snapshot;
603
604 m_current_state = snapshot->get_state();
605
606 /* ACK state is the previous state of current state */
607 if (m_current_state == CLONE_SNAPSHOT_INIT) {
608 m_ack_state = CLONE_SNAPSHOT_NONE;
609 } else {
610 /* If clone is attaching to active snapshot with
611 other concurrent clone */
612 ut_ad(m_current_state == CLONE_SNAPSHOT_FILE_COPY);
613 m_ack_state = CLONE_SNAPSHOT_INIT;
614 }
615
616 m_chunk_info.m_total_chunks = 0;
617
618 m_chunk_info.m_min_unres_chunk = 1;
619 m_chunk_info.m_max_res_chunk = 0;
620
621 /* Initialize all tasks in inactive state. */
622 for (idx = 0; idx < CLONE_MAX_TASKS; idx++) {
623 Clone_Task *task;
624
625 task = m_clone_tasks + idx;
626 task->m_task_state = CLONE_TASK_INACTIVE;
627
628 task->m_serial_desc = nullptr;
629 task->m_alloc_len = 0;
630
631 task->m_current_file_des.m_file = OS_FILE_CLOSED;
632 task->m_current_file_index = 0;
633 task->m_file_cache = true;
634
635 task->m_current_buffer = nullptr;
636 task->m_buffer_alloc_len = 0;
637 task->m_is_master = false;
638 task->m_has_thd = false;
639 task->m_data_size = 0;
640 ut_d(task->m_ignore_sync = false);
641 ut_d(task->m_debug_counter = 2);
642 }
643
644 m_num_tasks = 0;
645 m_num_tasks_finished = 0;
646 m_num_tasks_transit = 0;
647 m_restart_count = 0;
648
649 m_next_state = CLONE_SNAPSHOT_NONE;
650 m_send_state_meta = false;
651 m_transferred_file_meta = false;
652 m_saved_error = 0;
653
654 /* Allocate error file name */
655 auto heap = m_clone_snapshot->lock_heap();
656
657 m_err_file_name = static_cast<char *>(mem_heap_alloc(heap, FN_REFLEN_SE));
658
659 m_err_file_len = FN_REFLEN_SE;
660
661 m_clone_snapshot->release_heap(heap);
662
663 /* Initialize error file name */
664 memset(m_err_file_name, 0, m_err_file_len);
665
666 strncpy(m_err_file_name, "Clone File", m_err_file_len);
667 }
668
reserve_task(THD * thd,uint & task_id)669 void Clone_Task_Manager::reserve_task(THD *thd, uint &task_id) {
670 ut_ad(mutex_own(&m_state_mutex));
671
672 Clone_Task *task = nullptr;
673
674 task_id = 0;
675
676 /* Find inactive task in the array. */
677 for (; task_id < CLONE_MAX_TASKS; task_id++) {
678 task = m_clone_tasks + task_id;
679 auto task_meta = &task->m_task_meta;
680
681 if (task->m_task_state == CLONE_TASK_INACTIVE) {
682 task->m_task_state = CLONE_TASK_ACTIVE;
683
684 task_meta->m_task_index = task_id;
685 task_meta->m_chunk_num = 0;
686 task_meta->m_block_num = 0;
687
688 /* Set first task as master task */
689 if (task_id == 0) {
690 ut_ad(thd != nullptr);
691 task->m_is_master = true;
692 }
693
694 /* Whether the task has an associated user session */
695 task->m_has_thd = (thd != nullptr);
696
697 break;
698 }
699
700 task = nullptr;
701 }
702
703 ut_ad(task != nullptr);
704 }
705
alloc_buffer(Clone_Task * task)706 int Clone_Task_Manager::alloc_buffer(Clone_Task *task) {
707 if (task->m_alloc_len != 0) {
708 /* Task buffers are already allocated in case
709 clone operation is restarted. */
710
711 ut_ad(task->m_buffer_alloc_len != 0);
712 ut_ad(task->m_serial_desc != nullptr);
713 ut_ad(task->m_current_buffer != nullptr);
714
715 return (0);
716 }
717
718 /* Allocate task descriptor. */
719 auto heap = m_clone_snapshot->lock_heap();
720
721 /* Maximum variable length of descriptor. */
722 auto alloc_len =
723 static_cast<uint>(m_clone_snapshot->get_max_file_name_length());
724
725 /* Check with maximum path name length. */
726 if (alloc_len < FN_REFLEN_SE) {
727 alloc_len = FN_REFLEN_SE;
728 }
729
730 /* Maximum fixed length of descriptor */
731 alloc_len += CLONE_DESC_MAX_BASE_LEN;
732
733 /* Add some buffer. */
734 alloc_len += CLONE_DESC_MAX_BASE_LEN;
735
736 ut_ad(task->m_alloc_len == 0);
737 ut_ad(task->m_buffer_alloc_len == 0);
738
739 task->m_alloc_len = alloc_len;
740 task->m_buffer_alloc_len = m_clone_snapshot->get_dyn_buffer_length();
741
742 alloc_len += task->m_buffer_alloc_len;
743
744 alloc_len += CLONE_ALIGN_DIRECT_IO;
745
746 ut_ad(task->m_serial_desc == nullptr);
747
748 task->m_serial_desc = static_cast<byte *>(mem_heap_alloc(heap, alloc_len));
749
750 m_clone_snapshot->release_heap(heap);
751
752 if (task->m_serial_desc == nullptr) {
753 my_error(ER_OUTOFMEMORY, MYF(0), alloc_len);
754 return (ER_OUTOFMEMORY);
755 }
756
757 if (task->m_buffer_alloc_len > 0) {
758 task->m_current_buffer = static_cast<byte *>(ut_align(
759 task->m_serial_desc + task->m_alloc_len, CLONE_ALIGN_DIRECT_IO));
760 }
761
762 return (0);
763 }
764
/** Check (and optionally raise to the client) an error saved by some
other task of this clone, preferring KILL / DDL-abort conditions.
@param[in]  set_error  when true, also report the error via my_error
@return the saved error code (0 when none). */
int Clone_Task_Manager::handle_error_other_task(bool set_error) {
  char errbuf[MYSYS_STRERROR_SIZE];

  if (set_error && m_saved_error != 0) {
    ib::info(ER_IB_CLONE_OPERATION)
        << "Clone error from other task code: " << m_saved_error;
  }

  /* Caller only wants the code, not a raised error. */
  if (!set_error) {
    return (m_saved_error);
  }

  /* Handle shutdown and KILL */
  if (thd_killed(nullptr)) {
    my_error(ER_QUERY_INTERRUPTED, MYF(0));
    return (ER_QUERY_INTERRUPTED);
  }

  /* Check if DDL has marked for abort. Ignore for client apply. */
  if ((m_clone_snapshot == nullptr || m_clone_snapshot->is_copy()) &&
      Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
    my_error(ER_CLONE_DDL_IN_PROGRESS, MYF(0));
    return (ER_CLONE_DDL_IN_PROGRESS);
  }

  /* Re-raise the saved error with the argument shape each code needs. */
  switch (m_saved_error) {
    case ER_CLONE_DDL_IN_PROGRESS:
    case ER_QUERY_INTERRUPTED:
      my_error(m_saved_error, MYF(0));
      break;

    /* Network errors */
    case ER_NET_PACKET_TOO_LARGE:
    case ER_NET_PACKETS_OUT_OF_ORDER:
    case ER_NET_UNCOMPRESS_ERROR:
    case ER_NET_READ_ERROR:
    case ER_NET_READ_INTERRUPTED:
    case ER_NET_ERROR_ON_WRITE:
    case ER_NET_WRITE_INTERRUPTED:
    case ER_NET_WAIT_ERROR:
      my_error(m_saved_error, MYF(0));
      break;

    /* IO Errors - these need the failing file name and OS errno. */
    case ER_CANT_OPEN_FILE:
    case ER_CANT_CREATE_FILE:
    case ER_ERROR_ON_READ:
    case ER_ERROR_ON_WRITE:
      ut_ad(m_err_file_name != nullptr);
      ut_ad(m_err_file_len != 0);
      my_error(m_saved_error, MYF(0), m_err_file_name, errno,
               my_strerror(errbuf, sizeof(errbuf), errno));
      break;

    case ER_FILE_EXISTS_ERROR:
      ut_ad(m_err_file_name != nullptr);
      ut_ad(m_err_file_len != 0);
      my_error(m_saved_error, MYF(0), m_err_file_name);
      break;

    case ER_WRONG_VALUE:
      ut_ad(m_err_file_name != nullptr);
      ut_ad(m_err_file_len != 0);
      my_error(m_saved_error, MYF(0), "file path", m_err_file_name);
      break;

    case ER_CLONE_DONOR:
      /* Will get the error message from remote */
      break;

    case 0:
      break;

    default:
      /* Unknown codes are reported as a generic internal error. */
      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Innodb Clone error in concurrent task");
  }

  return (m_saved_error);
}
845
wait_before_add(const byte * ref_loc,uint loc_len)846 bool Clone_Task_Manager::wait_before_add(const byte *ref_loc, uint loc_len) {
847 ut_ad(mutex_own(&m_state_mutex));
848
849 /* 1. Don't wait if master task. */
850 if (m_num_tasks == 0) {
851 return (false);
852 }
853
854 /* 2. Wait for state transition to get over */
855 if (in_transit_state()) {
856 return (true);
857 }
858
859 /* 3. For copy state(donor), wait for the state to reach file copy. */
860 ut_ad(m_current_state != CLONE_SNAPSHOT_NONE);
861 if (ref_loc == nullptr) {
862 return (m_current_state == CLONE_SNAPSHOT_INIT);
863 }
864
865 Clone_Desc_Locator ref_desc;
866 ref_desc.deserialize(ref_loc, loc_len, nullptr);
867
868 ut_ad(m_current_state <= ref_desc.m_state);
869
870 /* 4. For apply state (recipient), wait for apply state to reach
871 the copy state in reference locator. */
872 if (m_current_state != ref_desc.m_state) {
873 return (true);
874 }
875
876 /* 4A. For file copy state, wait for all metadata to be transferred. */
877 if (m_current_state == CLONE_SNAPSHOT_FILE_COPY &&
878 !is_file_metadata_transferred()) {
879 return (true);
880 }
881 return (false);
882 }
883
/** Add a new task to the clone, waiting first if a state transition or
metadata transfer is in progress.
@param[in]   thd      session thread (nullptr for background tasks)
@param[in]   ref_loc  reference locator (nullptr on the donor side)
@param[in]   loc_len  locator length
@param[out]  task_id  reserved task index
@return error code; 0 on success. */
int Clone_Task_Manager::add_task(THD *thd, const byte *ref_loc, uint loc_len,
                                 uint &task_id) {
  mutex_enter(&m_state_mutex);

  /* Check for error from other tasks */
  bool raise_error = (thd != nullptr);

  auto err = handle_error_other_task(raise_error);

  if (err != 0) {
    mutex_exit(&m_state_mutex);
    return (err);
  }

  if (wait_before_add(ref_loc, loc_len)) {
    bool is_timeout = false;
    int alert_count = 0;
    /* Wait (mutex released while sleeping) until the blocking condition
    clears; re-check for errors from other tasks each round. */
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          ut_ad(mutex_own(&m_state_mutex));
          result = wait_before_add(ref_loc, loc_len);

          /* Check for error from other tasks */
          err = handle_error_other_task(raise_error);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Add task waiting "
                                               "for state change";
            }
          }
          return (err);
        },
        &m_state_mutex, is_timeout);

    if (err != 0) {
      mutex_exit(&m_state_mutex);
      return (err);

    } else if (is_timeout) {
      /* Should not happen; report an internal error. */
      ut_ad(false);
      mutex_exit(&m_state_mutex);

      ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Add task timed out";

      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Clone Add task failed: "
               "Wait too long for state transition");
      return (ER_INTERNAL_ERROR);
    }
  }

  /* We wait for state transition before adding new task. */
  ut_ad(!in_transit_state());

  if (m_num_tasks == CLONE_MAX_TASKS) {
    err = ER_CLONE_TOO_MANY_CONCURRENT_CLONES;
    my_error(err, MYF(0), CLONE_MAX_TASKS);

    mutex_exit(&m_state_mutex);
    return (err);
  }

  reserve_task(thd, task_id);
  ut_ad(task_id <= m_num_tasks);

  ++m_num_tasks;

  mutex_exit(&m_state_mutex);
  return (0);
}
957
/** Drop a task; the master task additionally waits for all other tasks
to drop first.
@param[in]   thd        session thread, checked for KILL while waiting
@param[in]   task_id    index of the task to drop
@param[out]  is_master  true when the dropped task was the master
@return true when the clone should restart (saved network error). */
bool Clone_Task_Manager::drop_task(THD *thd, uint task_id, bool &is_master) {
  mutex_enter(&m_state_mutex);

  /* A dropping task no longer participates in any state transition. */
  if (in_transit_state()) {
    ut_ad(m_num_tasks_transit > 0);
    --m_num_tasks_transit;
  }

  ut_ad(m_num_tasks > 0);
  --m_num_tasks;

  auto task = get_task_by_index(task_id);

  /* Remember where this task stopped so apply can resume the chunk. */
  add_incomplete_chunk(task);

  reset_chunk(task);

  ut_ad(task->m_task_state == CLONE_TASK_ACTIVE);
  task->m_task_state = CLONE_TASK_INACTIVE;

  is_master = task->m_is_master;

  if (!is_master) {
    mutex_exit(&m_state_mutex);
    return (false);
  }

  /* Master needs to wait for other tasks to get dropped */
  if (m_num_tasks > 0) {
    bool is_timeout = false;
    int alert_count = 0;
    auto err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          ut_ad(mutex_own(&m_state_mutex));
          result = (m_num_tasks > 0);

          if (thd_killed(thd)) {
            return (ER_QUERY_INTERRUPTED);

          } else if (Clone_Sys::s_clone_sys_state == CLONE_SYS_ABORT) {
            return (ER_CLONE_DDL_IN_PROGRESS);
          }
          if (alert && result) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master drop task waiting "
                                               "for other tasks";
            }
          }
          return (0);
        },
        &m_state_mutex, is_timeout);

    if (err != 0) {
      mutex_exit(&m_state_mutex);
      return (false);

    } else if (is_timeout) {
      /* Should not happen; give up on restart. */
      ut_ad(false);
      ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master drop task timed out";

      mutex_exit(&m_state_mutex);
      return (false);
    }
  }

  mutex_exit(&m_state_mutex);

  /* Restart after network error */
  auto current_err = handle_error_other_task(false);
  if (is_network_error(current_err)) {
    return (true);
  }
  return (false);
}
1034
/** Reserve and return the minimum unreserved chunk for the current
state, maintaining the min-unreserved / max-reserved cursors over the
reservation bitmap.
@return reserved chunk number, or 0 when all chunks are handed out. */
uint32_t Clone_Task_Manager::get_next_chunk() {
  auto &max_chunk = m_chunk_info.m_max_res_chunk;
  auto &min_chunk = m_chunk_info.m_min_unres_chunk;

  ut_ad(max_chunk <= m_chunk_info.m_total_chunks);

  if (min_chunk > m_chunk_info.m_total_chunks) {
    /* No more chunks left for current state. */
    return (0);
  }

  /* Return the minimum unreserved chunk */
  auto ret_chunk = min_chunk;

  /* Mark the chunk reserved. The chunk must be unreserved. */
  ut_ad(!m_chunk_info.m_reserved_chunks[min_chunk]);
  m_chunk_info.m_reserved_chunks[min_chunk] = true;

  /* Increase max reserved chunk if needed */
  if (max_chunk < min_chunk) {
    max_chunk = min_chunk;
  }

  ut_ad(max_chunk == m_chunk_info.m_reserved_chunks.get_max_set_bit());

  /* Advance min_chunk past the run of reserved chunks so it points at
  the next unreserved chunk (or past the end). */
  while (m_chunk_info.m_reserved_chunks[min_chunk]) {
    ++min_chunk;

    /* Exit if all chunks are over */
    if (min_chunk > max_chunk || min_chunk > m_chunk_info.m_total_chunks) {
      ut_ad(min_chunk > m_chunk_info.m_total_chunks ||
            !m_chunk_info.m_reserved_chunks[min_chunk]);

      break;
    }
  }

  return (ret_chunk);
}
1075
get_next_incomplete_chunk(uint32 & block_num)1076 uint32_t Clone_Task_Manager::get_next_incomplete_chunk(uint32 &block_num) {
1077 block_num = 0;
1078
1079 auto &chunks = m_chunk_info.m_incomplete_chunks;
1080
1081 if (chunks.empty()) {
1082 return (0);
1083 }
1084
1085 auto it = chunks.begin();
1086
1087 auto chunk_num = it->first;
1088
1089 block_num = it->second;
1090
1091 chunks.erase(it);
1092
1093 return (chunk_num);
1094 }
1095
reserve_next_chunk(Clone_Task * task,uint32_t & ret_chunk,uint32_t & ret_block)1096 int Clone_Task_Manager::reserve_next_chunk(Clone_Task *task,
1097 uint32_t &ret_chunk,
1098 uint32_t &ret_block) {
1099 mutex_enter(&m_state_mutex);
1100 ret_chunk = 0;
1101
1102 /* Check for error from other tasks */
1103 auto err = handle_error_other_task(task->m_has_thd);
1104 if (err != 0) {
1105 mutex_exit(&m_state_mutex);
1106 return (err);
1107 }
1108
1109 if (process_inclomplete_chunk()) {
1110 /* Get next incomplete chunk. */
1111 ret_chunk = get_next_incomplete_chunk(ret_block);
1112 ut_ad(ret_chunk != 0);
1113
1114 } else {
1115 /* Get next unreserved chunk. */
1116 ret_block = 0;
1117 ret_chunk = get_next_chunk();
1118 }
1119
1120 reset_chunk(task);
1121 mutex_exit(&m_state_mutex);
1122 return (0);
1123 }
1124
/** Record the chunk/block a task is working on (apply side). When the
chunk changes, mark it reserved, drop it from the incomplete list and
re-check errors from other tasks.
@param[in,out]  task      clone task
@param[in]      new_meta  task metadata received from the donor
@return error code; 0 on success. */
int Clone_Task_Manager::set_chunk(Clone_Task *task, Clone_Task_Meta *new_meta) {
  auto cur_meta = &task->m_task_meta;
  int err = 0;

  ut_ad(cur_meta->m_task_index == new_meta->m_task_index);
  cur_meta->m_task_index = new_meta->m_task_index;

  /* Check if this is a new chunk */
  if (cur_meta->m_chunk_num != new_meta->m_chunk_num) {
    mutex_enter(&m_state_mutex);

    /* Mark the current chunk reserved */
    m_chunk_info.m_reserved_chunks[new_meta->m_chunk_num] = true;

    /* Check and remove the chunk from incomplete chunk list. */
    auto &chunks = m_chunk_info.m_incomplete_chunks;

    auto key_value = chunks.find(new_meta->m_chunk_num);

    if (key_value != chunks.end()) {
      /* Resumed chunk must progress past where it previously stopped. */
      ut_ad(key_value->second < new_meta->m_block_num);
      chunks.erase(key_value);
    }

    reset_chunk(task);

    /* Check for error from other tasks */
    err = handle_error_other_task(task->m_has_thd);

    mutex_exit(&m_state_mutex);

    cur_meta->m_chunk_num = new_meta->m_chunk_num;

#ifdef UNIV_DEBUG
    /* Network failure in the middle of a state */
    err = debug_restart(task, err, 1);

    /* Wait in the middle of state */
    debug_wait(cur_meta->m_chunk_num, task);
#endif /* UNIV_DEBUG */
  }

  cur_meta->m_block_num = new_meta->m_block_num;

  return (err);
}
1171
add_incomplete_chunk(Clone_Task * task)1172 void Clone_Task_Manager::add_incomplete_chunk(Clone_Task *task) {
1173 /* Track incomplete chunks during apply */
1174 if (m_clone_snapshot->is_copy()) {
1175 return;
1176 }
1177
1178 auto &task_meta = task->m_task_meta;
1179
1180 /* The task doesn't have any incomplete chunks */
1181 if (task_meta.m_chunk_num == 0) {
1182 return;
1183 }
1184
1185 auto &chunks = m_chunk_info.m_incomplete_chunks;
1186
1187 chunks[task_meta.m_chunk_num] = task_meta.m_block_num;
1188
1189 ib::info(ER_IB_CLONE_RESTART)
1190 << "Clone Apply add incomplete Chunk = " << task_meta.m_chunk_num
1191 << " Block = " << task_meta.m_block_num
1192 << " Task = " << task_meta.m_task_index;
1193 }
1194
1195 /** Print completed chunk information
1196 @param[in] chunk_info chunk information */
print_chunk_info(Chunk_Info * chunk_info)1197 static void print_chunk_info(Chunk_Info *chunk_info) {
1198 for (auto &chunk : chunk_info->m_incomplete_chunks) {
1199 ib::info(ER_IB_CLONE_RESTART)
1200 << "Incomplete: Chunk = " << chunk.first << " Block = " << chunk.second;
1201 }
1202
1203 auto min = chunk_info->m_reserved_chunks.get_min_unset_bit();
1204 auto max = chunk_info->m_reserved_chunks.get_max_set_bit();
1205
1206 auto size = chunk_info->m_reserved_chunks.size_bits();
1207
1208 ib::info(ER_IB_CLONE_RESTART)
1209 << "Number of Chunks: " << size << " Min = " << min << " Max = " << max;
1210
1211 ut_ad(min != max);
1212
1213 if (max > min) {
1214 ib::info(ER_IB_CLONE_RESTART)
1215 << "Reserved Chunk Information : " << min << " - " << max
1216 << " Chunks: " << max - min + 1;
1217
1218 for (uint32_t index = min; index <= max;) {
1219 uint32_t ind = 0;
1220
1221 const int STR_SIZE = 64;
1222 char str[STR_SIZE + 1];
1223
1224 while (index <= max && ind < STR_SIZE) {
1225 str[ind] = chunk_info->m_reserved_chunks[index] ? '1' : '0';
1226 ++index;
1227 ++ind;
1228 }
1229
1230 ut_ad(ind <= STR_SIZE);
1231 str[ind] = '\0';
1232
1233 ib::info(ER_IB_CLONE_RESTART) << str;
1234 }
1235 }
1236 }
1237
/** Reinitialize apply-side state after a restart and build a locator that
carries the current progress back to the donor.
@param[in]	ref_loc		reference locator received from donor
@param[in]	ref_len		reference locator length
@param[out]	new_loc		locator with current state and chunk progress;
                                nullptr for states with no progress to send
@param[out]	new_len		length of the new locator
@param[in,out]	alloc_len	allocated length; grown if too small */
void Clone_Task_Manager::reinit_apply_state(const byte *ref_loc, uint ref_len,
                                            byte *&new_loc, uint &new_len,
                                            uint &alloc_len) {
  ut_ad(m_current_state != CLONE_SNAPSHOT_NONE);
  ut_ad(!m_clone_snapshot->is_copy());

  /* Only master task should be present */
  ut_ad(m_num_tasks == 1);

  /* Reset State transition information */
  reset_transition();

  /* Reset Error information */
  reset_error();

  /* Check if current state is finished and acknowledged */
  ut_ad(m_ack_state <= m_current_state);

  if (m_ack_state == m_current_state) {
    ++m_num_tasks_finished;
  }

  ++m_restart_count;

  /* Log which state the apply is restarting in. NOTE(review): the INIT
  case logs under ER_IB_CLONE_RESTART while the others use
  ER_IB_CLONE_OPERATION — looks inconsistent, confirm intent upstream. */
  switch (m_current_state) {
    case CLONE_SNAPSHOT_INIT:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Apply Restarting State: INIT";
      break;

    case CLONE_SNAPSHOT_FILE_COPY:
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Restarting State: FILE COPY";
      break;

    case CLONE_SNAPSHOT_PAGE_COPY:
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Restarting State: PAGE COPY";
      break;

    case CLONE_SNAPSHOT_REDO_COPY:
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Restarting State: REDO COPY";
      break;

    case CLONE_SNAPSHOT_DONE:
      ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply Restarting State: DONE";
      break;

    case CLONE_SNAPSHOT_NONE:
      /* fall through */

    default:
      ut_ad(false);
  }

  /* No chunk progress exists in these states; nothing to send back. */
  if (m_current_state == CLONE_SNAPSHOT_INIT ||
      m_current_state == CLONE_SNAPSHOT_DONE ||
      m_current_state == CLONE_SNAPSHOT_NONE) {
    new_loc = nullptr;
    new_len = 0;
    return;
  }

  /* Add incomplete chunks from master task */
  auto task = get_task_by_index(0);

  add_incomplete_chunk(task);

  /* Reset task information */
  mutex_enter(&m_state_mutex);
  reset_chunk(task);
  mutex_exit(&m_state_mutex);

  /* Allocate for locator if required */
  Clone_Desc_Locator temp_locator;

  temp_locator.deserialize(ref_loc, ref_len, nullptr);

  /* Update current state information */
  temp_locator.m_state = m_current_state;

  /* Update sub-state information */
  temp_locator.m_metadata_transferred = m_transferred_file_meta;

  auto len = temp_locator.m_header.m_length;
  len += static_cast<uint>(m_chunk_info.get_serialized_length(0));

  if (len > alloc_len) {
    /* Allocate for more for possible reuse */
    len = CLONE_DESC_MAX_BASE_LEN;
    ut_ad(len >= temp_locator.m_header.m_length);

    len += static_cast<uint>(m_chunk_info.get_serialized_length(
        static_cast<uint32_t>(CLONE_MAX_TASKS)));

    /* The locator buffer lives on the snapshot heap, which must be
    accessed under its lock. */
    auto heap = m_clone_snapshot->lock_heap();

    new_loc = static_cast<byte *>(mem_heap_zalloc(heap, len));
    alloc_len = len;

    m_clone_snapshot->release_heap(heap);
  }

  new_len = alloc_len;

  /* Serialize the locator together with the chunk progress. */
  temp_locator.serialize(new_loc, new_len, &m_chunk_info, nullptr);

  print_chunk_info(&m_chunk_info);
}
1347
/** Reinitialize copy-side state after a restart, synchronizing with the
progress reported by the recipient through its locator.
@param[in]	loc	locator received from the recipient
@param[in]	loc_len	locator length */
void Clone_Task_Manager::reinit_copy_state(const byte *loc, uint loc_len) {
  ut_ad(m_clone_snapshot->is_copy());
  ut_ad(m_num_tasks == 0);

  mutex_enter(&m_state_mutex);

  /* Reset State transition information */
  reset_transition();

  /* Reset Error information */
  reset_error();

  ++m_restart_count;

  /* Log which state the copy is restarting in. */
  switch (m_current_state) {
    case CLONE_SNAPSHOT_INIT:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: INIT";
      break;

    case CLONE_SNAPSHOT_FILE_COPY:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: FILE COPY";
      break;

    case CLONE_SNAPSHOT_PAGE_COPY:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: PAGE COPY";
      break;

    case CLONE_SNAPSHOT_REDO_COPY:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: REDO COPY";
      break;

    case CLONE_SNAPSHOT_DONE:
      ib::info(ER_IB_CLONE_RESTART) << "Clone Restarting State: DONE";
      break;

    case CLONE_SNAPSHOT_NONE:
      /* fall through */

    default:
      ut_ad(false);
  }

  if (m_current_state == CLONE_SNAPSHOT_NONE) {
    ut_ad(false);
    mutex_exit(&m_state_mutex);
    return;
  }

  /* Reset to beginning of current state */
  init_state();

  /* Compare local and remote state */
  Clone_Desc_Locator temp_locator;

  temp_locator.deserialize(loc, loc_len, nullptr);

  /* If Local state is ahead, we must have finished the
  previous state confirmed by ACK. It is enough to
  start from current state. */
  if (temp_locator.m_state != m_current_state) {
#ifdef UNIV_DEBUG
    /* Current state could be just one state ahead */
    if (temp_locator.m_state == CLONE_SNAPSHOT_INIT) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_FILE_COPY);

    } else if (temp_locator.m_state == CLONE_SNAPSHOT_FILE_COPY) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_PAGE_COPY);

    } else if (temp_locator.m_state == CLONE_SNAPSHOT_PAGE_COPY) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_REDO_COPY);

    } else if (temp_locator.m_state == CLONE_SNAPSHOT_REDO_COPY) {
      ut_ad(m_current_state == CLONE_SNAPSHOT_DONE);

    } else {
      ut_ad(false);
    }
#endif /* UNIV_DEBUG */

    /* Apply state is behind. Need to send state metadata */
    m_send_state_meta = true;

    mutex_exit(&m_state_mutex);
    return;
  }

  /* Both sides are in the same state: resume mid-state using the
  recipient's reported progress. */
  m_send_state_meta = false;
  m_transferred_file_meta = temp_locator.m_metadata_transferred;

  /* Set progress information for current state */
  temp_locator.deserialize(loc, loc_len, &m_chunk_info);

  m_chunk_info.init_chunk_nums();

  mutex_exit(&m_state_mutex);

  print_chunk_info(&m_chunk_info);
}
1446
/** Initialize chunk bookkeeping for the beginning of the current snapshot
state. Caller must hold the state mutex. */
void Clone_Task_Manager::init_state() {
  ut_ad(mutex_own(&m_state_mutex));

  auto num_chunks = m_clone_snapshot->get_num_chunks();

  /* The reserved-chunk bitmap is (re)allocated on the snapshot heap,
  which must be accessed under its lock. */
  auto heap = m_clone_snapshot->lock_heap();

  m_chunk_info.m_reserved_chunks.reset(num_chunks, heap);

  m_clone_snapshot->release_heap(heap);

  m_chunk_info.m_incomplete_chunks.clear();

  /* Chunk numbering starts at 1; nothing reserved yet. */
  m_chunk_info.m_min_unres_chunk = 1;
  ut_ad(m_chunk_info.m_reserved_chunks.get_min_unset_bit() == 1);

  m_chunk_info.m_max_res_chunk = 0;
  ut_ad(m_chunk_info.m_reserved_chunks.get_max_set_bit() == 0);

  m_chunk_info.m_total_chunks = num_chunks;
}
1468
ack_state(const Clone_Desc_State * state_desc)1469 void Clone_Task_Manager::ack_state(const Clone_Desc_State *state_desc) {
1470 mutex_enter(&m_state_mutex);
1471
1472 m_ack_state = state_desc->m_state;
1473 ut_ad(m_current_state == m_ack_state);
1474 ib::info(ER_IB_CLONE_OPERATION)
1475 << "Clone set state change ACK: " << m_ack_state;
1476
1477 mutex_exit(&m_state_mutex);
1478 }
1479
/** Finish the task's chunks and, for the master, wait until the remote
side acknowledges the state change. Non-master tasks return immediately.
@param[in]	clone		clone handle (used to send keep-alive)
@param[in,out]	task		calling task
@param[in]	callback	callback used for keep-alive messages
@return error code; ER_INTERNAL_ERROR on wait timeout */
int Clone_Task_Manager::wait_ack(Clone_Handle *clone, Clone_Task *task,
                                 Ha_clone_cbk *callback) {
  mutex_enter(&m_state_mutex);

  ++m_num_tasks_finished;

  /* All chunks are finished */
  reset_chunk(task);

  if (!task->m_is_master) {
    mutex_exit(&m_state_mutex);
    return (0);
  }

  int err = 0;

  if (m_current_state != m_ack_state) {
    bool is_timeout = false;
    int alert_count = 0;
    /* Wait (releasing the mutex inside) until ack_state() records the
    ACK, an error occurs, or the default wait times out. */
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          ut_ad(mutex_own(&m_state_mutex));
          result = (m_current_state != m_ack_state);

          /* Check for error from other tasks */
          err = handle_error_other_task(task->m_has_thd);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master waiting "
                                               "for state change ACK ";
            }
            /* Keep the network connection alive while waiting. */
            err = clone->send_keep_alive(task, callback);
          }
          return (err);
        },
        &m_state_mutex, is_timeout);

    /* Wait too long */
    if (err == 0 && is_timeout) {
      ut_ad(false);
      ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Master wait for state change ACK"
                                       " timed out";

      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Innodb clone state ack wait too long");

      err = ER_INTERNAL_ERROR;
    }
  }
  mutex_exit(&m_state_mutex);

  if (err == 0) {
    ib::info(ER_IB_CLONE_OPERATION) << "Clone Master received state change ACK";
  }

  return (err);
}
1540
/** Mark the current state finished for a task. The master additionally
waits for all other tasks to finish before the ACK can be sent.
@param[in,out]	task	calling task
@return error code; ER_INTERNAL_ERROR on wait timeout */
int Clone_Task_Manager::finish_state(Clone_Task *task) {
  mutex_enter(&m_state_mutex);

  if (task->m_is_master) {
    /* Check if ACK was sent before restart */
    if (m_ack_state != m_current_state) {
      ut_ad(m_ack_state < m_current_state);
      ++m_num_tasks_finished;
    } else {
      /* Already acked: only possible after a restart. */
      ut_ad(m_restart_count > 0);
    }
    m_ack_state = m_current_state;

  } else {
    ++m_num_tasks_finished;
  }

  /* All chunks are finished */
  reset_chunk(task);

  /* Check for error from other tasks */
  auto err = handle_error_other_task(task->m_has_thd);

  /* Only the master waits for the remaining tasks below. */
  if (!task->m_is_master || err != 0) {
    mutex_exit(&m_state_mutex);
    return (err);
  }

  ut_ad(task->m_is_master);

#ifdef UNIV_DEBUG
  /* Wait before ending state, if needed */
  if (!task->m_ignore_sync) {
    mutex_exit(&m_state_mutex);
    debug_wait(0, task);
    mutex_enter(&m_state_mutex);
  }
#endif /* UNIV_DEBUG */

  if (m_num_tasks_finished < m_num_tasks) {
    bool is_timeout = false;
    int alert_count = 0;
    /* Wait until all worker tasks report finished, an error occurs,
    or the default wait times out. */
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          ut_ad(mutex_own(&m_state_mutex));
          result = (m_num_tasks_finished < m_num_tasks);

          /* Check for error from other tasks */
          err = handle_error_other_task(task->m_has_thd);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT)
                  << "Clone Apply Master waiting for "
                     "workers before sending ACK."
                  << " Total = " << m_num_tasks
                  << " Finished = " << m_num_tasks_finished;
            }
          }
          return (err);
        },
        &m_state_mutex, is_timeout);

    if (err == 0 && is_timeout) {
      ut_ad(false);
      ib::info(ER_IB_CLONE_TIMEOUT) << "Clone Apply Master wait timed out";

      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Clone Apply Master wait timed out before sending ACK");

      err = ER_INTERNAL_ERROR;
    }
  }

  mutex_exit(&m_state_mutex);
  return (err);
}
1620
/** Move the clone over to a new snapshot state. All tasks must request the
change; only the master performs the actual snapshot transition, and only
after every other task has moved over.
@param[in,out]	task		calling task
@param[in]	state_desc	descriptor for the new state (used on apply)
@param[in]	new_state	state to move to
@param[in]	cbk		alert callback invoked during long waits
@param[out]	num_wait	number of tasks the caller must wait for;
                                zero when the transition is complete
@return error code */
int Clone_Task_Manager::change_state(Clone_Task *task,
                                     Clone_Desc_State *state_desc,
                                     Snapshot_State new_state,
                                     Clone_Alert_Func cbk, uint &num_wait) {
  mutex_enter(&m_state_mutex);

  num_wait = 0;

  /* Check for error from other tasks */
  auto err = handle_error_other_task(task->m_has_thd);

  if (err != 0) {
    mutex_exit(&m_state_mutex);
    return (err);
  }

  /* First requesting task needs to initiate the state transition. */
  if (!in_transit_state()) {
    m_num_tasks_transit = m_num_tasks;
    m_next_state = new_state;
  }

  /* Master needs to wait for all other tasks. */
  if (task->m_is_master && m_num_tasks_transit > 1) {
    num_wait = m_num_tasks_transit;

    mutex_exit(&m_state_mutex);
    return (0);
  }

  /* Need to wait for transition to next state */
  if (!task->m_is_master) {
    /* Move the current task over to the next state */
    ut_ad(m_num_tasks_transit > 0);
    --m_num_tasks_transit;

    num_wait = m_num_tasks_transit;
    ut_ad(num_wait > 0);

    mutex_exit(&m_state_mutex);
    return (0);
  }

  /* Last task requesting the state change. All other tasks have
  already moved over to next state and waiting for the transition
  to complete. Now it is safe to do the snapshot state transition. */

  ut_ad(task->m_is_master);
  mutex_exit(&m_state_mutex);

  uint num_pending = 0;

  if (m_clone_snapshot->is_copy()) {
    ib::info(ER_IB_CLONE_OPERATION)
        << "Clone State Change : Number of tasks = " << m_num_tasks;
  } else {
    ib::info(ER_IB_CLONE_OPERATION)
        << "Clone Apply State Change : Number of tasks = " << m_num_tasks;
  }

  /* Perform the snapshot transition itself, outside the state mutex. */
  err = m_clone_snapshot->change_state(
      state_desc, m_next_state, task->m_current_buffer,
      task->m_buffer_alloc_len, cbk, num_pending);
  if (err != 0) {
    return (err);
  }

  /* Need to wait for other concurrent clone attached to current snapshot. */
  if (num_pending > 0) {
    bool is_timeout = false;
    int alert_count = 0;
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          num_pending = m_clone_snapshot->check_state(m_next_state, false);
          result = (num_pending > 0);

          /* Check for possible shutdown/kill */
          mutex_enter(&m_state_mutex);
          err = handle_error_other_task(task->m_has_thd);
          mutex_exit(&m_state_mutex);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT)
                  << "Clone: master state change waiting for other clone";
            }
          }
          return (err);
        },
        nullptr, is_timeout);

    if (err != 0) {
      /* Exit from state transition */
      num_pending = m_clone_snapshot->check_state(m_next_state, true);
      return (err);

    } else if (is_timeout) {
      /* Exit from state transition */
      num_pending = m_clone_snapshot->check_state(m_next_state, true);
      if (num_pending != 0) {
        ut_ad(false);
        ib::info(ER_IB_CLONE_TIMEOUT) << "Clone: master state change timed out";
        my_error(ER_INTERNAL_ERROR, MYF(0),
                 "Clone: master state change wait for other clones timed out: "
                 "Wait too long for state transition");
        return (ER_INTERNAL_ERROR);
      }
    }
  }

  mutex_enter(&m_state_mutex);

  /* Check for error from other tasks. Must finish the state transition
  even in case of an error. */
  err = handle_error_other_task(task->m_has_thd);

  m_current_state = m_next_state;
  m_next_state = CLONE_SNAPSHOT_NONE;

  --m_num_tasks_transit;
  /* In case of error, the other tasks might have exited. */
  ut_ad(m_num_tasks_transit == 0 || err != 0);
  m_num_tasks_transit = 0;

  /* For restart, m_num_tasks_finished may not be up to date */
  ut_ad(m_num_tasks_finished == m_num_tasks || err != 0);
  m_num_tasks_finished = 0;

  ut_d(task->m_ignore_sync = false);
  ut_d(task->m_debug_counter = 0);

  /* Initialize next state after transition. */
  init_state();

  mutex_exit(&m_state_mutex);

  return (err);
}
1761
/** Check whether a state transition to new_state is still in progress and
report how many tasks remain; propagate or record errors.
@param[in,out]	task		calling task
@param[in]	new_state	state being transitioned to
@param[in]	exit_on_wait	true to abandon the wait: marks an error for
                                other tasks and the transit incomplete
@param[in]	in_err		error from the caller to save for other tasks
@param[out]	num_wait	tasks still in transit; zero when complete
@return input error, error from another task, or zero */
int Clone_Task_Manager::check_state(Clone_Task *task, Snapshot_State new_state,
                                    bool exit_on_wait, int in_err,
                                    uint32_t &num_wait) {
  mutex_enter(&m_state_mutex);

  num_wait = 0;

  if (in_err != 0) {
    /* Save error for other tasks */
    if (m_saved_error == 0) {
      m_saved_error = in_err;
    }
    /* Mark transit incomplete */
    if (in_transit_state()) {
      ++m_num_tasks_transit;
    }
    mutex_exit(&m_state_mutex);
    return (in_err);
  }

  /* Check for error from other tasks */
  auto err = handle_error_other_task(task->m_has_thd);

  if (err != 0) {
    mutex_exit(&m_state_mutex);
    return (err);
  }

  /* Check if current transition is still in progress. */
  if (in_transit_state() && new_state == m_next_state) {
    num_wait = m_num_tasks_transit;

    ut_ad(num_wait > 0);

    if (exit_on_wait) {
      /* Mark error for other tasks */
      m_saved_error = ER_INTERNAL_ERROR;
      /* Mark transit incomplete */
      ++m_num_tasks_transit;
    }
  }

  mutex_exit(&m_state_mutex);

  return (0);
}
1808
/** Construct a clone handle in INIT state and pre-serialize the version
locator, which is handed out before a snapshot is attached.
@param[in]	handle_type	copy or apply handle
@param[in]	clone_version	clone descriptor version
@param[in]	clone_index	index of this handle in the clone array */
Clone_Handle::Clone_Handle(Clone_Handle_Type handle_type, uint clone_version,
                           uint clone_index)
    : m_clone_handle_type(handle_type),
      m_clone_handle_state(CLONE_STATE_INIT),
      m_clone_locator(),
      m_locator_length(),
      m_restart_loc(),
      m_restart_loc_len(),
      m_clone_desc_version(clone_version),
      m_clone_arr_index(clone_index),
      m_clone_id(),
      m_ref_count(),
      m_allow_restart(false),
      m_clone_dir(),
      m_clone_task_manager() {
  mutex_create(LATCH_ID_CLONE_TASK, m_clone_task_manager.get_mutex());

  /* Build the version locator: no clone/snapshot identifiers yet, only
  version and array index. */
  Clone_Desc_Locator loc_desc;
  loc_desc.init(0, 0, CLONE_SNAPSHOT_NONE, clone_version, clone_index);

  auto loc = &m_version_locator[0];
  uint len = CLONE_DESC_MAX_BASE_LEN;

  memset(loc, 0, CLONE_DESC_MAX_BASE_LEN);

  loc_desc.serialize(loc, len, nullptr, nullptr);

  ut_ad(len <= CLONE_DESC_MAX_BASE_LEN);
}
1838
~Clone_Handle()1839 Clone_Handle::~Clone_Handle() {
1840 mutex_free(m_clone_task_manager.get_mutex());
1841
1842 if (!is_init()) {
1843 clone_sys->detach_snapshot(m_clone_task_manager.get_snapshot(),
1844 m_clone_handle_type);
1845 }
1846 ut_ad(m_ref_count == 0);
1847 }
1848
/** Create the directory layout for an apply clone: the data directory
(unless replacing the current one), a "mysql" schema directory, and the
clone status directory.
@return 0 on success, ER_CANT_CREATE_DB on failure */
int Clone_Handle::create_clone_directory() {
  ut_ad(!is_copy_clone());
  dberr_t db_err = DB_SUCCESS;
  std::string file_name;

  if (!replace_datadir()) {
    /* Create data directory, if we not replacing the current one. */
    db_err = os_file_create_subdirs_if_needed(m_clone_dir);
    if (db_err == DB_SUCCESS) {
      auto status = os_file_create_directory(m_clone_dir, false);
      /* Create mysql schema directory. */
      file_name.assign(m_clone_dir);
      file_name.append(OS_PATH_SEPARATOR_STR);
      if (status) {
        file_name.append("mysql");
        status = os_file_create_directory(file_name.c_str(), true);
      }
      if (!status) {
        db_err = DB_ERROR;
      }
    }
    /* Rebuild the base path; the status directory goes under the clone
    directory in this case. */
    file_name.assign(m_clone_dir);
    file_name.append(OS_PATH_SEPARATOR_STR);
  }

  /* Create clone status directory. When replacing the data directory,
  file_name is still empty so this is relative to the current directory. */
  if (db_err == DB_SUCCESS) {
    file_name.append(CLONE_FILES_DIR);
    auto status = os_file_create_directory(file_name.c_str(), false);
    if (!status) {
      db_err = DB_ERROR;
    }
  }
  /* Check and report error. */
  if (db_err != DB_SUCCESS) {
    char errbuf[MYSYS_STRERROR_SIZE];
    my_error(ER_CANT_CREATE_DB, MYF(0), m_clone_dir, errno,
             my_strerror(errbuf, sizeof(errbuf), errno));

    return (ER_CANT_CREATE_DB);
  }
  return (0);
}
1892
/** Initialize the clone handle: set up identifiers, directories and the
snapshot, and move the handle to ACTIVE state.
@param[in]	ref_loc		reference locator from the remote side;
                                nullptr for a local/first call
@param[in]	ref_len		reference locator length
@param[in]	type		clone type
@param[in]	data_dir	target data directory for apply
@return error code */
int Clone_Handle::init(const byte *ref_loc, uint ref_len, Ha_clone_type type,
                       const char *data_dir) {
  ib_uint64_t snapshot_id;
  Clone_Snapshot *snapshot;

  m_clone_dir = data_dir;

  bool enable_monitor = true;

  /* Generate unique clone identifiers for copy clone handle. */
  if (is_copy_clone()) {
    m_clone_id = clone_sys->get_next_id();
    snapshot_id = clone_sys->get_next_id();

    /* For local clone, monitor while applying data. */
    if (ref_loc == nullptr) {
      enable_monitor = false;
    }

  } else {
    /* We don't provision instance on which active clone is running. */
    if (replace_datadir() && clone_sys->check_active_clone(false)) {
      my_error(ER_CLONE_TOO_MANY_CONCURRENT_CLONES, MYF(0), MAX_CLONES);
      return (ER_CLONE_TOO_MANY_CONCURRENT_CLONES);
    }
    /* Return keeping the clone in INIT state. The locator
    would only have the version information. */
    if (ref_loc == nullptr) {
      return (0);
    }

    auto err = create_clone_directory();
    if (err != 0) {
      return (err);
    }

    /* Set clone identifiers from reference locator for apply clone
    handle. The reference locator is from copy clone handle. */
    Clone_Desc_Locator loc_desc;

    loc_desc.deserialize(ref_loc, ref_len, nullptr);

    m_clone_id = loc_desc.m_clone_id;
    snapshot_id = loc_desc.m_snapshot_id;

    ut_ad(m_clone_id != CLONE_LOC_INVALID_ID);
    ut_ad(snapshot_id != CLONE_LOC_INVALID_ID);
  }

  /* Create and attach to snapshot. */
  auto err = clone_sys->attach_snapshot(m_clone_handle_type, type, snapshot_id,
                                        enable_monitor, snapshot);

  if (err != 0) {
    return (err);
  }

  /* Initialize clone task manager. */
  m_clone_task_manager.init(snapshot);

  m_clone_handle_state = CLONE_STATE_ACTIVE;

  return (0);
}
1957
get_locator(uint & loc_len)1958 byte *Clone_Handle::get_locator(uint &loc_len) {
1959 Clone_Desc_Locator loc_desc;
1960
1961 /* Return version locator during initialization. */
1962 if (is_init()) {
1963 loc_len = CLONE_DESC_MAX_BASE_LEN;
1964 return (&m_version_locator[0]);
1965 }
1966
1967 auto snapshot = m_clone_task_manager.get_snapshot();
1968
1969 auto heap = snapshot->lock_heap();
1970
1971 build_descriptor(&loc_desc);
1972
1973 loc_desc.serialize(m_clone_locator, m_locator_length, nullptr, heap);
1974
1975 loc_len = m_locator_length;
1976
1977 snapshot->release_heap(heap);
1978
1979 return (m_clone_locator);
1980 }
1981
build_descriptor(Clone_Desc_Locator * loc_desc)1982 void Clone_Handle::build_descriptor(Clone_Desc_Locator *loc_desc) {
1983 Clone_Snapshot *snapshot;
1984 ib_uint64_t snapshot_id = CLONE_LOC_INVALID_ID;
1985 Snapshot_State state = CLONE_SNAPSHOT_NONE;
1986
1987 snapshot = m_clone_task_manager.get_snapshot();
1988
1989 if (snapshot) {
1990 state = snapshot->get_state();
1991 snapshot_id = snapshot->get_id();
1992 }
1993
1994 loc_desc->init(m_clone_id, snapshot_id, state, m_clone_desc_version,
1995 m_clone_arr_index);
1996 }
1997
/** Drop a task from the clone handle.
@param[in]	thd		session THD
@param[in]	task_id		index of the task to drop
@param[in]	in_err		error reported by the caller (unused here)
@param[out]	is_master	true if the dropped task was the master
@return true when the master must wait for a restart (network error on a
restartable copy clone), false otherwise */
bool Clone_Handle::drop_task(THD *thd, uint task_id, int in_err,
                             bool &is_master) {
  /* No task is added in INIT state. The drop task is still called and
  should be ignored. */
  if (is_init()) {
    /* Only relevant for apply clone master */
    ut_ad(!is_copy_clone());
    ut_ad(task_id == 0);
    is_master = true;
    return (false);
  }
  /* Cannot be in IDLE state as master waits for tasks to drop before idling */
  ut_ad(!is_idle());

  /* Close and reset file related information */
  auto task = m_clone_task_manager.get_task_by_index(task_id);

  close_file(task);

  /* Release the global clone mutex across the task-manager drop, which
  may block, and reacquire it afterwards. */
  ut_ad(mutex_own(clone_sys->get_mutex()));
  mutex_exit(clone_sys->get_mutex());

  auto wait_restart = m_clone_task_manager.drop_task(thd, task_id, is_master);
  mutex_enter(clone_sys->get_mutex());

  /* Need to wait for restart, if network error */
  if (is_copy_clone() && m_allow_restart && wait_restart) {
    ut_ad(is_master);
    return (true);
  }

  return (false);
}
2031
/** Move the clone to the next snapshot state, waiting for all tasks to
participate in the transition.
@param[in,out]	task		calling task
@param[in]	callback	callback used to send keep-alive (copy only)
@param[in]	state_desc	descriptor giving the target state on apply
@return error code; ER_INTERNAL_ERROR on wait timeout */
int Clone_Handle::move_to_next_state(Clone_Task *task, Ha_clone_cbk *callback,
                                     Clone_Desc_State *state_desc) {
  auto snapshot = m_clone_task_manager.get_snapshot();
  /* Use input state only for apply. */
  auto next_state =
      is_copy_clone() ? snapshot->get_next_state() : state_desc->m_state;

  Clone_Alert_Func alert_callback;

  if (is_copy_clone()) {
    /* Send Keep alive to recipient during long wait. */
    alert_callback = [&]() {
      auto err = send_keep_alive(task, callback);
      return (err);
    };
  }

  /* Move to new state */
  uint num_wait = 0;
  auto err = m_clone_task_manager.change_state(task, state_desc, next_state,
                                               alert_callback, num_wait);

  /* Need to wait for all other tasks to move over, if any. */
  if (num_wait > 0) {
    bool is_timeout = false;
    int alert_count = 0;
    err = Clone_Sys::wait_default(
        [&](bool alert, bool &result) {
          /* For multi threaded clone, master task does the state change. */
          if (task->m_is_master) {
            /* Master retries the transition until all workers arrive. */
            err = m_clone_task_manager.change_state(
                task, state_desc, next_state, alert_callback, num_wait);
          } else {
            /* Workers only poll whether the transition completed. */
            err = m_clone_task_manager.check_state(task, next_state, false, 0,
                                                   num_wait);
          }
          result = (num_wait > 0);

          if (err == 0 && result && alert) {
            /* Print messages every 1 minute - default is 5 seconds. */
            if (++alert_count == 12) {
              alert_count = 0;
              ib::info(ER_IB_CLONE_TIMEOUT) << "Clone: master state change "
                                               "waiting for workers";
            }
            if (is_copy_clone()) {
              err = send_keep_alive(task, callback);
            }
          }
          return (err);
        },
        nullptr, is_timeout);

    if (err == 0 && !is_timeout) {
      return (0);
    }

    if (!task->m_is_master) {
      /* Exit from state transition */
      err = m_clone_task_manager.check_state(task, next_state, is_timeout, err,
                                             num_wait);
      if (err != 0 || num_wait == 0) {
        return (err);
      }
    }

    if (err == 0 && is_timeout) {
      ut_ad(false);
      ib::info(ER_IB_CLONE_TIMEOUT) << "Clone: state change: "
                                       "wait for other tasks timed out";

      my_error(ER_INTERNAL_ERROR, MYF(0),
               "Clone: state change wait for other tasks timed out: "
               "Wait too long for state transition");
      return (ER_INTERNAL_ERROR);
    }
  }
  return (err);
}
2111
open_file(Clone_Task * task,Clone_File_Meta * file_meta,ulint file_type,bool create_file,bool set_and_close)2112 int Clone_Handle::open_file(Clone_Task *task, Clone_File_Meta *file_meta,
2113 ulint file_type, bool create_file,
2114 bool set_and_close) {
2115 pfs_os_file_t handle;
2116 os_file_type_t type;
2117 ulint option;
2118
2119 bool success;
2120 bool exists;
2121 bool read_only;
2122
2123 /* Check if file exists */
2124 auto status = os_file_status(file_meta->m_file_name, &exists, &type);
2125 if (!status) {
2126 return (0);
2127 }
2128
2129 if (create_file) {
2130 option = exists ? OS_FILE_OPEN : OS_FILE_CREATE_PATH;
2131 read_only = false;
2132 } else {
2133 ut_ad(exists);
2134 option = OS_FILE_OPEN;
2135 read_only = true;
2136 }
2137
2138 option |= OS_FILE_ON_ERROR_NO_EXIT;
2139
2140 handle = os_file_create(innodb_clone_file_key, file_meta->m_file_name, option,
2141 OS_FILE_NORMAL, file_type, read_only, &success);
2142
2143 if (success && set_and_close) {
2144 ut_ad(create_file);
2145
2146 os_file_close(handle);
2147
2148 if (success) {
2149 return (0);
2150 }
2151 }
2152
2153 if (!success) {
2154 char errbuf[MYSYS_STRERROR_SIZE];
2155
2156 int err =
2157 (option == OS_FILE_OPEN) ? ER_CANT_OPEN_FILE : ER_CANT_CREATE_FILE;
2158
2159 my_error(err, MYF(0), file_meta->m_file_name, errno,
2160 my_strerror(errbuf, sizeof(errbuf), errno));
2161
2162 return (err);
2163 }
2164
2165 if (task == nullptr) {
2166 os_file_close(handle);
2167 return (0);
2168 }
2169
2170 /* Set file descriptor in task. */
2171 close_file(task);
2172 task->m_current_file_des = handle;
2173
2174 ut_ad(handle.m_file != OS_FILE_CLOSED);
2175
2176 task->m_file_cache = true;
2177
2178 /* Set cache to false if direct IO(O_DIRECT) is used. */
2179 if (file_type == OS_CLONE_DATA_FILE) {
2180 task->m_file_cache = !srv_is_direct_io();
2181
2182 DBUG_EXECUTE_IF("clone_no_zero_copy", task->m_file_cache = false;);
2183 }
2184
2185 task->m_current_file_index = file_meta->m_file_index;
2186
2187 return (0);
2188 }
2189
close_file(Clone_Task * task)2190 int Clone_Handle::close_file(Clone_Task *task) {
2191 bool success = true;
2192
2193 /* Close file, if opened. */
2194 if (task->m_current_file_des.m_file != OS_FILE_CLOSED) {
2195 success = os_file_close(task->m_current_file_des);
2196 }
2197
2198 task->m_current_file_des.m_file = OS_FILE_CLOSED;
2199 task->m_current_file_index = 0;
2200 task->m_file_cache = true;
2201
2202 if (!success) {
2203 my_error(ER_INTERNAL_ERROR, MYF(0), "Innodb error while closing file");
2204 return (ER_INTERNAL_ERROR);
2205 }
2206
2207 return (0);
2208 }
2209
/** Transfer data via the SE callback using the task's current file,
bracketed by PFS I/O instrumentation when enabled.
@param[in,out]	cbk	SE callback object
@param[in,out]	task	task owning the current file descriptor
@param[in]	len	data length for the transfer
@param[in]	buf_cbk	on apply, fetch a buffer and write it ourselves
                        instead of letting the callback write the file
@param[in]	offset	file offset for buffer writes
@param[in]	name	caller file name (PFS builds only)
@param[in]	line	caller line number (PFS builds only)
@return error code from the callback or write */
int Clone_Handle::file_callback(Ha_clone_cbk *cbk, Clone_Task *task, uint len,
                                bool buf_cbk, uint64_t offset
#ifdef UNIV_PFS_IO
                                ,
                                const char *name, uint line
#endif /* UNIV_PFS_IO */
) {
  int err;
  Ha_clone_file file;

  /* Platform specific code to set file handle */
#ifdef _WIN32
  file.type = Ha_clone_file::FILE_HANDLE;
  file.file_handle = static_cast<void *>(task->m_current_file_des.m_file);
#else
  file.type = Ha_clone_file::FILE_DESC;
  file.file_desc = task->m_current_file_des.m_file;
#endif /* _WIN32 */

  /* Register for PFS IO */
#ifdef UNIV_PFS_IO
  PSI_file_locker_state state;
  struct PSI_file_locker *locker;
  enum PSI_file_operation psi_op;

  locker = nullptr;
  /* Copy clone reads from the file; apply clone writes to it. */
  psi_op = is_copy_clone() ? PSI_FILE_READ : PSI_FILE_WRITE;

  register_pfs_file_io_begin(&state, locker, task->m_current_file_des, len,
                             psi_op, name, line);
#endif /* UNIV_PFS_IO */

  /* Call appropriate callback to transfer data. */
  if (is_copy_clone()) {
    /* Send data from file. */
    err = cbk->file_cbk(file, len);

  } else if (buf_cbk) {
    unsigned char *data_buf = nullptr;
    uint32_t data_len = 0;
    /* Get data buffer */
    err = cbk->apply_buffer_cbk(data_buf, data_len);
    if (err == 0) {
      /* Modify and write data buffer to file. */
      err = modify_and_write(task, offset, data_buf, data_len);
    }
  } else {
    /* Write directly to file. */
    err = cbk->apply_file_cbk(file);
  }

#ifdef UNIV_PFS_IO
  register_pfs_file_io_end(locker, len);
#endif /* UNIV_PFS_IO */

  return (err);
}
2267