1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /**
24 @file clone/src/clone_server.cc
25 Clone Plugin: Server implementation
26
27 */
28
29 #include "plugin/clone/include/clone_server.h"
30 #include "plugin/clone/include/clone_status.h"
31
32 #include "my_byteorder.h"
33
34 /* Namespace for all clone data types */
35 namespace myclone {
36
37 /** All configuration parameters to be validated. */
38 Key_Values Server::s_configs = {{"version", ""},
39 {"version_compile_machine", ""},
40 {"version_compile_os", ""},
41 {"character_set_server", ""},
42 {"character_set_filesystem", ""},
43 {"collation_server", ""},
44 {"innodb_page_size", ""}};
45
Server(THD * thd,MYSQL_SOCKET socket)46 Server::Server(THD *thd, MYSQL_SOCKET socket)
47 : m_server_thd(thd),
48 m_is_master(false),
49 m_storage_initialized(false),
50 m_pfs_initialized(false),
51 m_acquired_backup_lock(false),
52 m_protocol_version(CLONE_PROTOCOL_VERSION),
53 m_client_ddl_timeout() {
54 m_ext_link.set_socket(socket);
55 m_storage_vec.reserve(MAX_CLONE_STORAGE_ENGINE);
56
57 m_tasks.reserve(MAX_CLONE_STORAGE_ENGINE);
58
59 m_copy_buff.init();
60 m_res_buff.init();
61 }
62
~Server()63 Server::~Server() {
64 DBUG_ASSERT(!m_storage_initialized);
65 m_copy_buff.free();
66 m_res_buff.free();
67 }
68
clone()69 int Server::clone() {
70 int err = 0;
71
72 while (true) {
73 uchar command;
74 uchar *com_buf;
75 size_t com_len;
76
77 err = mysql_service_clone_protocol->mysql_clone_get_command(
78 get_thd(), &command, &com_buf, &com_len);
79
80 bool done = true;
81
82 if (err == 0) {
83 err = parse_command_buffer(command, com_buf, com_len, done);
84 }
85
86 if (err == 0 && thd_killed(get_thd())) {
87 my_error(ER_QUERY_INTERRUPTED, MYF(0));
88 err = ER_QUERY_INTERRUPTED;
89 }
90
91 /* Send status to client */
92 err = send_status(err);
93
94 if (done || err != 0) {
95 if (m_storage_initialized) {
96 DBUG_ASSERT(err != 0);
97 /* Don't abort clone if worker thread fails during attach. */
98 int in_err = (command == COM_ATTACH) ? 0 : err;
99
100 hton_clone_end(get_thd(), get_storage_vector(), m_tasks, in_err);
101 m_storage_initialized = false;
102 }
103 /* Release if we have acquired backup lock */
104 if (m_acquired_backup_lock) {
105 DBUG_ASSERT(m_is_master);
106 mysql_service_mysql_backup_lock->release(get_thd());
107 }
108 break;
109 }
110 }
111
112 log_error(get_thd(), false, err, "Exiting clone protocol");
113 return (err);
114 }
115
send_status(int err)116 int Server::send_status(int err) {
117 uchar res_cmd;
118 char info_mesg[128];
119
120 if (err == 0) {
121 /* Send complete response */
122 res_cmd = static_cast<uchar>(COM_RES_COMPLETE);
123
124 err = mysql_service_clone_protocol->mysql_clone_send_response(
125 get_thd(), false, &res_cmd, sizeof(res_cmd));
126 log_error(get_thd(), false, err, "COM_RES_COMPLETE");
127
128 } else {
129 /* Send Error Response */
130 res_cmd = static_cast<uchar>(COM_RES_ERROR);
131
132 snprintf(info_mesg, 128, "Before sending COM_RES_ERROR: %s",
133 is_network_error(err) ? "network " : " ");
134 log_error(get_thd(), false, err, &info_mesg[0]);
135
136 err = mysql_service_clone_protocol->mysql_clone_send_error(
137 get_thd(), res_cmd, is_network_error(err));
138 log_error(get_thd(), false, err, "After sending COM_RES_ERROR");
139 }
140
141 return (err);
142 }
143
init_storage(Ha_clone_mode mode,uchar * com_buf,size_t com_len)144 int Server::init_storage(Ha_clone_mode mode, uchar *com_buf, size_t com_len) {
145 auto thd = get_thd();
146
147 DBUG_ASSERT(thd != nullptr);
148 DBUG_ASSERT(!m_pfs_initialized);
149
150 auto err = deserialize_init_buffer(com_buf, com_len);
151
152 if (err != 0) {
153 return (err);
154 }
155
156 if (m_is_master) {
157 /* Set statement type for master thread */
158 mysql_service_clone_protocol->mysql_clone_start_statement(
159 thd, PSI_NOT_INSTRUMENTED, clone_stmt_server_key);
160
161 /* Acquire backup lock */
162 if (m_client_ddl_timeout != 0) {
163 auto failed = mysql_service_mysql_backup_lock->acquire(
164 thd, BACKUP_LOCK_SERVICE_DEFAULT, m_client_ddl_timeout);
165
166 if (failed) {
167 return (ER_LOCK_WAIT_TIMEOUT);
168 }
169 m_acquired_backup_lock = true;
170 log_error(get_thd(), false, 0, "Acquired backup lock");
171 }
172 }
173 m_pfs_initialized = true;
174
175 /* Get server locators */
176 err = hton_clone_begin(get_thd(), get_storage_vector(), m_tasks,
177 HA_CLONE_HYBRID, mode);
178 if (err != 0) {
179 return (err);
180 }
181 m_storage_initialized = true;
182
183 if (m_is_master && mode == HA_CLONE_MODE_START) {
184 /* Validate local configurations. */
185 err = validate_local_params(get_thd());
186
187 if (err == 0) {
188 /* Send current server parameters for validation. */
189 err = send_params();
190 }
191
192 if (err != 0) {
193 return (err);
194 }
195 }
196
197 /* Send locators back to client */
198 err = send_locators();
199
200 return (err);
201 }
202
parse_command_buffer(uchar command,uchar * com_buf,size_t com_len,bool & done)203 int Server::parse_command_buffer(uchar command, uchar *com_buf, size_t com_len,
204 bool &done) {
205 int err = 0;
206 auto com = static_cast<Command_RPC>(command);
207 done = false;
208
209 switch (com) {
210 case COM_REINIT:
211 m_is_master = true;
212 err = init_storage(HA_CLONE_MODE_RESTART, com_buf, com_len);
213 log_error(get_thd(), false, err, "COM_REINIT: Storage Initialize");
214 break;
215
216 case COM_INIT:
217 m_is_master = true;
218
219 /* Initialize storage, send locators and validating configurations. */
220 err = init_storage(HA_CLONE_MODE_START, com_buf, com_len);
221
222 log_error(get_thd(), false, err, "COM_INIT: Storage Initialize");
223 break;
224
225 case COM_ATTACH:
226 m_is_master = false;
227 err = init_storage(HA_CLONE_MODE_ADD_TASK, com_buf, com_len);
228 log_error(get_thd(), false, err, "COM_ATTACH: Storage Attach");
229 break;
230
231 case COM_EXECUTE: {
232 if (!m_storage_initialized) {
233 /* purecov: begin deadcode */
234 err = ER_CLONE_PROTOCOL;
235 my_error(err, MYF(0), "Wrong Clone RPC: Execute request before Init");
236 log_error(get_thd(), false, err, "COM_EXECUTE : Storage ninitialized");
237 break;
238 /* purecov: end */
239 }
240
241 Server_Cbk clone_callback(this);
242
243 err = hton_clone_copy(get_thd(), get_storage_vector(), m_tasks,
244 &clone_callback);
245 log_error(get_thd(), false, err, "COM_EXECUTE: Storage Execute");
246 break;
247 }
248 case COM_ACK: {
249 m_pfs_initialized = true;
250 int err_code = 0;
251 Locator loc = {nullptr, nullptr, 0};
252
253 Server_Cbk clone_callback(this);
254
255 err = deserialize_ack_buffer(com_buf, com_len, &clone_callback, err_code,
256 &loc);
257
258 if (err == 0) {
259 auto hton = loc.m_hton;
260
261 err = hton->clone_interface.clone_ack(hton, get_thd(), loc.m_loc,
262 loc.m_loc_len, 0, err_code,
263 &clone_callback);
264 }
265 log_error(get_thd(), false, err, "COM_ACK: Storage Ack");
266 break;
267 }
268
269 case COM_EXIT:
270 if (m_storage_initialized) {
271 hton_clone_end(get_thd(), get_storage_vector(), m_tasks, 0);
272 m_storage_initialized = false;
273 }
274 done = true;
275 log_error(get_thd(), false, err, "COM_EXIT: Storage End");
276 break;
277
278 case COM_MAX:
279 /* Fall through */
280 default:
281 /* purecov: begin deadcode */
282 err = ER_CLONE_PROTOCOL;
283 my_error(err, MYF(0), "Wrong Clone RPC: Invalid request");
284 break;
285 /* purecov: end */
286 }
287 return (err);
288 }
289
deserialize_ack_buffer(const uchar * ack_buf,size_t ack_len,Ha_clone_cbk * cbk,int & err_code,Locator * loc)290 int Server::deserialize_ack_buffer(const uchar *ack_buf, size_t ack_len,
291 Ha_clone_cbk *cbk, int &err_code,
292 Locator *loc) {
293 size_t serialized_length = 0;
294
295 const uchar *desc_ptr = nullptr;
296 uint desc_len = 0;
297
298 /* Should not deserialize if less than the base length */
299 if (ack_len < (4 + loc->serlialized_length())) {
300 goto err_end;
301 }
302
303 /* Extract error code */
304 err_code = uint4korr(ack_buf);
305 ack_buf += 4;
306 ack_len -= 4;
307
308 /* Extract Locator */
309 serialized_length = loc->deserialize(get_thd(), ack_buf);
310
311 if (ack_len < serialized_length) {
312 goto err_end;
313 }
314 ack_buf += serialized_length;
315 ack_len -= serialized_length;
316
317 /* Extract descriptor */
318 if (ack_len < 4) {
319 goto err_end;
320 }
321
322 desc_len = uint4korr(ack_buf);
323 ack_buf += 4;
324 ack_len -= 4;
325
326 if (desc_len > 0) {
327 desc_ptr = ack_buf;
328 }
329
330 cbk->set_data_desc(desc_ptr, desc_len);
331
332 ack_len -= desc_len;
333
334 if (ack_len == 0) {
335 return (0);
336 }
337
338 err_end:
339 /* purecov: begin deadcode */
340 my_error(ER_CLONE_PROTOCOL, MYF(0), "Wrong Clone RPC: Init ACK length");
341 return (ER_CLONE_PROTOCOL);
342 /* purecov: end */
343 }
344
deserialize_init_buffer(const uchar * init_buf,size_t init_len)345 int Server::deserialize_init_buffer(const uchar *init_buf, size_t init_len) {
346 if (init_len < 8) {
347 goto err_end;
348 }
349
350 /* Extract protocol version */
351 m_protocol_version = uint4korr(init_buf);
352 if (m_protocol_version > CLONE_PROTOCOL_VERSION) {
353 m_protocol_version = CLONE_PROTOCOL_VERSION;
354 }
355 init_buf += 4;
356 init_len -= 4;
357
358 /* Extract DDL timeout */
359 m_client_ddl_timeout = uint4korr(init_buf);
360 init_buf += 4;
361 init_len -= 4;
362
363 /* Initialize locators */
364 while (init_len > 0) {
365 Locator loc = {nullptr, nullptr, 0};
366
367 /* Should not deserialize if less than the base length */
368 if (init_len < loc.serlialized_length()) {
369 goto err_end;
370 }
371
372 auto serialized_length = loc.deserialize(get_thd(), init_buf);
373
374 init_buf += serialized_length;
375
376 if (init_len < serialized_length) {
377 goto err_end;
378 }
379
380 m_storage_vec.push_back(loc);
381
382 init_len -= serialized_length;
383 }
384
385 if (init_len == 0) {
386 return (0);
387 }
388
389 err_end:
390 my_error(ER_CLONE_PROTOCOL, MYF(0), "Wrong Clone RPC: Init buffer length");
391
392 return (ER_CLONE_PROTOCOL);
393 }
394
send_key_value(Command_Response rcmd,String_Key & key_str,String_Key & val_str)395 int Server::send_key_value(Command_Response rcmd, String_Key &key_str,
396 String_Key &val_str) {
397 /* Add length for key. */
398 auto buf_len = key_str.length();
399 buf_len += 4;
400
401 /** Add length for value. */
402 if (rcmd == COM_RES_CONFIG) {
403 buf_len += val_str.length();
404 buf_len += 4;
405 }
406 /* Add length for response type. */
407 ++buf_len;
408
409 /* Allocate for response buffer */
410 auto err = m_res_buff.allocate(buf_len);
411 auto buf_ptr = m_res_buff.m_buffer;
412 if (err != 0) {
413 return (true);
414 }
415
416 /* Store response command */
417 *buf_ptr = static_cast<uchar>(rcmd);
418 ++buf_ptr;
419
420 /* Store key */
421 int4store(buf_ptr, key_str.length());
422 buf_ptr += 4;
423 memcpy(buf_ptr, key_str.c_str(), key_str.length());
424 buf_ptr += key_str.length();
425
426 /* Store Value */
427 if (rcmd == COM_RES_CONFIG) {
428 int4store(buf_ptr, val_str.length());
429 buf_ptr += 4;
430 memcpy(buf_ptr, val_str.c_str(), val_str.length());
431 }
432 err = mysql_service_clone_protocol->mysql_clone_send_response(
433 get_thd(), false, m_res_buff.m_buffer, buf_len);
434
435 return (err);
436 }
437
send_params()438 int Server::send_params() {
439 int err = 0;
440
441 /* Send plugins */
442 auto plugin_cbk = [](THD *, plugin_ref plugin, void *ctx) {
443 auto server = static_cast<Server *>(ctx);
444
445 if (plugin == nullptr || plugin_state(plugin) == PLUGIN_IS_FREED ||
446 plugin_state(plugin) == PLUGIN_IS_DISABLED) {
447 return (false);
448 }
449 /* Send plugin name string */
450 String_Key pstring(plugin_name(plugin)->str, plugin_name(plugin)->length);
451 auto err = server->send_key_value(COM_RES_PLUGIN, pstring, pstring);
452
453 return (err != 0);
454 };
455
456 auto result = plugin_foreach_with_mask(
457 get_thd(), plugin_cbk, MYSQL_ANY_PLUGIN, ~PLUGIN_IS_FREED, this);
458
459 if (result) {
460 err = ER_INTERNAL_ERROR;
461 my_error(err, MYF(0), "Clone error sending plugin information");
462 return (err);
463 }
464
465 /* Send character sets and collations */
466 String_Keys char_sets;
467
468 err = mysql_service_clone_protocol->mysql_clone_get_charsets(get_thd(),
469 char_sets);
470 if (err != 0) {
471 return (err);
472 }
473
474 for (auto &element : char_sets) {
475 err = send_key_value(COM_RES_COLLATION, element, element);
476 if (err != 0) {
477 return (err);
478 }
479 }
480
481 /* Send configurations */
482 err = mysql_service_clone_protocol->mysql_clone_get_configs(get_thd(),
483 s_configs);
484 if (err != 0) {
485 return (err);
486 }
487
488 for (auto &key_val : s_configs) {
489 err = send_key_value(COM_RES_CONFIG, key_val.first, key_val.second);
490 if (err != 0) {
491 break;
492 }
493 }
494 return (err);
495 }
496
send_locators()497 int Server::send_locators() {
498 /* Add length of protocol Version */
499 auto buf_len = sizeof(m_protocol_version);
500
501 /* Add length for response type */
502 ++buf_len;
503
504 /* Add SE and locator length */
505 for (auto &loc : m_storage_vec) {
506 buf_len += loc.serlialized_length();
507 }
508
509 /* Allocate for response buffer */
510 auto err = m_res_buff.allocate(buf_len);
511 auto buf_ptr = m_res_buff.m_buffer;
512
513 if (err != 0) {
514 return (err);
515 }
516
517 /* Store response command */
518 *buf_ptr = static_cast<uchar>(COM_RES_LOCS);
519 ++buf_ptr;
520
521 /* Store version */
522 int4store(buf_ptr, m_protocol_version);
523 buf_ptr += 4;
524
525 /* Store SE information and Locators */
526 for (auto &loc : m_storage_vec) {
527 buf_ptr += loc.serialize(buf_ptr);
528 }
529
530 err = mysql_service_clone_protocol->mysql_clone_send_response(
531 get_thd(), false, m_res_buff.m_buffer, buf_len);
532
533 return (err);
534 }
535
send_descriptor(handlerton * hton,bool secure,uint loc_index,const uchar * desc_buf,uint desc_len)536 int Server::send_descriptor(handlerton *hton, bool secure, uint loc_index,
537 const uchar *desc_buf, uint desc_len) {
538 /* Add data descriptor length */
539 auto buf_len = desc_len;
540
541 /* Add length for response type */
542 ++buf_len;
543
544 /* Add length for Storage Engine type */
545 ++buf_len;
546
547 /* Add length for Locator Index */
548 ++buf_len;
549
550 /* Allocate for response buffer */
551 auto err = m_res_buff.allocate(buf_len);
552
553 if (err != 0) {
554 return (err);
555 }
556
557 auto buf_ptr = m_res_buff.m_buffer;
558
559 /* Store response command */
560 *buf_ptr = static_cast<uchar>(COM_RES_DATA_DESC);
561 ++buf_ptr;
562
563 /* Store Storage Engine type */
564 *buf_ptr = static_cast<uchar>(hton->db_type);
565 ++buf_ptr;
566
567 /* Store Locator Index */
568 *buf_ptr = static_cast<uchar>(loc_index);
569 ++buf_ptr;
570
571 /* Store Descriptor */
572 memcpy(buf_ptr, desc_buf, desc_len);
573
574 err = mysql_service_clone_protocol->mysql_clone_send_response(
575 get_thd(), secure, m_res_buff.m_buffer, buf_len);
576
577 return (err);
578 }
579
send_descriptor()580 int Server_Cbk::send_descriptor() {
581 auto server = get_clone_server();
582
583 uint desc_len = 0;
584 auto desc = get_data_desc(&desc_len);
585
586 auto err = server->send_descriptor(get_hton(), is_secure(), get_loc_index(),
587 desc, desc_len);
588 return (err);
589 }
590
file_cbk(Ha_clone_file from_file,uint len)591 int Server_Cbk::file_cbk(Ha_clone_file from_file, uint len) {
592 auto server = get_clone_server();
593
594 /* Check if session is interrupted. */
595 if (thd_killed(server->get_thd())) {
596 my_error(ER_QUERY_INTERRUPTED, MYF(0));
597 return (ER_QUERY_INTERRUPTED);
598 }
599
600 /* Add one byte for descriptor type */
601 auto buf_len = len + 1;
602 auto buf_ptr = server->alloc_copy_buffer(buf_len + CLONE_OS_ALIGN);
603
604 if (buf_ptr == nullptr) {
605 return (ER_OUTOFMEMORY);
606 }
607
608 /* Store response command */
609 auto data_ptr = buf_ptr + 1;
610
611 /* Align buffer to CLONE_OS_ALIGN[4K] for O_DIRECT */
612 data_ptr = clone_os_align(data_ptr);
613 buf_ptr = data_ptr - 1;
614
615 *buf_ptr = static_cast<uchar>(COM_RES_DATA);
616
617 auto err =
618 clone_os_copy_file_to_buf(from_file, data_ptr, len, get_source_name());
619 if (err != 0) {
620 return (err);
621 }
622
623 /* Step 1: Send Descriptor */
624 err = send_descriptor();
625
626 if (err != 0) {
627 return (err);
628 }
629
630 /* Step 2: Send Data */
631 err = mysql_service_clone_protocol->mysql_clone_send_response(
632 server->get_thd(), false, buf_ptr, buf_len);
633
634 return (err);
635 }
636
buffer_cbk(uchar * from_buffer,uint buf_len)637 int Server_Cbk::buffer_cbk(uchar *from_buffer, uint buf_len) {
638 auto server = get_clone_server();
639
640 if (thd_killed(server->get_thd())) {
641 my_error(ER_QUERY_INTERRUPTED, MYF(0));
642 return (ER_QUERY_INTERRUPTED);
643 }
644
645 uchar *buf_ptr = nullptr;
646 uint total_len = 0;
647
648 if (buf_len > 0) {
649 /* Add one byte for descriptor type */
650 total_len = buf_len + 1;
651 buf_ptr = server->alloc_copy_buffer(total_len);
652
653 if (buf_ptr == nullptr) {
654 return (ER_OUTOFMEMORY);
655 }
656 }
657
658 /* Step 1: Send Descriptor */
659 auto err = send_descriptor();
660
661 if (err != 0 || buf_len == 0) {
662 return (err);
663 }
664
665 /* Step 2: Send Data */
666 *buf_ptr = static_cast<uchar>(COM_RES_DATA);
667 memcpy(buf_ptr + 1, from_buffer, static_cast<size_t>(buf_len));
668
669 err = mysql_service_clone_protocol->mysql_clone_send_response(
670 server->get_thd(), false, buf_ptr, total_len);
671
672 return (err);
673 }
674
675 /* purecov: begin deadcode */
apply_file_cbk(Ha_clone_file to_file MY_ATTRIBUTE ((unused)))676 int Server_Cbk::apply_file_cbk(Ha_clone_file to_file MY_ATTRIBUTE((unused))) {
677 DBUG_ASSERT(false);
678 my_error(ER_INTERNAL_ERROR, MYF(0), "Apply callback from Clone Server");
679 return (ER_INTERNAL_ERROR);
680 }
681
apply_buffer_cbk(uchar * & to_buffer MY_ATTRIBUTE ((unused)),uint & len MY_ATTRIBUTE ((unused)))682 int Server_Cbk::apply_buffer_cbk(uchar *&to_buffer MY_ATTRIBUTE((unused)),
683 uint &len MY_ATTRIBUTE((unused))) {
684 DBUG_ASSERT(false);
685 my_error(ER_INTERNAL_ERROR, MYF(0), "Apply callback from Clone Server");
686 return (ER_INTERNAL_ERROR);
687 }
688 /* purecov: end */
689 } // namespace myclone
690