1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /**
24 @file clone/src/clone_local.cc
25 Clone Plugin: Local clone implementation
26 
27 */
28 
29 #include "plugin/clone/include/clone_local.h"
30 #include "plugin/clone/include/clone_os.h"
31 
32 #include "sql/sql_thd_internal_api.h"
33 
34 /* Namespace for all clone data types */
35 namespace myclone {
36 
37 /** Start concurrent clone  operation.
38 @param[in]	share	shared client information
39 @param[in]	server	shared server handle
40 @param[in]	index	index of current thread */
clone_local(Client_Share * share,Server * server,uint32_t index)41 static void clone_local(Client_Share *share, Server *server, uint32_t index) {
42   THD *thd = nullptr;
43 
44   /* Create a session statement and set PFS keys */
45   mysql_service_clone_protocol->mysql_clone_start_statement(
46       thd, clone_local_thd_key, PSI_NOT_INSTRUMENTED);
47 
48   Local clone_inst(thd, server, share, index, false);
49 
50   /* Worker task has already reported the error. We ignore any error
51   returned  here. */
52   static_cast<void>(clone_inst.clone_exec());
53 
54   /* Drop the statement and session */
55   mysql_service_clone_protocol->mysql_clone_finish_statement(thd);
56 }
57 
Local(THD * thd,Server * server,Client_Share * share,uint32_t index,bool is_master)58 Local::Local(THD *thd, Server *server, Client_Share *share, uint32_t index,
59              bool is_master)
60     : m_clone_server(server), m_clone_client(thd, share, index, is_master) {}
61 
clone()62 int Local::clone() {
63   /* Begin PFS state if no concurrent clone in progress. */
64   auto err = m_clone_client.pfs_begin_state();
65   if (err != 0) {
66     return (err);
67   }
68 
69   /* Move to first stage. */
70   m_clone_client.pfs_change_stage(0);
71 
72   /* Execute clone */
73   err = clone_exec();
74 
75   /* End PFS table state. */
76   const char *err_mesg = nullptr;
77   uint32_t err_number = 0;
78   auto thd = m_clone_client.get_thd();
79 
80   mysql_service_clone_protocol->mysql_clone_get_error(thd, &err_number,
81                                                       &err_mesg);
82   m_clone_client.pfs_end_state(err_number, err_mesg);
83   return (err);
84 }
85 
clone_exec()86 int Local::clone_exec() {
87   auto thd = m_clone_client.get_thd();
88   auto dir_name = m_clone_client.get_data_dir();
89   auto is_master = m_clone_client.is_master();
90   auto acquire_backup_lock = (is_master && clone_ddl_timeout != 0);
91   auto num_workers = m_clone_client.get_max_concurrency() - 1;
92 
93   auto &client_vector = m_clone_client.get_storage_vector();
94   auto &client_tasks = m_clone_client.get_task_vector();
95   auto &server_vector = m_clone_server->get_storage_vector();
96 
97   Task_Vector server_tasks;
98   server_tasks.reserve(MAX_CLONE_STORAGE_ENGINE);
99 
100   /* Acquire DDL lock. Wait for 5 minutes by default. */
101   if (acquire_backup_lock) {
102     auto failed = mysql_service_mysql_backup_lock->acquire(
103         thd, BACKUP_LOCK_SERVICE_DEFAULT, clone_ddl_timeout);
104 
105     if (failed) {
106       return (ER_LOCK_WAIT_TIMEOUT);
107     }
108   }
109 
110   auto begin_mode = is_master ? HA_CLONE_MODE_START : HA_CLONE_MODE_ADD_TASK;
111 
112   /* Begin clone copy from source. */
113   auto error = hton_clone_begin(thd, server_vector, server_tasks,
114                                 HA_CLONE_HYBRID, begin_mode);
115 
116   if (error != 0) {
117     /* Release DDL lock */
118     if (acquire_backup_lock) {
119       mysql_service_mysql_backup_lock->release(thd);
120     }
121     return (error);
122   }
123 
124   /* Spawn parallel threads for clone */
125   if (is_master) {
126     /* Copy Server locators to Client. */
127     client_vector = server_vector;
128 
129     /* Begin clone apply to destination. */
130     error = hton_clone_apply_begin(thd, dir_name, client_vector, client_tasks,
131                                    begin_mode);
132 
133     if (error != 0) {
134       hton_clone_end(thd, server_vector, server_tasks, error);
135 
136       /* Release DDL lock */
137       if (acquire_backup_lock) {
138         mysql_service_mysql_backup_lock->release(thd);
139       }
140       return (error);
141     }
142 
143     /* Spawn concurrent client tasks if auto tuning is OFF. */
144     if (!clone_autotune_concurrency) {
145       /* Limit number of workers based on other configurations. */
146       auto to_spawn = m_clone_client.limit_workers(num_workers);
147       using namespace std::placeholders;
148       auto func = std::bind(clone_local, _1, m_clone_server, _2);
149       m_clone_client.spawn_workers(to_spawn, func);
150     }
151 
152   } else {
153     /* Begin clone apply to destination. For auxiliary threads,
154     use server storage locator with current copy state.
155     1. Auxiliary threads don't overwrite the locator in apply begin
156     2. Auxiliary threads must wait for apply state to reach
157     copy state */
158     error = hton_clone_apply_begin(thd, dir_name, server_vector, client_tasks,
159                                    begin_mode);
160     if (error != 0) {
161       hton_clone_end(thd, server_vector, server_tasks, error);
162       return (error);
163     }
164   }
165 
166   Ha_clone_cbk *clone_callback = new Local_Callback(this);
167 
168   auto buffer_size = m_clone_client.limit_buffer(clone_buffer_size);
169   clone_callback->set_client_buffer_size(buffer_size);
170 
171   /* Copy data from source and apply to destination. */
172   error = hton_clone_copy(thd, server_vector, server_tasks, clone_callback);
173 
174   delete clone_callback;
175 
176   /* Wait for concurrent tasks to finish */
177   m_clone_client.wait_for_workers();
178 
179   /* End clone apply to destination. */
180   hton_clone_apply_end(thd, client_vector, client_tasks, error);
181 
182   /* End clone copy from source. */
183   hton_clone_end(thd, server_vector, server_tasks, error);
184 
185   /* Release DDL lock */
186   if (acquire_backup_lock) {
187     mysql_service_mysql_backup_lock->release(thd);
188   }
189   return (error);
190 }
191 
file_cbk(Ha_clone_file from_file,uint len)192 int Local_Callback::file_cbk(Ha_clone_file from_file, uint len) {
193   DBUG_ASSERT(!m_apply_data);
194 
195   /* Set source file to external handle of "Clone Client". */
196   auto ext_link = get_client_data_link();
197 
198   ext_link->set_file(from_file, len);
199 
200   auto error = apply_data();
201 
202   return (error);
203 }
204 
buffer_cbk(uchar * from_buffer,uint buf_len)205 int Local_Callback::buffer_cbk(uchar *from_buffer, uint buf_len) {
206   int error = 0;
207 
208   if (m_apply_data) {
209     /* Acknowledge data transfer while in apply phase */
210     error = apply_ack();
211     return (error);
212   }
213 
214   /* Set source buffer to external handle of "Clone Client". */
215   auto ext_link = get_client_data_link();
216 
217   ext_link->set_buffer(from_buffer, buf_len);
218 
219   error = apply_data();
220 
221   return (error);
222 }
223 
apply_ack()224 int Local_Callback::apply_ack() {
225   DBUG_ASSERT(m_apply_data);
226 
227   auto client = get_clone_client();
228 
229   uint64_t data_estimate = 0;
230   /* Check and update PFS table while beginning state. */
231   if (is_state_change(data_estimate)) {
232     client->pfs_change_stage(data_estimate);
233     return (0);
234   }
235 
236   /* Update and reset statistics information at state end. */
237   client->update_stat(true);
238 
239   uint loc_len = 0;
240 
241   auto hton = get_hton();
242 
243   auto server = get_clone_server();
244 
245   auto thd = server->get_thd();
246   auto server_loc = server->get_locator(get_loc_index(), loc_len);
247 
248   /* Use master task ID = 0 */
249   auto error = hton->clone_interface.clone_ack(hton, thd, server_loc, loc_len,
250                                                0, 0, this);
251 
252   return (error);
253 }
254 
apply_data()255 int Local_Callback::apply_data() {
256   uint loc_len = 0;
257 
258   auto client = get_clone_client();
259   auto client_loc = client->get_locator(get_loc_index(), loc_len);
260 
261   auto hton = get_hton();
262   auto thd = client->get_thd();
263 
264   /* Check and abort, if killed */
265   if (thd_killed(thd)) {
266     if (client->is_master()) {
267       my_error(ER_QUERY_INTERRUPTED, MYF(0));
268     }
269 
270     return (ER_QUERY_INTERRUPTED);
271   }
272 
273   auto &task_vector = client->get_task_vector();
274 
275   DBUG_ASSERT(get_loc_index() < task_vector.size());
276   auto task_id = task_vector[get_loc_index()];
277 
278   /* Call storage engine to apply the data. */
279   DBUG_ASSERT(!m_apply_data);
280   m_apply_data = true;
281 
282   auto error = hton->clone_interface.clone_apply(hton, thd, client_loc, loc_len,
283                                                  task_id, 0, this);
284 
285   m_apply_data = false;
286 
287   return (error);
288 }
289 
apply_buffer_cbk(uchar * & to_buffer,uint & len)290 int Local_Callback::apply_buffer_cbk(uchar *&to_buffer, uint &len) {
291   Ha_clone_file dummy_file;
292   dummy_file.type = Ha_clone_file::FILE_HANDLE;
293   dummy_file.file_handle = nullptr;
294   return (apply_cbk(dummy_file, false, to_buffer, len));
295 }
296 
apply_file_cbk(Ha_clone_file to_file)297 int Local_Callback::apply_file_cbk(Ha_clone_file to_file) {
298   uchar *bufp = nullptr;
299   uint buf_len = 0;
300   return (apply_cbk(to_file, true, bufp, buf_len));
301 }
302 
apply_cbk(Ha_clone_file to_file,bool apply_file,uchar * & to_buffer,uint & to_len)303 int Local_Callback::apply_cbk(Ha_clone_file to_file, bool apply_file,
304                               uchar *&to_buffer, uint &to_len) {
305   int error;
306 
307   DBUG_ASSERT(m_apply_data);
308 
309   auto client = get_clone_client();
310   auto server = get_clone_server();
311   auto &info = client->get_thread_info();
312 
313   /* Update statistics. */
314   auto num_workers = client->update_stat(false);
315 
316   /* Spawn new concurrent client tasks, if needed. */
317   using namespace std::placeholders;
318   auto func = std::bind(clone_local, _1, server, _2);
319   client->spawn_workers(num_workers, func);
320 
321   auto ext_link = get_client_data_link();
322 
323   auto dest_type = ext_link->get_type();
324 
325   if (dest_type == CLONE_HANDLE_BUFFER) {
326     auto from_buf = ext_link->get_buffer();
327 
328     /* Assert alignment to CLONE_OS_ALIGN for O_DIRECT */
329     DBUG_ASSERT(is_os_buffer_cache() ||
330                 from_buf->m_buffer == clone_os_align(from_buf->m_buffer));
331 
332     if (apply_file) {
333       error = clone_os_copy_buf_to_file(from_buf->m_buffer, to_file,
334                                         from_buf->m_length, get_dest_name());
335     } else {
336       error = 0;
337       to_buffer = from_buf->m_buffer;
338       to_len = static_cast<uint>(from_buf->m_length);
339     }
340 
341     info.update(from_buf->m_length, 0);
342 
343   } else {
344     DBUG_ASSERT(dest_type == CLONE_HANDLE_FILE);
345     uchar *buf_ptr;
346     uint buf_len;
347 
348     if (is_os_buffer_cache() && is_zero_copy() &&
349         clone_os_supports_zero_copy()) {
350       buf_ptr = nullptr;
351       buf_len = 0;
352     } else {
353       /* For direct IO use client buffer. */
354       buf_len = client->limit_buffer(clone_buffer_size);
355       buf_ptr = client->get_aligned_buffer(buf_len);
356 
357       if (buf_ptr == nullptr) {
358         return (ER_OUTOFMEMORY);
359       }
360     }
361 
362     auto from_file = ext_link->get_file();
363 
364     if (apply_file) {
365       error = clone_os_copy_file_to_file(from_file->m_file_desc, to_file,
366                                          from_file->m_length, buf_ptr, buf_len,
367                                          get_source_name(), get_dest_name());
368     } else {
369       to_len = from_file->m_length;
370       to_buffer = client->get_aligned_buffer(to_len);
371       if (to_buffer == nullptr) {
372         return (ER_OUTOFMEMORY); /* purecov: inspected */
373       }
374 
375       error = clone_os_copy_file_to_buf(from_file->m_file_desc, to_buffer,
376                                         to_len, get_source_name());
377     }
378     info.update(from_file->m_length, 0);
379   }
380 
381   /* Check limits and throttle if needed. */
382   client->check_and_throttle();
383 
384   return (error);
385 }
386 }  // namespace myclone
387