1 /*****************************************************************************
2
3 Copyright (c) 2017, 2019, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 /** @file clone/clone0apply.cc
28 Innodb apply snapshot data
29
30 *******************************************************/
31
32 #include <fstream>
33 #include <sstream>
34
35 #include "buf0dump.h"
36 #include "clone0api.h"
37 #include "clone0clone.h"
38 #include "dict0dict.h"
39 #include "log0log.h"
40 #include "sql/handler.h"
41
get_file_from_desc(Clone_File_Meta * & file_desc,const char * data_dir,bool desc_create,bool & desc_exists)42 int Clone_Snapshot::get_file_from_desc(Clone_File_Meta *&file_desc,
43 const char *data_dir, bool desc_create,
44 bool &desc_exists) {
45 int err = 0;
46
47 mutex_enter(&m_snapshot_mutex);
48
49 auto idx = file_desc->m_file_index;
50
51 ut_ad(m_snapshot_handle_type == CLONE_HDL_APPLY);
52
53 ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY ||
54 m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);
55
56 Clone_File_Vec &file_vector = (m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY)
57 ? m_data_file_vector
58 : m_redo_file_vector;
59
60 desc_exists = false;
61
62 /* File metadata is already there, possibly sent by another task. */
63 if (file_vector[idx] != nullptr) {
64 file_desc = file_vector[idx];
65 desc_exists = true;
66
67 } else if (desc_create) {
68 /* Create the descriptor. */
69 err = create_desc(data_dir, file_desc);
70 }
71
72 mutex_exit(&m_snapshot_mutex);
73
74 return (err);
75 }
76
update_file_name(const char * data_dir,Clone_File_Meta * file_desc,char * path,size_t path_len)77 int Clone_Snapshot::update_file_name(const char *data_dir,
78 Clone_File_Meta *file_desc, char *path,
79 size_t path_len) {
80 auto space_id = file_desc->m_space_id;
81
82 if (data_dir != nullptr || m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
83 return (0);
84 }
85
86 /* Update buffer pool dump file path for provisioning. */
87 if (file_desc->m_space_id == dict_sys_t::s_invalid_space_id) {
88 ut_ad(0 == strcmp(file_desc->m_file_name, SRV_BUF_DUMP_FILENAME_DEFAULT));
89 buf_dump_generate_path(path, path_len);
90 file_desc->m_file_name = path;
91 file_desc->m_file_name_len = strlen(path) + 1;
92 return (0);
93 }
94
95 /* Change name to system configured file when replacing current directory. */
96 if (!fsp_is_system_tablespace(space_id)) {
97 return (0);
98 }
99
100 /* Find out the node index of the file within system tablespace. */
101 auto loop_index = file_desc->m_file_index;
102 decltype(loop_index) node_index = 0;
103
104 while (loop_index > 0) {
105 --loop_index;
106 auto cur_desc = m_data_file_vector[loop_index];
107 /* Loop through all files of current tablespace. */
108 if (cur_desc->m_space_id != space_id) {
109 break;
110 }
111 ++node_index;
112 }
113
114 auto last_file_index =
115 static_cast<decltype(node_index)>(srv_sys_space.m_files.size() - 1);
116
117 /* Check if the file is beyond maximum configured files. */
118 if (node_index > last_file_index) {
119 std::ostringstream err_strm;
120 err_strm << "innodb_data_file_path: Recipient file count: "
121 << last_file_index + 1 << " is less than Donor file count.";
122
123 std::string err_str(err_strm.str());
124
125 my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
126
127 return (ER_CLONE_SYS_CONFIG);
128 }
129
130 auto &file = srv_sys_space.m_files[node_index];
131 page_size_t page_sz(srv_sys_space.flags());
132
133 auto size_bytes = static_cast<uint64_t>(file.size());
134 size_bytes *= page_sz.physical();
135
136 /* Check if the file size matches with configured files. */
137 if (file_desc->m_file_size != size_bytes) {
138 /* For last file it could mismatch if auto extend is specified. */
139 if (node_index != last_file_index ||
140 !srv_sys_space.can_auto_extend_last_file()) {
141 std::ostringstream err_strm;
142
143 err_strm << "innodb_data_file_path: Recipient value for " << node_index
144 << "th file size: " << size_bytes
145 << " doesn't match Donor file size: " << file_desc->m_file_size;
146
147 std::string err_str(err_strm.str());
148
149 my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
150
151 return (ER_CLONE_SYS_CONFIG);
152 }
153 }
154 /* Change filename to currently configured name. */
155 file_desc->m_file_name = file.filepath();
156 file_desc->m_file_name_len = strlen(file_desc->m_file_name) + 1;
157
158 return (0);
159 }
160
compute_path_length(const char * data_dir,const Clone_File_Meta * file_desc)161 size_t Clone_Snapshot::compute_path_length(const char *data_dir,
162 const Clone_File_Meta *file_desc) {
163 bool is_absolute_path = false;
164 auto alloc_len = sizeof(Clone_File_Meta);
165
166 alloc_len += sizeof(CLONE_INNODB_REPLACED_FILE_EXTN);
167
168 if (file_desc->m_file_name == nullptr) {
169 alloc_len += MAX_LOG_FILE_NAME;
170 } else {
171 alloc_len += file_desc->m_file_name_len;
172 std::string name;
173 name.assign(file_desc->m_file_name, file_desc->m_file_name_len);
174 is_absolute_path = Fil_path::is_absolute_path(name);
175 }
176
177 /* For absolute path, name length is the total length. */
178 if (is_absolute_path) {
179 return (alloc_len);
180 }
181
182 /* Add data directory length for relative path. */
183 if (data_dir != nullptr) {
184 alloc_len += strlen(data_dir);
185 ++alloc_len;
186 return (alloc_len);
187 }
188
189 /* While replacing current data directory, calculate length
190 based on current system configuration. */
191
192 /* Use server redo file location */
193 if (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
194 alloc_len += strlen(srv_log_group_home_dir);
195 ++alloc_len;
196 return (alloc_len);
197 }
198
199 ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY);
200
201 /* Use server undo file location */
202 if (fsp_is_undo_tablespace(file_desc->m_space_id)) {
203 alloc_len += strlen(srv_undo_dir);
204 ++alloc_len;
205 }
206 return (alloc_len);
207 }
208
handle_existing_file(bool replace,Clone_File_Meta * file_desc)209 int Clone_Snapshot::handle_existing_file(bool replace,
210 Clone_File_Meta *file_desc) {
211 /* For undo tablespace, check for duplicate file name. Currently it
212 is possible to create multiple undo tablespaces of same name under
213 different directory. This should not be recommended and in future
214 we aim to disallow specifying file name for tablespaces and generate
215 it internally based on space ID. Till that time, Clone needs to identify
216 and disallow undo tablespaces of same name as Clone creates all undo
217 tablespaces under innodb_undo_directory configuration in recipient. */
218 if (fsp_is_undo_tablespace(file_desc->m_space_id)) {
219 std::string clone_file(file_desc->m_file_name);
220 clone_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
221
222 for (auto undo_index : m_undo_file_indexes) {
223 auto undo_meta = m_data_file_vector[undo_index];
224 if (undo_meta == nullptr) {
225 continue;
226 }
227
228 /* undo_meta: already added undo file with or without #clone extension.
229 The #clone extension is present when recipient also has the same file.
230 file_desc: current undo file name without #clone extension.
231 clone_file: current undo file name with #clone extension.
232 Since the existing undo file may or may not have the #clone extension
233 we need to match both. */
234 if (0 == strcmp(undo_meta->m_file_name, file_desc->m_file_name) ||
235 0 == strcmp(undo_meta->m_file_name, clone_file.c_str())) {
236 std::ostringstream err_strm;
237 err_strm << "Found multiple undo files with same name: "
238 << file_desc->m_file_name;
239 std::string err_str(err_strm.str());
240 my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
241 return (ER_CLONE_SYS_CONFIG);
242 }
243 }
244 m_undo_file_indexes.push_back(file_desc->m_file_index);
245 ut_ad(m_undo_file_indexes.size() <= FSP_MAX_UNDO_TABLESPACES);
246 }
247
248 std::string file_name;
249 file_name.assign(file_desc->m_file_name);
250
251 auto type = Fil_path::get_file_type(file_name);
252
253 /* Nothing to do if file doesn't exist */
254 if (type == OS_FILE_TYPE_MISSING) {
255 int err = 0;
256 if (replace) {
257 /* Add file to new file list to enable rollback. */
258 err = clone_add_to_list_file(CLONE_INNODB_NEW_FILES,
259 file_desc->m_file_name);
260 }
261 return (err);
262 }
263
264 if (type != OS_FILE_TYPE_FILE) {
265 /* Either the stat() call failed or the name is a
266 directory/block device, or permission error etc. */
267 char errbuf[MYSYS_STRERROR_SIZE];
268 my_error(ER_ERROR_ON_WRITE, MYF(0), file_name.c_str(), errno,
269 my_strerror(errbuf, sizeof(errbuf), errno));
270 return (ER_ERROR_ON_WRITE);
271 }
272
273 ut_a(type == OS_FILE_TYPE_FILE);
274
275 /* For cloning to different data directory, we must ensure that the
276 file is not present. This would always fail for local clone. */
277 if (!replace) {
278 my_error(ER_FILE_EXISTS_ERROR, MYF(0), file_name.c_str());
279 return (ER_FILE_EXISTS_ERROR);
280 }
281
282 /* Save original data file name. */
283 std::string data_file(file_desc->m_file_name);
284
285 /* For clone to current data directory, we need to clone system files
286 to a file with different name and then move back during restart. */
287 auto file_extn_loc = const_cast<char *>(file_desc->m_file_name);
288 file_extn_loc += file_desc->m_file_name_len;
289
290 /* Overwrite null terminator. */
291 --file_extn_loc;
292 strcpy(file_extn_loc, CLONE_INNODB_REPLACED_FILE_EXTN);
293
294 file_desc->m_file_name_len += sizeof(CLONE_INNODB_REPLACED_FILE_EXTN);
295
296 /* Check that file with clone extension is not present */
297 file_name.assign(file_desc->m_file_name);
298 type = Fil_path::get_file_type(file_name);
299
300 if (type != OS_FILE_TYPE_MISSING) {
301 my_error(ER_FILE_EXISTS_ERROR, MYF(0), file_name.c_str());
302 return (ER_FILE_EXISTS_ERROR);
303 }
304
305 /* Add file name to files to be replaced before recovery. */
306 auto err =
307 clone_add_to_list_file(CLONE_INNODB_REPLACED_FILES, data_file.c_str());
308 return (err);
309 }
310
build_file_path(const char * data_dir,ulint alloc_size,Clone_File_Meta * & file_desc)311 int Clone_Snapshot::build_file_path(const char *data_dir, ulint alloc_size,
312 Clone_File_Meta *&file_desc) {
313 /* Check if data directory is being replaced. */
314 bool replace_dir = (data_dir == nullptr);
315
316 /* Allocate for file path string. */
317 auto path = static_cast<char *>(mem_heap_alloc(m_snapshot_heap, alloc_size));
318
319 if (path == nullptr) {
320 my_error(ER_OUTOFMEMORY, MYF(0), alloc_size);
321 return (ER_OUTOFMEMORY);
322 }
323
324 /* Copy file metadata */
325 auto file_meta = reinterpret_cast<Clone_File_Meta *>(path);
326 path += sizeof(Clone_File_Meta);
327 *file_meta = *file_desc;
328
329 bool is_absolute_path = false;
330 std::string file_name;
331
332 /* Check if absolute or relative path. */
333 if (file_desc->m_file_name != nullptr) {
334 file_name.assign(file_desc->m_file_name, file_desc->m_file_name_len);
335 is_absolute_path = Fil_path::is_absolute_path(file_name);
336 }
337
338 file_meta->m_file_name = static_cast<const char *>(path);
339
340 /* Copy path and file name together for absolute path. */
341 if (is_absolute_path) {
342 ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY);
343 auto is_hard_path = test_if_hard_path(file_desc->m_file_name);
344
345 /* Check if the absolute path is not in right format */
346 if (is_hard_path == 0) {
347 my_error(ER_WRONG_VALUE, MYF(0), "file path", file_desc->m_file_name);
348 return (ER_WRONG_VALUE);
349 }
350
351 strcpy(path, file_desc->m_file_name);
352
353 auto err = handle_existing_file(replace_dir, file_meta);
354
355 file_desc = file_meta;
356
357 return (err);
358 }
359
360 const char *file_path = data_dir;
361
362 /* Use configured path when cloning into current data directory. */
363 if (file_path == nullptr) {
364 /* Get file path from redo configuration. */
365 if (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
366 file_path = srv_log_group_home_dir;
367
368 /* Get file path from undo configuration. */
369 } else if (fsp_is_undo_tablespace(file_desc->m_space_id)) {
370 file_path = srv_undo_dir;
371 }
372 }
373
374 /* Copy file path. */
375 if (file_path != nullptr) {
376 auto path_len = strlen(file_path);
377 strcpy(path, file_path);
378
379 /* Add path separator at the end of file path, if not there. */
380 if (file_path[path_len - 1] != OS_PATH_SEPARATOR) {
381 path[path_len] = OS_PATH_SEPARATOR;
382 ++path;
383 }
384 path += path_len;
385 }
386
387 /* Copy file name */
388 if (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
389 /* This is redo file. Use standard name. */
390 snprintf(path, MAX_LOG_FILE_NAME, "%s%u", ib_logfile_basename,
391 file_desc->m_file_index);
392 } else {
393 ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY);
394 ut_ad(file_desc->m_file_name != nullptr);
395 /* For relative path remove "./" if there. */
396 if (Fil_path::has_prefix(file_name, Fil_path::DOT_SLASH)) {
397 file_name.erase(0, 2);
398 }
399
400 /* Copy adjusted file name */
401 strcpy(path, file_name.c_str());
402 }
403
404 file_meta->m_file_name_len = strlen(file_meta->m_file_name) + 1;
405
406 /* Check and handle when file is already present in recipient. */
407 auto err = handle_existing_file(replace_dir, file_meta);
408 file_desc = file_meta;
409
410 return (err);
411 }
412
create_desc(const char * data_dir,Clone_File_Meta * & file_desc)413 int Clone_Snapshot::create_desc(const char *data_dir,
414 Clone_File_Meta *&file_desc) {
415 /* Update file name from configuration for system space */
416 char path[OS_FILE_MAX_PATH];
417 auto err = update_file_name(data_dir, file_desc, &path[0], sizeof(path));
418
419 if (err != 0) {
420 return (err);
421 }
422
423 /* Find out length of complete path string for file */
424 auto alloc_size =
425 static_cast<ulint>(compute_path_length(data_dir, file_desc));
426
427 /* Build complete path for the new file to be added. */
428 err = build_file_path(data_dir, alloc_size, file_desc);
429
430 return (err);
431 }
432
add_file_from_desc(Clone_File_Meta * & file_desc)433 bool Clone_Snapshot::add_file_from_desc(Clone_File_Meta *&file_desc) {
434 mutex_enter(&m_snapshot_mutex);
435
436 ut_ad(m_snapshot_handle_type == CLONE_HDL_APPLY);
437
438 if (m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY) {
439 m_data_file_vector[file_desc->m_file_index] = file_desc;
440 } else {
441 ut_ad(m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);
442 m_redo_file_vector[file_desc->m_file_index] = file_desc;
443 }
444
445 mutex_exit(&m_snapshot_mutex);
446
447 /** Check if it the last file */
448 if (file_desc->m_file_index == m_num_data_files - 1) {
449 return true;
450 }
451
452 return (false);
453 }
454
apply_task_metadata(Clone_Task * task,Ha_clone_cbk * callback)455 int Clone_Handle::apply_task_metadata(Clone_Task *task,
456 Ha_clone_cbk *callback) {
457 ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
458 uint desc_len = 0;
459 auto serial_desc = callback->get_data_desc(&desc_len);
460
461 Clone_Desc_Task_Meta task_desc;
462 auto success = task_desc.deserialize(serial_desc, desc_len);
463
464 if (!success) {
465 ut_ad(false);
466 int err = ER_CLONE_PROTOCOL;
467 my_error(err, MYF(0), "Wrong Clone RPC: Invalid Task Descriptor");
468 return (err);
469 }
470 task->m_task_meta = task_desc.m_task_meta;
471 return (0);
472 }
473
check_space()474 int Clone_Handle::check_space() {
475 /* Do space check only during file copy. */
476 if (m_clone_task_manager.get_state() != CLONE_SNAPSHOT_FILE_COPY) {
477 return (0);
478 }
479 uint64_t free_space;
480 auto MySQL_datadir_abs_path = MySQL_datadir_path.abs_path();
481 auto data_dir =
482 (replace_datadir() ? MySQL_datadir_abs_path.c_str() : get_datadir());
483
484 auto db_err = os_get_free_space(data_dir, free_space);
485 /* We skip space check if the OS interface returns error. */
486 if (db_err != DB_SUCCESS) {
487 ib::warn(ER_IB_CLONE_VALIDATE)
488 << "Clone could not validate available free space";
489 return (0);
490 }
491
492 auto snapshot = m_clone_task_manager.get_snapshot();
493 auto bytes_disk = snapshot->get_disk_estimate();
494
495 std::string avaiable_space;
496 std::string clone_space;
497 ut_format_byte_value(bytes_disk, clone_space);
498 ut_format_byte_value(free_space, avaiable_space);
499
500 int err = 0;
501 if (bytes_disk > free_space) {
502 err = ER_CLONE_DISK_SPACE;
503 my_error(err, MYF(0), clone_space.c_str(), avaiable_space.c_str());
504 }
505
506 ib::info(ER_IB_CLONE_VALIDATE)
507 << "Clone estimated size: " << clone_space.c_str()
508 << " Available space: " << avaiable_space.c_str();
509 return (err);
510 }
511
/** Apply a state descriptor received through the callback.
For a copy handle the descriptor must be an ACK and is passed on to the
task manager. For an apply handle an ACK is ignored, a start descriptor
moves the task to the next state, and any other descriptor ends the
current state.
@param[in,out]	task		task receiving the descriptor
@param[in]	callback	callback providing the serialized descriptor
@return 0 on success, otherwise the error code */
int Clone_Handle::apply_state_metadata(Clone_Task *task,
                                       Ha_clone_cbk *callback) {
  int err = 0;
  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_State state_desc;
  auto success = state_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    ut_ad(false);
    err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid State Descriptor");
    return (err);
  }
  /* On a copy handle, the only state descriptor expected is an ACK. */
  if (m_clone_handle_type == CLONE_HDL_COPY) {
    ut_ad(state_desc.m_is_ack);
    m_clone_task_manager.ack_state(&state_desc);
    return (0);
  }

  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  /* ACK descriptor is sent for keeping the connection alive. */
  if (state_desc.m_is_ack) {
    return (0);
  }

  /* Reset current chunk information */
  auto &task_meta = task->m_task_meta;
  task_meta.m_chunk_num = 0;
  task_meta.m_block_num = 0;

  /* Move to the new state */
  if (state_desc.m_is_start) {
#ifdef UNIV_DEBUG
    /* Network failure before moving to new state */
    /* NOTE(review): the value assigned here is overwritten by
    move_to_next_state() below before it is checked. */
    err = m_clone_task_manager.debug_restart(task, err, 5);
#endif /* UNIV_DEBUG */

    /** Notify state change via callback. */
    notify_state_change(task, callback, &state_desc);

    err = move_to_next_state(task, nullptr, &state_desc);

#ifdef UNIV_DEBUG
    /* Network failure after moving to new state */
    err = m_clone_task_manager.debug_restart(task, err, 0);
#endif /* UNIV_DEBUG */

    /* Check if enough space available on disk */
    if (err == 0) {
      err = check_space();
    }

    return (err);
  }

  /* It is the end of current state. Close active file. */
  err = close_file(task);

