1 /*****************************************************************************
2 
3 Copyright (c) 2017, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file clone/clone0apply.cc
28  Innodb apply snapshot data
29 
30  *******************************************************/
31 
32 #include <fstream>
33 #include <sstream>
34 
35 #include "buf0dump.h"
36 #include "clone0api.h"
37 #include "clone0clone.h"
38 #include "dict0dict.h"
39 #include "log0log.h"
40 #include "sql/handler.h"
41 
get_file_from_desc(Clone_File_Meta * & file_desc,const char * data_dir,bool desc_create,bool & desc_exists)42 int Clone_Snapshot::get_file_from_desc(Clone_File_Meta *&file_desc,
43                                        const char *data_dir, bool desc_create,
44                                        bool &desc_exists) {
45   int err = 0;
46 
47   mutex_enter(&m_snapshot_mutex);
48 
49   auto idx = file_desc->m_file_index;
50 
51   ut_ad(m_snapshot_handle_type == CLONE_HDL_APPLY);
52 
53   ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY ||
54         m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);
55 
56   Clone_File_Vec &file_vector = (m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY)
57                                     ? m_data_file_vector
58                                     : m_redo_file_vector;
59 
60   desc_exists = false;
61 
62   /* File metadata is already there, possibly sent by another task. */
63   if (file_vector[idx] != nullptr) {
64     file_desc = file_vector[idx];
65     desc_exists = true;
66 
67   } else if (desc_create) {
68     /* Create the descriptor. */
69     err = create_desc(data_dir, file_desc);
70   }
71 
72   mutex_exit(&m_snapshot_mutex);
73 
74   return (err);
75 }
76 
update_file_name(const char * data_dir,Clone_File_Meta * file_desc,char * path,size_t path_len)77 int Clone_Snapshot::update_file_name(const char *data_dir,
78                                      Clone_File_Meta *file_desc, char *path,
79                                      size_t path_len) {
80   auto space_id = file_desc->m_space_id;
81 
82   if (data_dir != nullptr || m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
83     return (0);
84   }
85 
86   /* Update buffer pool dump file path for provisioning. */
87   if (file_desc->m_space_id == dict_sys_t::s_invalid_space_id) {
88     ut_ad(0 == strcmp(file_desc->m_file_name, SRV_BUF_DUMP_FILENAME_DEFAULT));
89     buf_dump_generate_path(path, path_len);
90     file_desc->m_file_name = path;
91     file_desc->m_file_name_len = strlen(path) + 1;
92     return (0);
93   }
94 
95   /* Change name to system configured file when replacing current directory. */
96   if (!fsp_is_system_tablespace(space_id)) {
97     return (0);
98   }
99 
100   /* Find out the node index of the file within system tablespace. */
101   auto loop_index = file_desc->m_file_index;
102   decltype(loop_index) node_index = 0;
103 
104   while (loop_index > 0) {
105     --loop_index;
106     auto cur_desc = m_data_file_vector[loop_index];
107     /* Loop through all files of current tablespace. */
108     if (cur_desc->m_space_id != space_id) {
109       break;
110     }
111     ++node_index;
112   }
113 
114   auto last_file_index =
115       static_cast<decltype(node_index)>(srv_sys_space.m_files.size() - 1);
116 
117   /* Check if the file is beyond maximum configured files. */
118   if (node_index > last_file_index) {
119     std::ostringstream err_strm;
120     err_strm << "innodb_data_file_path: Recipient file count: "
121              << last_file_index + 1 << " is less than Donor file count.";
122 
123     std::string err_str(err_strm.str());
124 
125     my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
126 
127     return (ER_CLONE_SYS_CONFIG);
128   }
129 
130   auto &file = srv_sys_space.m_files[node_index];
131   page_size_t page_sz(srv_sys_space.flags());
132 
133   auto size_bytes = static_cast<uint64_t>(file.size());
134   size_bytes *= page_sz.physical();
135 
136   /* Check if the file size matches with configured files. */
137   if (file_desc->m_file_size != size_bytes) {
138     /* For last file it could mismatch if auto extend is specified. */
139     if (node_index != last_file_index ||
140         !srv_sys_space.can_auto_extend_last_file()) {
141       std::ostringstream err_strm;
142 
143       err_strm << "innodb_data_file_path: Recipient value for " << node_index
144                << "th file size: " << size_bytes
145                << " doesn't match Donor file size: " << file_desc->m_file_size;
146 
147       std::string err_str(err_strm.str());
148 
149       my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
150 
151       return (ER_CLONE_SYS_CONFIG);
152     }
153   }
154   /* Change filename to currently configured name. */
155   file_desc->m_file_name = file.filepath();
156   file_desc->m_file_name_len = strlen(file_desc->m_file_name) + 1;
157 
158   return (0);
159 }
160 
compute_path_length(const char * data_dir,const Clone_File_Meta * file_desc)161 size_t Clone_Snapshot::compute_path_length(const char *data_dir,
162                                            const Clone_File_Meta *file_desc) {
163   bool is_absolute_path = false;
164   auto alloc_len = sizeof(Clone_File_Meta);
165 
166   alloc_len += sizeof(CLONE_INNODB_REPLACED_FILE_EXTN);
167 
168   if (file_desc->m_file_name == nullptr) {
169     alloc_len += MAX_LOG_FILE_NAME;
170   } else {
171     alloc_len += file_desc->m_file_name_len;
172     std::string name;
173     name.assign(file_desc->m_file_name, file_desc->m_file_name_len);
174     is_absolute_path = Fil_path::is_absolute_path(name);
175   }
176 
177   /* For absolute path, name length is the total length. */
178   if (is_absolute_path) {
179     return (alloc_len);
180   }
181 
182   /* Add data directory length for relative path. */
183   if (data_dir != nullptr) {
184     alloc_len += strlen(data_dir);
185     ++alloc_len;
186     return (alloc_len);
187   }
188 
189   /* While replacing current data directory, calculate length
190   based on current system configuration. */
191 
192   /* Use server redo file location */
193   if (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
194     alloc_len += strlen(srv_log_group_home_dir);
195     ++alloc_len;
196     return (alloc_len);
197   }
198 
199   ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY);
200 
201   /* Use server undo file location */
202   if (fsp_is_undo_tablespace(file_desc->m_space_id)) {
203     alloc_len += strlen(srv_undo_dir);
204     ++alloc_len;
205   }
206   return (alloc_len);
207 }
208 
handle_existing_file(bool replace,Clone_File_Meta * file_desc)209 int Clone_Snapshot::handle_existing_file(bool replace,
210                                          Clone_File_Meta *file_desc) {
211   /* For undo tablespace, check for duplicate file name. Currently it
212   is possible to create multiple undo tablespaces of same name under
213   different directory. This should not be recommended and in future
214   we aim to disallow specifying file name for tablespaces and generate
215   it internally based on space ID. Till that time, Clone needs to identify
216   and disallow undo tablespaces of same name as Clone creates all undo
217   tablespaces under innodb_undo_directory configuration in recipient. */
218   if (fsp_is_undo_tablespace(file_desc->m_space_id)) {
219     std::string clone_file(file_desc->m_file_name);
220     clone_file.append(CLONE_INNODB_REPLACED_FILE_EXTN);
221 
222     for (auto undo_index : m_undo_file_indexes) {
223       auto undo_meta = m_data_file_vector[undo_index];
224       if (undo_meta == nullptr) {
225         continue;
226       }
227 
228       /* undo_meta: already added undo file with or without #clone extension.
229       The #clone extension is present when recipient also has the same file.
230       file_desc: current undo file name without #clone extension.
231       clone_file: current undo file name with #clone extension.
232       Since the existing undo file may or may not have the #clone extension
233       we need to match both. */
234       if (0 == strcmp(undo_meta->m_file_name, file_desc->m_file_name) ||
235           0 == strcmp(undo_meta->m_file_name, clone_file.c_str())) {
236         std::ostringstream err_strm;
237         err_strm << "Found multiple undo files with same name: "
238                  << file_desc->m_file_name;
239         std::string err_str(err_strm.str());
240         my_error(ER_CLONE_SYS_CONFIG, MYF(0), err_str.c_str());
241         return (ER_CLONE_SYS_CONFIG);
242       }
243     }
244     m_undo_file_indexes.push_back(file_desc->m_file_index);
245     ut_ad(m_undo_file_indexes.size() <= FSP_MAX_UNDO_TABLESPACES);
246   }
247 
248   std::string file_name;
249   file_name.assign(file_desc->m_file_name);
250 
251   auto type = Fil_path::get_file_type(file_name);
252 
253   /* Nothing to do if file doesn't exist */
254   if (type == OS_FILE_TYPE_MISSING) {
255     int err = 0;
256     if (replace) {
257       /* Add file to new file list to enable rollback. */
258       err = clone_add_to_list_file(CLONE_INNODB_NEW_FILES,
259                                    file_desc->m_file_name);
260     }
261     return (err);
262   }
263 
264   if (type != OS_FILE_TYPE_FILE) {
265     /* Either the stat() call failed or the name is a
266     directory/block device, or permission error etc. */
267     char errbuf[MYSYS_STRERROR_SIZE];
268     my_error(ER_ERROR_ON_WRITE, MYF(0), file_name.c_str(), errno,
269              my_strerror(errbuf, sizeof(errbuf), errno));
270     return (ER_ERROR_ON_WRITE);
271   }
272 
273   ut_a(type == OS_FILE_TYPE_FILE);
274 
275   /* For cloning to different data directory, we must ensure that the
276   file is not present. This would always fail for local clone. */
277   if (!replace) {
278     my_error(ER_FILE_EXISTS_ERROR, MYF(0), file_name.c_str());
279     return (ER_FILE_EXISTS_ERROR);
280   }
281 
282   /* Save original data file name. */
283   std::string data_file(file_desc->m_file_name);
284 
285   /* For clone to current data directory, we need to clone system files
286   to a file with different name and then move back during restart. */
287   auto file_extn_loc = const_cast<char *>(file_desc->m_file_name);
288   file_extn_loc += file_desc->m_file_name_len;
289 
290   /* Overwrite null terminator. */
291   --file_extn_loc;
292   strcpy(file_extn_loc, CLONE_INNODB_REPLACED_FILE_EXTN);
293 
294   file_desc->m_file_name_len += sizeof(CLONE_INNODB_REPLACED_FILE_EXTN);
295 
296   /* Check that file with clone extension is not present */
297   file_name.assign(file_desc->m_file_name);
298   type = Fil_path::get_file_type(file_name);
299 
300   if (type != OS_FILE_TYPE_MISSING) {
301     my_error(ER_FILE_EXISTS_ERROR, MYF(0), file_name.c_str());
302     return (ER_FILE_EXISTS_ERROR);
303   }
304 
305   /* Add file name to files to be replaced before recovery. */
306   auto err =
307       clone_add_to_list_file(CLONE_INNODB_REPLACED_FILES, data_file.c_str());
308   return (err);
309 }
310 
build_file_path(const char * data_dir,ulint alloc_size,Clone_File_Meta * & file_desc)311 int Clone_Snapshot::build_file_path(const char *data_dir, ulint alloc_size,
312                                     Clone_File_Meta *&file_desc) {
313   /* Check if data directory is being replaced. */
314   bool replace_dir = (data_dir == nullptr);
315 
316   /* Allocate for file path string. */
317   auto path = static_cast<char *>(mem_heap_alloc(m_snapshot_heap, alloc_size));
318 
319   if (path == nullptr) {
320     my_error(ER_OUTOFMEMORY, MYF(0), alloc_size);
321     return (ER_OUTOFMEMORY);
322   }
323 
324   /* Copy file metadata */
325   auto file_meta = reinterpret_cast<Clone_File_Meta *>(path);
326   path += sizeof(Clone_File_Meta);
327   *file_meta = *file_desc;
328 
329   bool is_absolute_path = false;
330   std::string file_name;
331 
332   /* Check if absolute or relative path. */
333   if (file_desc->m_file_name != nullptr) {
334     file_name.assign(file_desc->m_file_name, file_desc->m_file_name_len);
335     is_absolute_path = Fil_path::is_absolute_path(file_name);
336   }
337 
338   file_meta->m_file_name = static_cast<const char *>(path);
339 
340   /* Copy path and file name together for absolute path. */
341   if (is_absolute_path) {
342     ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY);
343     auto is_hard_path = test_if_hard_path(file_desc->m_file_name);
344 
345     /* Check if the absolute path is not in right format */
346     if (is_hard_path == 0) {
347       my_error(ER_WRONG_VALUE, MYF(0), "file path", file_desc->m_file_name);
348       return (ER_WRONG_VALUE);
349     }
350 
351     strcpy(path, file_desc->m_file_name);
352 
353     auto err = handle_existing_file(replace_dir, file_meta);
354 
355     file_desc = file_meta;
356 
357     return (err);
358   }
359 
360   const char *file_path = data_dir;
361 
362   /* Use configured path when cloning into current data directory. */
363   if (file_path == nullptr) {
364     /* Get file path from redo configuration. */
365     if (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
366       file_path = srv_log_group_home_dir;
367 
368       /* Get file path from undo configuration. */
369     } else if (fsp_is_undo_tablespace(file_desc->m_space_id)) {
370       file_path = srv_undo_dir;
371     }
372   }
373 
374   /* Copy file path. */
375   if (file_path != nullptr) {
376     auto path_len = strlen(file_path);
377     strcpy(path, file_path);
378 
379     /* Add path separator at the end of file path, if not there. */
380     if (file_path[path_len - 1] != OS_PATH_SEPARATOR) {
381       path[path_len] = OS_PATH_SEPARATOR;
382       ++path;
383     }
384     path += path_len;
385   }
386 
387   /* Copy file name */
388   if (m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY) {
389     /* This is redo file. Use standard name. */
390     snprintf(path, MAX_LOG_FILE_NAME, "%s%u", ib_logfile_basename,
391              file_desc->m_file_index);
392   } else {
393     ut_ad(m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY);
394     ut_ad(file_desc->m_file_name != nullptr);
395     /* For relative path remove "./" if there. */
396     if (Fil_path::has_prefix(file_name, Fil_path::DOT_SLASH)) {
397       file_name.erase(0, 2);
398     }
399 
400     /* Copy adjusted file name */
401     strcpy(path, file_name.c_str());
402   }
403 
404   file_meta->m_file_name_len = strlen(file_meta->m_file_name) + 1;
405 
406   /* Check and handle when file is already present in recipient. */
407   auto err = handle_existing_file(replace_dir, file_meta);
408   file_desc = file_meta;
409 
410   return (err);
411 }
412 
create_desc(const char * data_dir,Clone_File_Meta * & file_desc)413 int Clone_Snapshot::create_desc(const char *data_dir,
414                                 Clone_File_Meta *&file_desc) {
415   /* Update file name from configuration for system space */
416   char path[OS_FILE_MAX_PATH];
417   auto err = update_file_name(data_dir, file_desc, &path[0], sizeof(path));
418 
419   if (err != 0) {
420     return (err);
421   }
422 
423   /* Find out length of complete path string for file */
424   auto alloc_size =
425       static_cast<ulint>(compute_path_length(data_dir, file_desc));
426 
427   /* Build complete path for the new file to be added. */
428   err = build_file_path(data_dir, alloc_size, file_desc);
429 
430   return (err);
431 }
432 
add_file_from_desc(Clone_File_Meta * & file_desc)433 bool Clone_Snapshot::add_file_from_desc(Clone_File_Meta *&file_desc) {
434   mutex_enter(&m_snapshot_mutex);
435 
436   ut_ad(m_snapshot_handle_type == CLONE_HDL_APPLY);
437 
438   if (m_snapshot_state == CLONE_SNAPSHOT_FILE_COPY) {
439     m_data_file_vector[file_desc->m_file_index] = file_desc;
440   } else {
441     ut_ad(m_snapshot_state == CLONE_SNAPSHOT_REDO_COPY);
442     m_redo_file_vector[file_desc->m_file_index] = file_desc;
443   }
444 
445   mutex_exit(&m_snapshot_mutex);
446 
447   /** Check if it the last file */
448   if (file_desc->m_file_index == m_num_data_files - 1) {
449     return true;
450   }
451 
452   return (false);
453 }
454 
apply_task_metadata(Clone_Task * task,Ha_clone_cbk * callback)455 int Clone_Handle::apply_task_metadata(Clone_Task *task,
456                                       Ha_clone_cbk *callback) {
457   ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
458   uint desc_len = 0;
459   auto serial_desc = callback->get_data_desc(&desc_len);
460 
461   Clone_Desc_Task_Meta task_desc;
462   auto success = task_desc.deserialize(serial_desc, desc_len);
463 
464   if (!success) {
465     ut_ad(false);
466     int err = ER_CLONE_PROTOCOL;
467     my_error(err, MYF(0), "Wrong Clone RPC: Invalid Task Descriptor");
468     return (err);
469   }
470   task->m_task_meta = task_desc.m_task_meta;
471   return (0);
472 }
473 
check_space()474 int Clone_Handle::check_space() {
475   /* Do space check only during file copy. */
476   if (m_clone_task_manager.get_state() != CLONE_SNAPSHOT_FILE_COPY) {
477     return (0);
478   }
479   uint64_t free_space;
480   auto MySQL_datadir_abs_path = MySQL_datadir_path.abs_path();
481   auto data_dir =
482       (replace_datadir() ? MySQL_datadir_abs_path.c_str() : get_datadir());
483 
484   auto db_err = os_get_free_space(data_dir, free_space);
485   /* We skip space check if the OS interface returns error. */
486   if (db_err != DB_SUCCESS) {
487     ib::warn(ER_IB_CLONE_VALIDATE)
488         << "Clone could not validate available free space";
489     return (0);
490   }
491 
492   auto snapshot = m_clone_task_manager.get_snapshot();
493   auto bytes_disk = snapshot->get_disk_estimate();
494 
495   std::string avaiable_space;
496   std::string clone_space;
497   ut_format_byte_value(bytes_disk, clone_space);
498   ut_format_byte_value(free_space, avaiable_space);
499 
500   int err = 0;
501   if (bytes_disk > free_space) {
502     err = ER_CLONE_DISK_SPACE;
503     my_error(err, MYF(0), clone_space.c_str(), avaiable_space.c_str());
504   }
505 
506   ib::info(ER_IB_CLONE_VALIDATE)
507       << "Clone estimated size: " << clone_space.c_str()
508       << " Available space: " << avaiable_space.c_str();
509   return (err);
510 }
511 
/** Handle a state descriptor delivered through the callback. A copy handle
passes it on as an ACK. An apply handle ignores ACK descriptors, moves to the
next state for a start descriptor, and otherwise finishes the current state
and, for the master task, sends an ACK.
@param[in,out]  task      clone task
@param[in]      callback  callback holding the serialized descriptor
@return error code, 0 on success */
int Clone_Handle::apply_state_metadata(Clone_Task *task,
                                       Ha_clone_cbk *callback) {
  int err = 0;
  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_State state_desc;
  auto success = state_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    ut_ad(false);
    err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid State Descriptor");
    return (err);
  }
  /* A copy handle only expects ACK descriptors here and passes them to the
  task manager. */
  if (m_clone_handle_type == CLONE_HDL_COPY) {
    ut_ad(state_desc.m_is_ack);
    m_clone_task_manager.ack_state(&state_desc);
    return (0);
  }

  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  /* ACK descriptor is sent for keeping the connection alive. */
  if (state_desc.m_is_ack) {
    return (0);
  }

  /* Reset current chunk information */
  auto &task_meta = task->m_task_meta;
  task_meta.m_chunk_num = 0;
  task_meta.m_block_num = 0;

  /* Move to the new state */
  if (state_desc.m_is_start) {
#ifdef UNIV_DEBUG
    /* Network failure before moving to new state */
    /* NOTE(review): the value assigned here is overwritten by
    move_to_next_state() below without being checked - confirm that this
    is intended. */
    err = m_clone_task_manager.debug_restart(task, err, 5);
#endif /* UNIV_DEBUG */

    /** Notify state change via callback. */
    notify_state_change(task, callback, &state_desc);

    err = move_to_next_state(task, nullptr, &state_desc);

#ifdef UNIV_DEBUG
    /* Network failure after moving to new state */
    err = m_clone_task_manager.debug_restart(task, err, 0);
#endif /* UNIV_DEBUG */

    /* Check if enough space available on disk */
    if (err == 0) {
      err = check_space();
    }

    return (err);
  }

  /* It is the end of current state. Close active file. */
  err = close_file(task);

