1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /**
24 @file clone/src/clone_local.cc
25 Clone Plugin: Local clone implementation
26
27 */
28
29 #include "plugin/clone/include/clone_local.h"
30 #include "plugin/clone/include/clone_os.h"
31
32 #include "sql/sql_thd_internal_api.h"
33
34 /* Namespace for all clone data types */
35 namespace myclone {
36
37 /** Start concurrent clone operation.
38 @param[in] share shared client information
39 @param[in] server shared server handle
40 @param[in] index index of current thread */
clone_local(Client_Share * share,Server * server,uint32_t index)41 static void clone_local(Client_Share *share, Server *server, uint32_t index) {
42 THD *thd = nullptr;
43
44 /* Create a session statement and set PFS keys */
45 mysql_service_clone_protocol->mysql_clone_start_statement(
46 thd, clone_local_thd_key, PSI_NOT_INSTRUMENTED);
47
48 Local clone_inst(thd, server, share, index, false);
49
50 /* Worker task has already reported the error. We ignore any error
51 returned here. */
52 static_cast<void>(clone_inst.clone_exec());
53
54 /* Drop the statement and session */
55 mysql_service_clone_protocol->mysql_clone_finish_statement(thd);
56 }
57
Local(THD * thd,Server * server,Client_Share * share,uint32_t index,bool is_master)58 Local::Local(THD *thd, Server *server, Client_Share *share, uint32_t index,
59 bool is_master)
60 : m_clone_server(server), m_clone_client(thd, share, index, is_master) {}
61
clone()62 int Local::clone() {
63 /* Begin PFS state if no concurrent clone in progress. */
64 auto err = m_clone_client.pfs_begin_state();
65 if (err != 0) {
66 return (err);
67 }
68
69 /* Move to first stage. */
70 m_clone_client.pfs_change_stage(0);
71
72 /* Execute clone */
73 err = clone_exec();
74
75 /* End PFS table state. */
76 const char *err_mesg = nullptr;
77 uint32_t err_number = 0;
78 auto thd = m_clone_client.get_thd();
79
80 mysql_service_clone_protocol->mysql_clone_get_error(thd, &err_number,
81 &err_mesg);
82 m_clone_client.pfs_end_state(err_number, err_mesg);
83 return (err);
84 }
85
clone_exec()86 int Local::clone_exec() {
87 auto thd = m_clone_client.get_thd();
88 auto dir_name = m_clone_client.get_data_dir();
89 auto is_master = m_clone_client.is_master();
90 auto acquire_backup_lock = (is_master && clone_ddl_timeout != 0);
91 auto num_workers = m_clone_client.get_max_concurrency() - 1;
92
93 auto &client_vector = m_clone_client.get_storage_vector();
94 auto &client_tasks = m_clone_client.get_task_vector();
95 auto &server_vector = m_clone_server->get_storage_vector();
96
97 Task_Vector server_tasks;
98 server_tasks.reserve(MAX_CLONE_STORAGE_ENGINE);
99
100 /* Acquire DDL lock. Wait for 5 minutes by default. */
101 if (acquire_backup_lock) {
102 auto failed = mysql_service_mysql_backup_lock->acquire(
103 thd, BACKUP_LOCK_SERVICE_DEFAULT, clone_ddl_timeout);
104
105 if (failed) {
106 return (ER_LOCK_WAIT_TIMEOUT);
107 }
108 }
109
110 auto begin_mode = is_master ? HA_CLONE_MODE_START : HA_CLONE_MODE_ADD_TASK;
111
112 /* Begin clone copy from source. */
113 auto error = hton_clone_begin(thd, server_vector, server_tasks,
114 HA_CLONE_HYBRID, begin_mode);
115
116 if (error != 0) {
117 /* Release DDL lock */
118 if (acquire_backup_lock) {
119 mysql_service_mysql_backup_lock->release(thd);
120 }
121 return (error);
122 }
123
124 /* Spawn parallel threads for clone */
125 if (is_master) {
126 /* Copy Server locators to Client. */
127 client_vector = server_vector;
128
129 /* Begin clone apply to destination. */
130 error = hton_clone_apply_begin(thd, dir_name, client_vector, client_tasks,
131 begin_mode);
132
133 if (error != 0) {
134 hton_clone_end(thd, server_vector, server_tasks, error);
135
136 /* Release DDL lock */
137 if (acquire_backup_lock) {
138 mysql_service_mysql_backup_lock->release(thd);
139 }
140 return (error);
141 }
142
143 /* Spawn concurrent client tasks if auto tuning is OFF. */
144 if (!clone_autotune_concurrency) {
145 /* Limit number of workers based on other configurations. */
146 auto to_spawn = m_clone_client.limit_workers(num_workers);
147 using namespace std::placeholders;
148 auto func = std::bind(clone_local, _1, m_clone_server, _2);
149 m_clone_client.spawn_workers(to_spawn, func);
150 }
151
152 } else {
153 /* Begin clone apply to destination. For auxiliary threads,
154 use server storage locator with current copy state.
155 1. Auxiliary threads don't overwrite the locator in apply begin
156 2. Auxiliary threads must wait for apply state to reach
157 copy state */
158 error = hton_clone_apply_begin(thd, dir_name, server_vector, client_tasks,
159 begin_mode);
160 if (error != 0) {
161 hton_clone_end(thd, server_vector, server_tasks, error);
162 return (error);
163 }
164 }
165
166 Ha_clone_cbk *clone_callback = new Local_Callback(this);
167
168 auto buffer_size = m_clone_client.limit_buffer(clone_buffer_size);
169 clone_callback->set_client_buffer_size(buffer_size);
170
171 /* Copy data from source and apply to destination. */
172 error = hton_clone_copy(thd, server_vector, server_tasks, clone_callback);
173
174 delete clone_callback;
175
176 /* Wait for concurrent tasks to finish */
177 m_clone_client.wait_for_workers();
178
179 /* End clone apply to destination. */
180 hton_clone_apply_end(thd, client_vector, client_tasks, error);
181
182 /* End clone copy from source. */
183 hton_clone_end(thd, server_vector, server_tasks, error);
184
185 /* Release DDL lock */
186 if (acquire_backup_lock) {
187 mysql_service_mysql_backup_lock->release(thd);
188 }
189 return (error);
190 }
191
file_cbk(Ha_clone_file from_file,uint len)192 int Local_Callback::file_cbk(Ha_clone_file from_file, uint len) {
193 DBUG_ASSERT(!m_apply_data);
194
195 /* Set source file to external handle of "Clone Client". */
196 auto ext_link = get_client_data_link();
197
198 ext_link->set_file(from_file, len);
199
200 auto error = apply_data();
201
202 return (error);
203 }
204
buffer_cbk(uchar * from_buffer,uint buf_len)205 int Local_Callback::buffer_cbk(uchar *from_buffer, uint buf_len) {
206 int error = 0;
207
208 if (m_apply_data) {
209 /* Acknowledge data transfer while in apply phase */
210 error = apply_ack();
211 return (error);
212 }
213
214 /* Set source buffer to external handle of "Clone Client". */
215 auto ext_link = get_client_data_link();
216
217 ext_link->set_buffer(from_buffer, buf_len);
218
219 error = apply_data();
220
221 return (error);
222 }
223
apply_ack()224 int Local_Callback::apply_ack() {
225 DBUG_ASSERT(m_apply_data);
226
227 auto client = get_clone_client();
228
229 uint64_t data_estimate = 0;
230 /* Check and update PFS table while beginning state. */
231 if (is_state_change(data_estimate)) {
232 client->pfs_change_stage(data_estimate);
233 return (0);
234 }
235
236 /* Update and reset statistics information at state end. */
237 client->update_stat(true);
238
239 uint loc_len = 0;
240
241 auto hton = get_hton();
242
243 auto server = get_clone_server();
244
245 auto thd = server->get_thd();
246 auto server_loc = server->get_locator(get_loc_index(), loc_len);
247
248 /* Use master task ID = 0 */
249 auto error = hton->clone_interface.clone_ack(hton, thd, server_loc, loc_len,
250 0, 0, this);
251
252 return (error);
253 }
254
apply_data()255 int Local_Callback::apply_data() {
256 uint loc_len = 0;
257
258 auto client = get_clone_client();
259 auto client_loc = client->get_locator(get_loc_index(), loc_len);
260
261 auto hton = get_hton();
262 auto thd = client->get_thd();
263
264 /* Check and abort, if killed */
265 if (thd_killed(thd)) {
266 if (client->is_master()) {
267 my_error(ER_QUERY_INTERRUPTED, MYF(0));
268 }
269
270 return (ER_QUERY_INTERRUPTED);
271 }
272
273 auto &task_vector = client->get_task_vector();
274
275 DBUG_ASSERT(get_loc_index() < task_vector.size());
276 auto task_id = task_vector[get_loc_index()];
277
278 /* Call storage engine to apply the data. */
279 DBUG_ASSERT(!m_apply_data);
280 m_apply_data = true;
281
282 auto error = hton->clone_interface.clone_apply(hton, thd, client_loc, loc_len,
283 task_id, 0, this);
284
285 m_apply_data = false;
286
287 return (error);
288 }
289
apply_buffer_cbk(uchar * & to_buffer,uint & len)290 int Local_Callback::apply_buffer_cbk(uchar *&to_buffer, uint &len) {
291 Ha_clone_file dummy_file;
292 dummy_file.type = Ha_clone_file::FILE_HANDLE;
293 dummy_file.file_handle = nullptr;
294 return (apply_cbk(dummy_file, false, to_buffer, len));
295 }
296
apply_file_cbk(Ha_clone_file to_file)297 int Local_Callback::apply_file_cbk(Ha_clone_file to_file) {
298 uchar *bufp = nullptr;
299 uint buf_len = 0;
300 return (apply_cbk(to_file, true, bufp, buf_len));
301 }
302
apply_cbk(Ha_clone_file to_file,bool apply_file,uchar * & to_buffer,uint & to_len)303 int Local_Callback::apply_cbk(Ha_clone_file to_file, bool apply_file,
304 uchar *&to_buffer, uint &to_len) {
305 int error;
306
307 DBUG_ASSERT(m_apply_data);
308
309 auto client = get_clone_client();
310 auto server = get_clone_server();
311 auto &info = client->get_thread_info();
312
313 /* Update statistics. */
314 auto num_workers = client->update_stat(false);
315
316 /* Spawn new concurrent client tasks, if needed. */
317 using namespace std::placeholders;
318 auto func = std::bind(clone_local, _1, server, _2);
319 client->spawn_workers(num_workers, func);
320
321 auto ext_link = get_client_data_link();
322
323 auto dest_type = ext_link->get_type();
324
325 if (dest_type == CLONE_HANDLE_BUFFER) {
326 auto from_buf = ext_link->get_buffer();
327
328 /* Assert alignment to CLONE_OS_ALIGN for O_DIRECT */
329 DBUG_ASSERT(is_os_buffer_cache() ||
330 from_buf->m_buffer == clone_os_align(from_buf->m_buffer));
331
332 if (apply_file) {
333 error = clone_os_copy_buf_to_file(from_buf->m_buffer, to_file,
334 from_buf->m_length, get_dest_name());
335 } else {
336 error = 0;
337 to_buffer = from_buf->m_buffer;
338 to_len = static_cast<uint>(from_buf->m_length);
339 }
340
341 info.update(from_buf->m_length, 0);
342
343 } else {
344 DBUG_ASSERT(dest_type == CLONE_HANDLE_FILE);
345 uchar *buf_ptr;
346 uint buf_len;
347
348 if (is_os_buffer_cache() && is_zero_copy() &&
349 clone_os_supports_zero_copy()) {
350 buf_ptr = nullptr;
351 buf_len = 0;
352 } else {
353 /* For direct IO use client buffer. */
354 buf_len = client->limit_buffer(clone_buffer_size);
355 buf_ptr = client->get_aligned_buffer(buf_len);
356
357 if (buf_ptr == nullptr) {
358 return (ER_OUTOFMEMORY);
359 }
360 }
361
362 auto from_file = ext_link->get_file();
363
364 if (apply_file) {
365 error = clone_os_copy_file_to_file(from_file->m_file_desc, to_file,
366 from_file->m_length, buf_ptr, buf_len,
367 get_source_name(), get_dest_name());
368 } else {
369 to_len = from_file->m_length;
370 to_buffer = client->get_aligned_buffer(to_len);
371 if (to_buffer == nullptr) {
372 return (ER_OUTOFMEMORY); /* purecov: inspected */
373 }
374
375 error = clone_os_copy_file_to_buf(from_file->m_file_desc, to_buffer,
376 to_len, get_source_name());
377 }
378 info.update(from_file->m_length, 0);
379 }
380
381 /* Check limits and throttle if needed. */
382 client->check_and_throttle();
383
384 return (error);
385 }
386 } // namespace myclone
387