#ifdef UNIV_DEBUG
  /* Network failure before finishing state */
  err = m_clone_task_manager.debug_restart(task, err, 2);
#endif /* UNIV_DEBUG */

  if (err != 0) {
    return (err);
  }

  ut_ad(state_desc.m_state == m_clone_task_manager.get_state());

  /* Mark current state finished for the task */
  err = m_clone_task_manager.finish_state(task);

#ifdef UNIV_DEBUG
  /* Network failure before sending ACK */
  err = m_clone_task_manager.debug_restart(task, err, 3);
#endif /* UNIV_DEBUG */

  /* Send acknowledgement back to remote server */
  if (err == 0 && task->m_is_master) {
    err = ack_state_metadata(task, callback, &state_desc);

    /* NOTE(review): this message is written only when sending the ACK
    returned an error, although its text reads like a completion message;
    confirm the intended condition. */
    if (err != 0) {
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Master ACK finshed state: " << state_desc.m_state;
    }
  }

#ifdef UNIV_DEBUG
  /* Network failure after sending ACK */
  err = m_clone_task_manager.debug_restart(task, err, 4);
#endif /* UNIV_DEBUG */

  return (err);
}
609
notify_state_change(Clone_Task * task,Ha_clone_cbk * callback,Clone_Desc_State * state_desc)610 void Clone_Handle::notify_state_change(Clone_Task *task, Ha_clone_cbk *callback,
611 Clone_Desc_State *state_desc) {
612 if (!task->m_is_master) {
613 return;
614 }
615 callback->mark_state_change(state_desc->m_estimate);
616 callback->buffer_cbk(nullptr, 0);
617 callback->clear_flags();
618 }
619
ack_state_metadata(Clone_Task * task,Ha_clone_cbk * callback,Clone_Desc_State * state_desc)620 int Clone_Handle::ack_state_metadata(Clone_Task *task, Ha_clone_cbk *callback,
621 Clone_Desc_State *state_desc) {
622 ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
623
624 state_desc->m_is_ack = true;
625
626 byte desc_buf[CLONE_DESC_MAX_BASE_LEN];
627
628 auto serial_desc = &desc_buf[0];
629 uint desc_len = CLONE_DESC_MAX_BASE_LEN;
630
631 state_desc->serialize(serial_desc, desc_len, nullptr);
632
633 callback->set_data_desc(serial_desc, desc_len);
634 callback->clear_flags();
635
636 auto err = callback->buffer_cbk(nullptr, 0);
637
638 return (err);
639 }
640
apply_file_metadata(Clone_Task * task,Ha_clone_cbk * callback)641 int Clone_Handle::apply_file_metadata(Clone_Task *task,
642 Ha_clone_cbk *callback) {
643 ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
644
645 uint desc_len = 0;
646 auto serial_desc = callback->get_data_desc(&desc_len);
647
648 Clone_Desc_File_MetaData file_desc;
649 auto success = file_desc.deserialize(serial_desc, desc_len);
650
651 if (!success) {
652 ut_ad(false);
653 int err = ER_CLONE_PROTOCOL;
654 my_error(err, MYF(0), "Wrong Clone RPC: Invalid File Descriptor");
655 return (err);
656 }
657 auto file_meta = &file_desc.m_file_meta;
658 auto snapshot = m_clone_task_manager.get_snapshot();
659
660 ut_ad(snapshot->get_state() == file_desc.m_state);
661
662 bool desc_exists;
663
664 /* Check file metadata entry based on the descriptor. */
665 auto err =
666 snapshot->get_file_from_desc(file_meta, m_clone_dir, false, desc_exists);
667 if (err != 0 || desc_exists) {
668 return (err);
669 }
670
671 mutex_enter(m_clone_task_manager.get_mutex());
672
673 /* Create file metadata entry based on the descriptor. */
674 err = snapshot->get_file_from_desc(file_meta, m_clone_dir, true, desc_exists);
675 file_meta->m_punch_hole = false;
676
677 if (err != 0 || desc_exists) {
678 mutex_exit(m_clone_task_manager.get_mutex());
679
680 /* Save error with file name. */
681 if (err != 0) {
682 m_clone_task_manager.set_error(err, file_meta->m_file_name);
683 }
684 return (err);
685 }
686
687 if (file_desc.m_state == CLONE_SNAPSHOT_FILE_COPY) {
688 auto file_type = OS_CLONE_DATA_FILE;
689
690 if (file_meta->m_space_id == dict_sys_t::s_invalid_space_id) {
691 file_type = OS_CLONE_LOG_FILE;
692 }
693
694 /* Create the file */
695 err = open_file(nullptr, file_meta, file_type, true, false);
696
697 /* If last file is received, set all file metadata transferred */
698 if (snapshot->add_file_from_desc(file_meta)) {
699 m_clone_task_manager.set_file_meta_transferred();
700 }
701
702 mutex_exit(m_clone_task_manager.get_mutex());
703
704 if (err != 0) {
705 return (err);
706 }
707
708 /* Check and set punch hole for compressed page table. */
709 if (file_type == OS_CLONE_DATA_FILE &&
710 file_meta->m_compress_type != Compression::NONE) {
711 page_size_t page_size(file_meta->m_fsp_flags);
712
713 /* Disable punch hole if donor compression is not effective. */
714 if (page_size.is_compressed() ||
715 file_meta->m_fsblk_size * 2 > srv_page_size) {
716 file_meta->m_punch_hole = false;
717 return (0);
718 }
719
720 os_file_stat_t stat_info;
721 os_file_get_status(file_meta->m_file_name, &stat_info, false, false);
722
723 /* Check and disable punch hole if recipient cannot support it. */
724 if (!IORequest::is_punch_hole_supported() ||
725 stat_info.block_size * 2 > srv_page_size) {
726 file_meta->m_punch_hole = false;
727 } else {
728 file_meta->m_punch_hole = true;
729 }
730
731 /* Currently the format for compressed and encrypted page is
732 dependent on file system block size. */
733 if (file_meta->m_encrypt_type != Encryption::NONE &&
734 file_meta->m_fsblk_size != stat_info.block_size) {
735 auto donor_str = std::to_string(file_meta->m_fsblk_size);
736 auto recipient_str = std::to_string(stat_info.block_size);
737
738 my_error(ER_CLONE_CONFIG, MYF(0), "FS Block Size", donor_str.c_str(),
739 recipient_str.c_str());
740 err = ER_CLONE_CONFIG;
741 }
742 }
743 return (err);
744 }
745
746 ut_ad(file_desc.m_state == CLONE_SNAPSHOT_REDO_COPY);
747
748 /* open and reserve the redo file size */
749 err = open_file(nullptr, file_meta, OS_CLONE_LOG_FILE, true, true);
750
751 snapshot->add_file_from_desc(file_meta);
752
753 /* For redo copy, check and add entry for the second file. */
754 if (err == 0 && file_meta->m_file_index == 0) {
755 file_meta = &file_desc.m_file_meta;
756 file_meta->m_file_index++;
757
758 err =
759 snapshot->get_file_from_desc(file_meta, m_clone_dir, true, desc_exists);
760
761 file_meta->m_punch_hole = false;
762
763 if (err == 0 && !desc_exists) {
764 err = open_file(nullptr, file_meta, OS_CLONE_LOG_FILE, true, true);
765 snapshot->add_file_from_desc(file_meta);
766 }
767 }
768
769 mutex_exit(m_clone_task_manager.get_mutex());
770 return (err);
771 }
772
punch_holes(os_file_t file,const byte * buffer,uint32_t len,uint64_t start_off,uint32_t page_len,uint32_t block_size)773 dberr_t Clone_Handle::punch_holes(os_file_t file, const byte *buffer,
774 uint32_t len, uint64_t start_off,
775 uint32_t page_len, uint32_t block_size) {
776 dberr_t err = DB_SUCCESS;
777
778 /* Loop through all pages in current data block and punch hole. */
779 while (len >= page_len) {
780 /* Validate compressed page type */
781 auto page_type = mach_read_from_2(buffer + FIL_PAGE_TYPE);
782 if (page_type == FIL_PAGE_COMPRESSED ||
783 page_type == FIL_PAGE_COMPRESSED_AND_ENCRYPTED) {
784 auto comp_len = mach_read_from_2(buffer + FIL_PAGE_COMPRESS_SIZE_V1);
785 comp_len += FIL_PAGE_DATA;
786
787 /* Align compressed length */
788 comp_len = ut_calc_align(comp_len, block_size);
789
790 auto offset = static_cast<ulint>(start_off + comp_len);
791 auto hole_size = static_cast<ulint>(page_len - comp_len);
792
793 err = os_file_punch_hole(file, offset, hole_size);
794 if (err != DB_SUCCESS) {
795 break;
796 }
797 }
798 start_off += page_len;
799 buffer += page_len;
800 len -= page_len;
801 }
802 /* Must have consumed all data. */
803 ut_ad(err != DB_SUCCESS || len == 0);
804 return (err);
805 }
806
modify_and_write(const Clone_Task * task,uint64_t offset,unsigned char * buffer,uint32_t buf_len)807 int Clone_Handle::modify_and_write(const Clone_Task *task, uint64_t offset,
808 unsigned char *buffer, uint32_t buf_len) {
809 ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
810
811 auto snapshot = m_clone_task_manager.get_snapshot();
812 auto file_meta = snapshot->get_file_by_index(task->m_current_file_index);
813
814 bool encryption = (file_meta->m_encrypt_type != Encryption::NONE);
815
816 if (encryption) {
817 bool success = true;
818
819 bool is_page_copy = (snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);
820 bool key_page = (is_page_copy && offset == 0);
821
822 bool is_log_file = (snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);
823 bool key_log = (is_log_file && file_meta->m_file_index == 0 && offset == 0);
824
825 if (key_page) {
826 /* Encrypt tablespace key with master key for encrypted tablespace. */
827 page_size_t page_size(file_meta->m_fsp_flags);
828 success = snapshot->encrypt_key_in_header(page_size, buffer);
829
830 } else if (key_log) {
831 /* Encrypt redo log key with master key */
832 success = snapshot->encrypt_key_in_log_header(buffer, buf_len);
833 }
834 if (!success) {
835 ut_ad(false);
836 int err = ER_INTERNAL_ERROR;
837 my_error(err, MYF(0), "Innodb Clone Apply Failed to Encrypt Key");
838 return (err);
839 }
840 }
841
842 /* No more compression/encryption is needed. */
843 IORequest request(IORequest::WRITE);
844 request.disable_compression();
845 request.clear_encrypted();
846
847 /* Write buffer to file. */
848 errno = 0;
849 auto db_err =
850 os_file_write(request, "Clone data file", task->m_current_file_des,
851 reinterpret_cast<char *>(buffer), offset, buf_len);
852 if (db_err != DB_SUCCESS) {
853 char errbuf[MYSYS_STRERROR_SIZE];
854 my_error(ER_ERROR_ON_WRITE, MYF(0), file_meta->m_file_name, errno,
855 my_strerror(errbuf, sizeof(errbuf), errno));
856
857 return (ER_ERROR_ON_WRITE);
858 }
859
860 /* Attempt to punch holes if page compression is enabled. */
861 if (file_meta->m_punch_hole) {
862 page_size_t page_size(file_meta->m_fsp_flags);
863
864 ut_ad(file_meta->m_compress_type != Compression::NONE ||
865 file_meta->m_file_size > file_meta->m_alloc_size);
866 ut_ad(IORequest::is_punch_hole_supported());
867 ut_ad(!page_size.is_compressed());
868
869 auto page_length = page_size.physical();
870 auto start_offset = offset;
871
872 ut_a(buf_len >= page_length);
873 /* Skip first page */
874 if (start_offset == 0) {
875 start_offset += page_length;
876 buffer += page_length;
877 buf_len -= page_length;
878 }
879 auto db_err = punch_holes(task->m_current_file_des.m_file, buffer, buf_len,
880 start_offset, page_length,
881 static_cast<uint32_t>(file_meta->m_fsblk_size));
882 if (db_err != DB_SUCCESS) {
883 ut_ad(db_err == DB_IO_NO_PUNCH_HOLE);
884 ib::info(ER_IB_CLONE_PUNCH_HOLE)
885 << "Innodb Clone Apply failed to punch hole: "
886 << file_meta->m_file_name;
887 file_meta->m_punch_hole = false;
888 }
889 }
890 return (0);
891 }
892
receive_data(Clone_Task * task,uint64_t offset,uint64_t file_size,uint32_t size,Ha_clone_cbk * callback)893 int Clone_Handle::receive_data(Clone_Task *task, uint64_t offset,
894 uint64_t file_size, uint32_t size,
895 Ha_clone_cbk *callback) {
896 ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
897
898 auto snapshot = m_clone_task_manager.get_snapshot();
899
900 auto file_meta = snapshot->get_file_by_index(task->m_current_file_index);
901
902 bool is_page_copy = (snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);
903 bool is_log_file = (snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);
904
905 /* During page and redo copy, we encrypt the key in header page. */
906 bool key_page = (is_page_copy && offset == 0);
907 bool key_log = (is_log_file && file_meta->m_file_index == 0 && offset == 0);
908
909 if (key_page) {
910 /* Check and update file size for space header page */
911 if (file_meta->m_file_size < file_size) {
912 snapshot->update_file_size(task->m_current_file_index, file_size);
913 }
914 }
915
916 auto file_type = OS_CLONE_DATA_FILE;
917
918 if (is_log_file || is_page_copy ||
919 file_meta->m_space_id == dict_sys_t::s_invalid_space_id) {
920 file_type = OS_CLONE_LOG_FILE;
921 }
922
923 /* Open destination file for first block. */
924 if (task->m_current_file_des.m_file == OS_FILE_CLOSED) {
925 ut_ad(file_meta != nullptr);
926
927 auto err = open_file(task, file_meta, file_type, true, false);
928
929 if (err != 0) {
930 /* Save error with file name. */
931 m_clone_task_manager.set_error(err, file_meta->m_file_name);
932 return (err);
933 }
934 }
935
936 ut_ad(task->m_current_file_index == file_meta->m_file_index);
937
938 /* Copy data to current destination file using callback. */
939 char errbuf[MYSYS_STRERROR_SIZE];
940
941 auto file_hdl = task->m_current_file_des.m_file;
942 auto success = os_file_seek(nullptr, file_hdl, offset);
943
944 if (!success) {
945 my_error(ER_ERROR_ON_READ, MYF(0), file_meta->m_file_name, errno,
946 my_strerror(errbuf, sizeof(errbuf), errno));
947 /* Save error with file name. */
948 m_clone_task_manager.set_error(ER_ERROR_ON_READ, file_meta->m_file_name);
949 return (ER_ERROR_ON_READ);
950 }
951
952 if (task->m_file_cache) {
953 callback->set_os_buffer_cache();
954 /* For data file recommend zero copy for cached IO. */
955 if (!is_log_file) {
956 callback->set_zero_copy();
957 }
958 }
959
960 callback->set_dest_name(file_meta->m_file_name);
961
962 bool modify_buffer = false;
963
964 /* In case of page compression we need to punch hole. */
965 if (file_meta->m_punch_hole) {
966 ut_ad(!is_log_file);
967 modify_buffer = true;
968 }
969
970 /* We need to encrypt the tablespace key by master key. */
971 if (file_meta->m_encrypt_type != Encryption::NONE && (key_page || key_log)) {
972 modify_buffer = true;
973 }
974 auto err = file_callback(callback, task, size, modify_buffer, offset
975 #ifdef UNIV_PFS_IO
976 ,
977 __FILE__, __LINE__
978 #endif /* UNIV_PFS_IO */
979 );
980
981 task->m_data_size += size;
982
983 if (err != 0) {
984 /* Save error with file name. */
985 m_clone_task_manager.set_error(err, file_meta->m_file_name);
986 }
987 return (err);
988 }
989
apply_data(Clone_Task * task,Ha_clone_cbk * callback)990 int Clone_Handle::apply_data(Clone_Task *task, Ha_clone_cbk *callback) {
991 ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
992
993 /* Extract the data descriptor. */
994 uint desc_len = 0;
995 auto serial_desc = callback->get_data_desc(&desc_len);
996
997 Clone_Desc_Data data_desc;
998 auto success = data_desc.deserialize(serial_desc, desc_len);
999
1000 if (!success) {
1001 ut_ad(false);
1002 int err = ER_CLONE_PROTOCOL;
1003 my_error(err, MYF(0), "Wrong Clone RPC: Invalid Data Descriptor");
1004 return (err);
1005 }
1006 /* Identify the task for the current block of data. */
1007 int err = 0;
1008 auto task_meta = &data_desc.m_task_meta;
1009
1010 /* The data is from a different file. Close the current one. */
1011 if (task->m_current_file_index != data_desc.m_file_index) {
1012 err = close_file(task);
1013 if (err != 0) {
1014 return (err);
1015 }
1016 task->m_current_file_index = data_desc.m_file_index;
1017 }
1018
1019 /* Receive data from callback and apply. */
1020 err = receive_data(task, data_desc.m_file_offset, data_desc.m_file_size,
1021 data_desc.m_data_len, callback);
1022
1023 /* Close file in case of error. */
1024 if (err != 0) {
1025 close_file(task);
1026 } else {
1027 err = m_clone_task_manager.set_chunk(task, task_meta);
1028 }
1029
1030 return (err);
1031 }
1032
apply(THD * thd,uint task_id,Ha_clone_cbk * callback)1033 int Clone_Handle::apply(THD *thd, uint task_id, Ha_clone_cbk *callback) {
1034 int err = 0;
1035 uint desc_len = 0;
1036
1037 auto clone_desc = callback->get_data_desc(&desc_len);
1038 ut_ad(clone_desc != nullptr);
1039
1040 Clone_Desc_Header header;
1041 auto success = header.deserialize(clone_desc, desc_len);
1042
1043 if (!success) {
1044 ut_ad(false);
1045 err = ER_CLONE_PROTOCOL;
1046 my_error(err, MYF(0), "Wrong Clone RPC: Invalid Descriptor Header");
1047 return (err);
1048 }
1049
1050 /* Check the descriptor type in header and apply */
1051 auto task = m_clone_task_manager.get_task_by_index(task_id);
1052
1053 switch (header.m_type) {
1054 case CLONE_DESC_TASK_METADATA:
1055 err = apply_task_metadata(task, callback);
1056 break;
1057
1058 case CLONE_DESC_STATE:
1059 err = apply_state_metadata(task, callback);
1060 break;
1061
1062 case CLONE_DESC_FILE_METADATA:
1063 err = apply_file_metadata(task, callback);
1064 break;
1065
1066 case CLONE_DESC_DATA:
1067 err = apply_data(task, callback);
1068 break;
1069
1070 default:
1071 ut_ad(false);
1072 break;
1073 }
1074
1075 if (err != 0) {
1076 close_file(task);
1077 }
1078
1079 return (err);
1080 }
1081
restart_apply(THD * thd,const byte * & loc,uint & loc_len)1082 int Clone_Handle::restart_apply(THD *thd, const byte *&loc, uint &loc_len) {
1083 auto init_loc = m_restart_loc;
1084 auto init_len = m_restart_loc_len;
1085 auto alloc_len = m_restart_loc_len;
1086
1087 /* Get latest locator */
1088 loc = get_locator(loc_len);
1089
1090 m_clone_task_manager.reinit_apply_state(loc, loc_len, init_loc, init_len,
1091 alloc_len);
1092
1093 /* Return the original locator if no state information */
1094 if (init_loc == nullptr) {
1095 return (0);
1096 }
1097
1098 loc = init_loc;
1099 loc_len = init_len;
1100
1101 /* Reset restart loc buffer if newly allocated */
1102 if (alloc_len > m_restart_loc_len) {
1103 m_restart_loc = init_loc;
1104 m_restart_loc_len = alloc_len;
1105 }
1106
1107 ut_ad(loc == m_restart_loc);
1108
1109 auto master_task = m_clone_task_manager.get_task_by_index(0);
1110
1111 auto err = close_file(master_task);
1112
1113 return (err);
1114 }
1115
update_file_size(uint32_t file_index,uint64_t file_size)1116 void Clone_Snapshot::update_file_size(uint32_t file_index, uint64_t file_size) {
1117 /* Update file size when file is extended during page copy */
1118 ut_ad(m_snapshot_state == CLONE_SNAPSHOT_PAGE_COPY);
1119
1120 auto cur_file = get_file_by_index(file_index);
1121
1122 while (file_size > cur_file->m_file_size) {
1123 ++file_index;
1124
1125 if (file_index >= m_num_data_files) {
1126 /* Update file size for the last file. */
1127 cur_file->m_file_size = file_size;
1128 break;
1129 }
1130
1131 auto next_file = get_file_by_index(file_index);
1132
1133 if (next_file->m_space_id != cur_file->m_space_id) {
1134 /* Update file size for the last file. */
1135 cur_file->m_file_size = file_size;
1136 break;
1137 }
1138
1139 /* Only system tablespace can have multiple nodes. */
1140 ut_ad(cur_file->m_space_id == 0);
1141
1142 file_size -= cur_file->m_file_size;
1143 cur_file = next_file;
1144 }
1145 }
1146
init_apply_state(Clone_Desc_State * state_desc)1147 int Clone_Snapshot::init_apply_state(Clone_Desc_State *state_desc) {
1148 set_state_info(state_desc);
1149
1150 int err = 0;
1151 switch (m_snapshot_state) {
1152 case CLONE_SNAPSHOT_FILE_COPY:
1153 ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State FILE COPY: ";
1154 break;
1155
1156 case CLONE_SNAPSHOT_PAGE_COPY:
1157 ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State PAGE COPY: ";
1158 break;
1159
1160 case CLONE_SNAPSHOT_REDO_COPY:
1161 ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State REDO COPY: ";
1162 break;
1163
1164 case CLONE_SNAPSHOT_DONE:
1165 /* Extend and flush data files. */
1166 ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State FLUSH DATA: ";
1167 err = extend_and_flush_files(false);
1168 if (err != 0) {
1169 ib::info(ER_IB_CLONE_OPERATION)
1170 << "Clone Apply FLUSH DATA failed code: " << err;
1171 break;
1172 }
1173 /* Flush redo files. */
1174 ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State FLUSH REDO: ";
1175 err = extend_and_flush_files(true);
1176 if (err != 0) {
1177 ib::info(ER_IB_CLONE_OPERATION)
1178 << "Clone Apply FLUSH REDO failed code: " << err;
1179 break;
1180 }
1181 ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State DONE";
1182 break;
1183
1184 case CLONE_SNAPSHOT_NONE:
1185 case CLONE_SNAPSHOT_INIT:
1186 default:
1187 ut_ad(false);
1188 err = ER_INTERNAL_ERROR;
1189 my_error(err, MYF(0), "Innodb Clone Snapshot Invalid state");
1190 break;
1191 }
1192 return (err);
1193 }
1194
extend_and_flush_files(bool flush_redo)1195 int Clone_Snapshot::extend_and_flush_files(bool flush_redo) {
1196 auto &file_vector = (flush_redo) ? m_redo_file_vector : m_data_file_vector;
1197
1198 for (auto file_meta : file_vector) {
1199 char errbuf[MYSYS_STRERROR_SIZE];
1200 bool success = true;
1201
1202 auto file =
1203 os_file_create(innodb_clone_file_key, file_meta->m_file_name,
1204 OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT, OS_FILE_NORMAL,
1205 OS_CLONE_DATA_FILE, false, &success);
1206
1207 if (!success) {
1208 my_error(ER_CANT_OPEN_FILE, MYF(0), file_meta->m_file_name, errno,
1209 my_strerror(errbuf, sizeof(errbuf), errno));
1210
1211 return (ER_CANT_OPEN_FILE);
1212 }
1213
1214 auto file_size = os_file_get_size(file);
1215
1216 if (file_size < file_meta->m_file_size) {
1217 success = os_file_set_size(file_meta->m_file_name, file, file_size,
1218 file_meta->m_file_size, false, true);
1219 } else {
1220 success = os_file_flush(file);
1221 }
1222
1223 os_file_close(file);
1224
1225 if (!success) {
1226 my_error(ER_ERROR_ON_WRITE, MYF(0), file_meta->m_file_name, errno,
1227 my_strerror(errbuf, sizeof(errbuf), errno));
1228
1229 return (ER_ERROR_ON_WRITE);
1230 }
1231 }
1232 return (0);
1233 }
1234