#ifdef UNIV_DEBUG
  /* Network failure before finishing state */
  err = m_clone_task_manager.debug_restart(task, err, 2);
#endif /* UNIV_DEBUG */

  if (err != 0) {
    return (err);
  }

  ut_ad(state_desc.m_state == m_clone_task_manager.get_state());

  /* Mark current state finished for the task */
  err = m_clone_task_manager.finish_state(task);

#ifdef UNIV_DEBUG
  /* Network failure before sending ACK */
  err = m_clone_task_manager.debug_restart(task, err, 3);
#endif /* UNIV_DEBUG */

  /* Send acknowledgement back to remote server */
  if (err == 0 && task->m_is_master) {
    err = ack_state_metadata(task, callback, &state_desc);

    /* NOTE(review): this message is logged only when sending the ACK
    returned an error, although its text reads like a success message -
    confirm whether the condition or the wording is intended. */
    if (err != 0) {
      ib::info(ER_IB_CLONE_OPERATION)
          << "Clone Apply Master ACK finshed state: " << state_desc.m_state;
    }
  }

#ifdef UNIV_DEBUG
  /* Network failure after sending ACK */
  err = m_clone_task_manager.debug_restart(task, err, 4);
#endif /* UNIV_DEBUG */

  return (err);
}
609 
notify_state_change(Clone_Task * task,Ha_clone_cbk * callback,Clone_Desc_State * state_desc)610 void Clone_Handle::notify_state_change(Clone_Task *task, Ha_clone_cbk *callback,
611                                        Clone_Desc_State *state_desc) {
612   if (!task->m_is_master) {
613     return;
614   }
615   callback->mark_state_change(state_desc->m_estimate);
616   callback->buffer_cbk(nullptr, 0);
617   callback->clear_flags();
618 }
619 
ack_state_metadata(Clone_Task * task,Ha_clone_cbk * callback,Clone_Desc_State * state_desc)620 int Clone_Handle::ack_state_metadata(Clone_Task *task, Ha_clone_cbk *callback,
621                                      Clone_Desc_State *state_desc) {
622   ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
623 
624   state_desc->m_is_ack = true;
625 
626   byte desc_buf[CLONE_DESC_MAX_BASE_LEN];
627 
628   auto serial_desc = &desc_buf[0];
629   uint desc_len = CLONE_DESC_MAX_BASE_LEN;
630 
631   state_desc->serialize(serial_desc, desc_len, nullptr);
632 
633   callback->set_data_desc(serial_desc, desc_len);
634   callback->clear_flags();
635 
636   auto err = callback->buffer_cbk(nullptr, 0);
637 
638   return (err);
639 }
640 
/** Handle a file descriptor delivered through the callback: create the
snapshot's file metadata entry if it does not exist yet, create the file and
register the descriptor with the snapshot. For data files the punch hole
setting is decided as well; for the first redo file an entry for the second
redo file is added too.
@param[in]  task      clone task (not used in this function)
@param[in]  callback  callback holding the serialized descriptor
@return error code, 0 on success */
int Clone_Handle::apply_file_metadata(Clone_Task *task,
                                      Ha_clone_cbk *callback) {
  ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);

  uint desc_len = 0;
  auto serial_desc = callback->get_data_desc(&desc_len);

  Clone_Desc_File_MetaData file_desc;
  auto success = file_desc.deserialize(serial_desc, desc_len);

  if (!success) {
    ut_ad(false);
    int err = ER_CLONE_PROTOCOL;
    my_error(err, MYF(0), "Wrong Clone RPC: Invalid File Descriptor");
    return (err);
  }
  auto file_meta = &file_desc.m_file_meta;
  auto snapshot = m_clone_task_manager.get_snapshot();

  ut_ad(snapshot->get_state() == file_desc.m_state);

  bool desc_exists;

  /* Check file metadata entry based on the descriptor. This first lookup
  does not create anything and is done before taking the task manager
  mutex. */
  auto err =
      snapshot->get_file_from_desc(file_meta, m_clone_dir, false, desc_exists);
  if (err != 0 || desc_exists) {
    return (err);
  }

  mutex_enter(m_clone_task_manager.get_mutex());

  /* Create file metadata entry based on the descriptor. The lookup is
  repeated under the mutex, so desc_exists can be true here if the entry was
  added in the meantime. */
  err = snapshot->get_file_from_desc(file_meta, m_clone_dir, true, desc_exists);
  file_meta->m_punch_hole = false;

  if (err != 0 || desc_exists) {
    mutex_exit(m_clone_task_manager.get_mutex());

    /* Save error with file name. */
    if (err != 0) {
      m_clone_task_manager.set_error(err, file_meta->m_file_name);
    }
    return (err);
  }

  if (file_desc.m_state == CLONE_SNAPSHOT_FILE_COPY) {
    auto file_type = OS_CLONE_DATA_FILE;

    /* A descriptor without a valid space ID is opened as a log file. */
    if (file_meta->m_space_id == dict_sys_t::s_invalid_space_id) {
      file_type = OS_CLONE_LOG_FILE;
    }

    /* Create the file */
    err = open_file(nullptr, file_meta, file_type, true, false);

    /* If last file is received, set all file metadata transferred.
    The descriptor is added even when open_file() returned an error. */
    if (snapshot->add_file_from_desc(file_meta)) {
      m_clone_task_manager.set_file_meta_transferred();
    }

    mutex_exit(m_clone_task_manager.get_mutex());

    if (err != 0) {
      return (err);
    }

    /* Check and set punch hole for compressed page table. */
    if (file_type == OS_CLONE_DATA_FILE &&
        file_meta->m_compress_type != Compression::NONE) {
      page_size_t page_size(file_meta->m_fsp_flags);

      /* Disable punch hole if donor compression is not effective. */
      if (page_size.is_compressed() ||
          file_meta->m_fsblk_size * 2 > srv_page_size) {
        file_meta->m_punch_hole = false;
        return (0);
      }

      /* NOTE(review): the result of os_file_get_status() is not checked;
      stat_info.block_size is read below regardless - confirm the call
      cannot fail for the file just created. */
      os_file_stat_t stat_info;
      os_file_get_status(file_meta->m_file_name, &stat_info, false, false);

      /* Check and disable punch hole if recipient cannot support it. */
      if (!IORequest::is_punch_hole_supported() ||
          stat_info.block_size * 2 > srv_page_size) {
        file_meta->m_punch_hole = false;
      } else {
        file_meta->m_punch_hole = true;
      }

      /* Currently the format for compressed and encrypted page is
      dependent on file system block size. */
      if (file_meta->m_encrypt_type != Encryption::NONE &&
          file_meta->m_fsblk_size != stat_info.block_size) {
        auto donor_str = std::to_string(file_meta->m_fsblk_size);
        auto recipient_str = std::to_string(stat_info.block_size);

        my_error(ER_CLONE_CONFIG, MYF(0), "FS Block Size", donor_str.c_str(),
                 recipient_str.c_str());
        err = ER_CLONE_CONFIG;
      }
    }
    return (err);
  }

  ut_ad(file_desc.m_state == CLONE_SNAPSHOT_REDO_COPY);

  /* open and reserve the redo file size */
  err = open_file(nullptr, file_meta, OS_CLONE_LOG_FILE, true, true);

  snapshot->add_file_from_desc(file_meta);

  /* For redo copy, check and add entry for the second file. The received
  descriptor is reused with its file index incremented. */
  if (err == 0 && file_meta->m_file_index == 0) {
    file_meta = &file_desc.m_file_meta;
    file_meta->m_file_index++;

    err =
        snapshot->get_file_from_desc(file_meta, m_clone_dir, true, desc_exists);

    file_meta->m_punch_hole = false;

    if (err == 0 && !desc_exists) {
      err = open_file(nullptr, file_meta, OS_CLONE_LOG_FILE, true, true);
      snapshot->add_file_from_desc(file_meta);
    }
  }

  /* In the redo path the task manager mutex is held until here. */
  mutex_exit(m_clone_task_manager.get_mutex());
  return (err);
}
772 
punch_holes(os_file_t file,const byte * buffer,uint32_t len,uint64_t start_off,uint32_t page_len,uint32_t block_size)773 dberr_t Clone_Handle::punch_holes(os_file_t file, const byte *buffer,
774                                   uint32_t len, uint64_t start_off,
775                                   uint32_t page_len, uint32_t block_size) {
776   dberr_t err = DB_SUCCESS;
777 
778   /* Loop through all pages in current data block and punch hole. */
779   while (len >= page_len) {
780     /* Validate compressed page type */
781     auto page_type = mach_read_from_2(buffer + FIL_PAGE_TYPE);
782     if (page_type == FIL_PAGE_COMPRESSED ||
783         page_type == FIL_PAGE_COMPRESSED_AND_ENCRYPTED) {
784       auto comp_len = mach_read_from_2(buffer + FIL_PAGE_COMPRESS_SIZE_V1);
785       comp_len += FIL_PAGE_DATA;
786 
787       /* Align compressed length */
788       comp_len = ut_calc_align(comp_len, block_size);
789 
790       auto offset = static_cast<ulint>(start_off + comp_len);
791       auto hole_size = static_cast<ulint>(page_len - comp_len);
792 
793       err = os_file_punch_hole(file, offset, hole_size);
794       if (err != DB_SUCCESS) {
795         break;
796       }
797     }
798     start_off += page_len;
799     buffer += page_len;
800     len -= page_len;
801   }
802   /* Must have consumed all data. */
803   ut_ad(err != DB_SUCCESS || len == 0);
804   return (err);
805 }
806 
modify_and_write(const Clone_Task * task,uint64_t offset,unsigned char * buffer,uint32_t buf_len)807 int Clone_Handle::modify_and_write(const Clone_Task *task, uint64_t offset,
808                                    unsigned char *buffer, uint32_t buf_len) {
809   ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
810 
811   auto snapshot = m_clone_task_manager.get_snapshot();
812   auto file_meta = snapshot->get_file_by_index(task->m_current_file_index);
813 
814   bool encryption = (file_meta->m_encrypt_type != Encryption::NONE);
815 
816   if (encryption) {
817     bool success = true;
818 
819     bool is_page_copy = (snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);
820     bool key_page = (is_page_copy && offset == 0);
821 
822     bool is_log_file = (snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);
823     bool key_log = (is_log_file && file_meta->m_file_index == 0 && offset == 0);
824 
825     if (key_page) {
826       /* Encrypt tablespace key with master key for encrypted tablespace. */
827       page_size_t page_size(file_meta->m_fsp_flags);
828       success = snapshot->encrypt_key_in_header(page_size, buffer);
829 
830     } else if (key_log) {
831       /* Encrypt redo log key with master key */
832       success = snapshot->encrypt_key_in_log_header(buffer, buf_len);
833     }
834     if (!success) {
835       ut_ad(false);
836       int err = ER_INTERNAL_ERROR;
837       my_error(err, MYF(0), "Innodb Clone Apply Failed to Encrypt Key");
838       return (err);
839     }
840   }
841 
842   /* No more compression/encryption is needed. */
843   IORequest request(IORequest::WRITE);
844   request.disable_compression();
845   request.clear_encrypted();
846 
847   /* Write buffer to file. */
848   errno = 0;
849   auto db_err =
850       os_file_write(request, "Clone data file", task->m_current_file_des,
851                     reinterpret_cast<char *>(buffer), offset, buf_len);
852   if (db_err != DB_SUCCESS) {
853     char errbuf[MYSYS_STRERROR_SIZE];
854     my_error(ER_ERROR_ON_WRITE, MYF(0), file_meta->m_file_name, errno,
855              my_strerror(errbuf, sizeof(errbuf), errno));
856 
857     return (ER_ERROR_ON_WRITE);
858   }
859 
860   /* Attempt to punch holes if page compression is enabled. */
861   if (file_meta->m_punch_hole) {
862     page_size_t page_size(file_meta->m_fsp_flags);
863 
864     ut_ad(file_meta->m_compress_type != Compression::NONE ||
865           file_meta->m_file_size > file_meta->m_alloc_size);
866     ut_ad(IORequest::is_punch_hole_supported());
867     ut_ad(!page_size.is_compressed());
868 
869     auto page_length = page_size.physical();
870     auto start_offset = offset;
871 
872     ut_a(buf_len >= page_length);
873     /* Skip first page */
874     if (start_offset == 0) {
875       start_offset += page_length;
876       buffer += page_length;
877       buf_len -= page_length;
878     }
879     auto db_err = punch_holes(task->m_current_file_des.m_file, buffer, buf_len,
880                               start_offset, page_length,
881                               static_cast<uint32_t>(file_meta->m_fsblk_size));
882     if (db_err != DB_SUCCESS) {
883       ut_ad(db_err == DB_IO_NO_PUNCH_HOLE);
884       ib::info(ER_IB_CLONE_PUNCH_HOLE)
885           << "Innodb Clone Apply failed to punch hole: "
886           << file_meta->m_file_name;
887       file_meta->m_punch_hole = false;
888     }
889   }
890   return (0);
891 }
892 
receive_data(Clone_Task * task,uint64_t offset,uint64_t file_size,uint32_t size,Ha_clone_cbk * callback)893 int Clone_Handle::receive_data(Clone_Task *task, uint64_t offset,
894                                uint64_t file_size, uint32_t size,
895                                Ha_clone_cbk *callback) {
896   ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
897 
898   auto snapshot = m_clone_task_manager.get_snapshot();
899 
900   auto file_meta = snapshot->get_file_by_index(task->m_current_file_index);
901 
902   bool is_page_copy = (snapshot->get_state() == CLONE_SNAPSHOT_PAGE_COPY);
903   bool is_log_file = (snapshot->get_state() == CLONE_SNAPSHOT_REDO_COPY);
904 
905   /* During page and redo copy, we encrypt the key in header page. */
906   bool key_page = (is_page_copy && offset == 0);
907   bool key_log = (is_log_file && file_meta->m_file_index == 0 && offset == 0);
908 
909   if (key_page) {
910     /* Check and update file size for space header page */
911     if (file_meta->m_file_size < file_size) {
912       snapshot->update_file_size(task->m_current_file_index, file_size);
913     }
914   }
915 
916   auto file_type = OS_CLONE_DATA_FILE;
917 
918   if (is_log_file || is_page_copy ||
919       file_meta->m_space_id == dict_sys_t::s_invalid_space_id) {
920     file_type = OS_CLONE_LOG_FILE;
921   }
922 
923   /* Open destination file for first block. */
924   if (task->m_current_file_des.m_file == OS_FILE_CLOSED) {
925     ut_ad(file_meta != nullptr);
926 
927     auto err = open_file(task, file_meta, file_type, true, false);
928 
929     if (err != 0) {
930       /* Save error with file name. */
931       m_clone_task_manager.set_error(err, file_meta->m_file_name);
932       return (err);
933     }
934   }
935 
936   ut_ad(task->m_current_file_index == file_meta->m_file_index);
937 
938   /* Copy data to current destination file using callback. */
939   char errbuf[MYSYS_STRERROR_SIZE];
940 
941   auto file_hdl = task->m_current_file_des.m_file;
942   auto success = os_file_seek(nullptr, file_hdl, offset);
943 
944   if (!success) {
945     my_error(ER_ERROR_ON_READ, MYF(0), file_meta->m_file_name, errno,
946              my_strerror(errbuf, sizeof(errbuf), errno));
947     /* Save error with file name. */
948     m_clone_task_manager.set_error(ER_ERROR_ON_READ, file_meta->m_file_name);
949     return (ER_ERROR_ON_READ);
950   }
951 
952   if (task->m_file_cache) {
953     callback->set_os_buffer_cache();
954     /* For data file recommend zero copy for cached IO. */
955     if (!is_log_file) {
956       callback->set_zero_copy();
957     }
958   }
959 
960   callback->set_dest_name(file_meta->m_file_name);
961 
962   bool modify_buffer = false;
963 
964   /* In case of page compression we need to punch hole. */
965   if (file_meta->m_punch_hole) {
966     ut_ad(!is_log_file);
967     modify_buffer = true;
968   }
969 
970   /* We need to encrypt the tablespace key by master key. */
971   if (file_meta->m_encrypt_type != Encryption::NONE && (key_page || key_log)) {
972     modify_buffer = true;
973   }
974   auto err = file_callback(callback, task, size, modify_buffer, offset
975 #ifdef UNIV_PFS_IO
976                            ,
977                            __FILE__, __LINE__
978 #endif /* UNIV_PFS_IO */
979   );
980 
981   task->m_data_size += size;
982 
983   if (err != 0) {
984     /* Save error with file name. */
985     m_clone_task_manager.set_error(err, file_meta->m_file_name);
986   }
987   return (err);
988 }
989 
apply_data(Clone_Task * task,Ha_clone_cbk * callback)990 int Clone_Handle::apply_data(Clone_Task *task, Ha_clone_cbk *callback) {
991   ut_ad(m_clone_handle_type == CLONE_HDL_APPLY);
992 
993   /* Extract the data descriptor. */
994   uint desc_len = 0;
995   auto serial_desc = callback->get_data_desc(&desc_len);
996 
997   Clone_Desc_Data data_desc;
998   auto success = data_desc.deserialize(serial_desc, desc_len);
999 
1000   if (!success) {
1001     ut_ad(false);
1002     int err = ER_CLONE_PROTOCOL;
1003     my_error(err, MYF(0), "Wrong Clone RPC: Invalid Data Descriptor");
1004     return (err);
1005   }
1006   /* Identify the task for the current block of data. */
1007   int err = 0;
1008   auto task_meta = &data_desc.m_task_meta;
1009 
1010   /* The data is from a different file. Close the current one. */
1011   if (task->m_current_file_index != data_desc.m_file_index) {
1012     err = close_file(task);
1013     if (err != 0) {
1014       return (err);
1015     }
1016     task->m_current_file_index = data_desc.m_file_index;
1017   }
1018 
1019   /* Receive data from callback and apply. */
1020   err = receive_data(task, data_desc.m_file_offset, data_desc.m_file_size,
1021                      data_desc.m_data_len, callback);
1022 
1023   /* Close file in case of error. */
1024   if (err != 0) {
1025     close_file(task);
1026   } else {
1027     err = m_clone_task_manager.set_chunk(task, task_meta);
1028   }
1029 
1030   return (err);
1031 }
1032 
apply(THD * thd,uint task_id,Ha_clone_cbk * callback)1033 int Clone_Handle::apply(THD *thd, uint task_id, Ha_clone_cbk *callback) {
1034   int err = 0;
1035   uint desc_len = 0;
1036 
1037   auto clone_desc = callback->get_data_desc(&desc_len);
1038   ut_ad(clone_desc != nullptr);
1039 
1040   Clone_Desc_Header header;
1041   auto success = header.deserialize(clone_desc, desc_len);
1042 
1043   if (!success) {
1044     ut_ad(false);
1045     err = ER_CLONE_PROTOCOL;
1046     my_error(err, MYF(0), "Wrong Clone RPC: Invalid Descriptor Header");
1047     return (err);
1048   }
1049 
1050   /* Check the descriptor type in header and apply */
1051   auto task = m_clone_task_manager.get_task_by_index(task_id);
1052 
1053   switch (header.m_type) {
1054     case CLONE_DESC_TASK_METADATA:
1055       err = apply_task_metadata(task, callback);
1056       break;
1057 
1058     case CLONE_DESC_STATE:
1059       err = apply_state_metadata(task, callback);
1060       break;
1061 
1062     case CLONE_DESC_FILE_METADATA:
1063       err = apply_file_metadata(task, callback);
1064       break;
1065 
1066     case CLONE_DESC_DATA:
1067       err = apply_data(task, callback);
1068       break;
1069 
1070     default:
1071       ut_ad(false);
1072       break;
1073   }
1074 
1075   if (err != 0) {
1076     close_file(task);
1077   }
1078 
1079   return (err);
1080 }
1081 
restart_apply(THD * thd,const byte * & loc,uint & loc_len)1082 int Clone_Handle::restart_apply(THD *thd, const byte *&loc, uint &loc_len) {
1083   auto init_loc = m_restart_loc;
1084   auto init_len = m_restart_loc_len;
1085   auto alloc_len = m_restart_loc_len;
1086 
1087   /* Get latest locator */
1088   loc = get_locator(loc_len);
1089 
1090   m_clone_task_manager.reinit_apply_state(loc, loc_len, init_loc, init_len,
1091                                           alloc_len);
1092 
1093   /* Return the original locator if no state information */
1094   if (init_loc == nullptr) {
1095     return (0);
1096   }
1097 
1098   loc = init_loc;
1099   loc_len = init_len;
1100 
1101   /* Reset restart loc buffer if newly allocated */
1102   if (alloc_len > m_restart_loc_len) {
1103     m_restart_loc = init_loc;
1104     m_restart_loc_len = alloc_len;
1105   }
1106 
1107   ut_ad(loc == m_restart_loc);
1108 
1109   auto master_task = m_clone_task_manager.get_task_by_index(0);
1110 
1111   auto err = close_file(master_task);
1112 
1113   return (err);
1114 }
1115 
update_file_size(uint32_t file_index,uint64_t file_size)1116 void Clone_Snapshot::update_file_size(uint32_t file_index, uint64_t file_size) {
1117   /* Update file size when file is extended during page copy */
1118   ut_ad(m_snapshot_state == CLONE_SNAPSHOT_PAGE_COPY);
1119 
1120   auto cur_file = get_file_by_index(file_index);
1121 
1122   while (file_size > cur_file->m_file_size) {
1123     ++file_index;
1124 
1125     if (file_index >= m_num_data_files) {
1126       /* Update file size for the last file. */
1127       cur_file->m_file_size = file_size;
1128       break;
1129     }
1130 
1131     auto next_file = get_file_by_index(file_index);
1132 
1133     if (next_file->m_space_id != cur_file->m_space_id) {
1134       /* Update file size for the last file. */
1135       cur_file->m_file_size = file_size;
1136       break;
1137     }
1138 
1139     /* Only system tablespace can have multiple nodes. */
1140     ut_ad(cur_file->m_space_id == 0);
1141 
1142     file_size -= cur_file->m_file_size;
1143     cur_file = next_file;
1144   }
1145 }
1146 
init_apply_state(Clone_Desc_State * state_desc)1147 int Clone_Snapshot::init_apply_state(Clone_Desc_State *state_desc) {
1148   set_state_info(state_desc);
1149 
1150   int err = 0;
1151   switch (m_snapshot_state) {
1152     case CLONE_SNAPSHOT_FILE_COPY:
1153       ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State FILE COPY: ";
1154       break;
1155 
1156     case CLONE_SNAPSHOT_PAGE_COPY:
1157       ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State PAGE COPY: ";
1158       break;
1159 
1160     case CLONE_SNAPSHOT_REDO_COPY:
1161       ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State REDO COPY: ";
1162       break;
1163 
1164     case CLONE_SNAPSHOT_DONE:
1165       /* Extend and flush data files. */
1166       ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State FLUSH DATA: ";
1167       err = extend_and_flush_files(false);
1168       if (err != 0) {
1169         ib::info(ER_IB_CLONE_OPERATION)
1170             << "Clone Apply FLUSH DATA failed code: " << err;
1171         break;
1172       }
1173       /* Flush redo files. */
1174       ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State FLUSH REDO: ";
1175       err = extend_and_flush_files(true);
1176       if (err != 0) {
1177         ib::info(ER_IB_CLONE_OPERATION)
1178             << "Clone Apply FLUSH REDO failed code: " << err;
1179         break;
1180       }
1181       ib::info(ER_IB_CLONE_OPERATION) << "Clone Apply State DONE";
1182       break;
1183 
1184     case CLONE_SNAPSHOT_NONE:
1185     case CLONE_SNAPSHOT_INIT:
1186     default:
1187       ut_ad(false);
1188       err = ER_INTERNAL_ERROR;
1189       my_error(err, MYF(0), "Innodb Clone Snapshot Invalid state");
1190       break;
1191   }
1192   return (err);
1193 }
1194 
extend_and_flush_files(bool flush_redo)1195 int Clone_Snapshot::extend_and_flush_files(bool flush_redo) {
1196   auto &file_vector = (flush_redo) ? m_redo_file_vector : m_data_file_vector;
1197 
1198   for (auto file_meta : file_vector) {
1199     char errbuf[MYSYS_STRERROR_SIZE];
1200     bool success = true;
1201 
1202     auto file =
1203         os_file_create(innodb_clone_file_key, file_meta->m_file_name,
1204                        OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT, OS_FILE_NORMAL,
1205                        OS_CLONE_DATA_FILE, false, &success);
1206 
1207     if (!success) {
1208       my_error(ER_CANT_OPEN_FILE, MYF(0), file_meta->m_file_name, errno,
1209                my_strerror(errbuf, sizeof(errbuf), errno));
1210 
1211       return (ER_CANT_OPEN_FILE);
1212     }
1213 
1214     auto file_size = os_file_get_size(file);
1215 
1216     if (file_size < file_meta->m_file_size) {
1217       success = os_file_set_size(file_meta->m_file_name, file, file_size,
1218                                  file_meta->m_file_size, false, true);
1219     } else {
1220       success = os_file_flush(file);
1221     }
1222 
1223     os_file_close(file);
1224 
1225     if (!success) {
1226       my_error(ER_ERROR_ON_WRITE, MYF(0), file_meta->m_file_name, errno,
1227                my_strerror(errbuf, sizeof(errbuf), errno));
1228 
1229       return (ER_ERROR_ON_WRITE);
1230     }
1231   }
1232   return (0);
1233 }
1234