1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License, version 2.0, 5 as published by the Free Software Foundation. 6 7 This program is also distributed with certain software (including 8 but not limited to OpenSSL) that is licensed under separate terms, 9 as designated in a particular file or component or in included license 10 documentation. The authors of MySQL hereby grant you an additional 11 permission to link the program and your derivative works with the 12 separately licensed software that they have included with MySQL. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License, version 2.0, for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; if not, write to the Free Software 21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ 22 23 /** 24 @file clone/include/clone_client.h 25 Clone Plugin: Client Interface 26 27 */ 28 29 #ifndef CLONE_CLIENT_H 30 #define CLONE_CLIENT_H 31 32 #include "plugin/clone/include/clone.h" 33 #include "plugin/clone/include/clone_hton.h" 34 #include "plugin/clone/include/clone_status.h" 35 36 #include "mysql/psi/mysql_thread.h" 37 38 #include <array> 39 #include <atomic> 40 #include <thread> 41 #include <vector> 42 43 /* Namespace for all clone data types */ 44 namespace myclone { 45 46 using Clock = std::chrono::steady_clock; 47 using Time_Point = std::chrono::time_point<Clock>; 48 49 using Time_Msec = std::chrono::milliseconds; 50 using Time_Sec = std::chrono::seconds; 51 52 struct Thread_Info { 53 /** Default constructor */ 54 Thread_Info() = default; 55 56 /** Copy constructor needed for std::vector. */ Thread_InfoThread_Info57 Thread_Info(const Thread_Info &) { reset(); } /* purecov: inspected */ 58 59 /** Reset transferred data bytes. */ resetThread_Info60 void reset() { 61 m_last_update = Clock::now(); 62 m_last_data_bytes = 0; 63 m_last_network_bytes = 0; 64 65 m_data_bytes.store(0); 66 m_network_bytes.store(0); 67 } 68 69 /** Update transferred data bytes. 70 @param[in] data_bytes data bytes transferred 71 @param[in] net_bytes network bytes transferred */ updateThread_Info72 void update(uint64_t data_bytes, uint64_t net_bytes) { 73 m_data_bytes.fetch_add(data_bytes); 74 m_network_bytes.fetch_add(net_bytes); 75 } 76 77 /** Calculate the expected time for transfer based on target. 78 @param[in] current current number of transferred data bytes 79 @param[in] prev previous number of transferred data bytes 80 @param[in] target target data transfer rate in bytes per second 81 @return expected time in milliseconds. */ 82 uint64_t get_target_time(uint64_t current, uint64_t prev, uint64_t target); 83 84 /** Check target transfer speed and throttle if needed. The thread sleeps 85 for appropriate time if the current transfer rate is more than target. 86 @param[in] data_target target data bytes transfer per second 87 @param[in] net_target target network bytes transfer per second */ 88 void throttle(uint64_t data_target, uint64_t net_target); 89 90 /** Data transfer throttle interval */ 91 Time_Msec m_interval{100}; 92 93 /** Current thread */ 94 std::thread m_thread; 95 96 /** Last time information was updated. */ 97 Time_Point m_last_update; 98 99 /** Data bytes at last update. */ 100 uint64_t m_last_data_bytes{}; 101 102 /** Network bytes at last update. */ 103 uint64_t m_last_network_bytes{}; 104 105 /** Total amount of data transferred. */ 106 std::atomic<uint64_t> m_data_bytes; 107 108 /** Total amount of network bytes transferred. The value differs 109 from data as we use compression in network layer. */ 110 std::atomic<uint64_t> m_network_bytes; 111 }; 112 113 /** Thread information vector. */ 114 using Thread_Vector = std::vector<Thread_Info>; 115 116 /** Maximum size of history data */ 117 const size_t STAT_HISTORY_SIZE = 16; 118 119 /** Auto tuning information for threads. */ 120 struct Thread_Tune_Auto { 121 /** Auto tuning state */ 122 enum class State { INIT, ACTIVE, DONE }; 123 124 /** Reset to initial state. */ resetThread_Tune_Auto125 void reset() { 126 m_prev_number = 0; 127 m_next_number = 0; 128 m_cur_number = 0; 129 m_prev_speed = 0; 130 m_last_step_speed = 0; 131 m_prev_history_index = 0; 132 m_state = State::INIT; 133 } 134 135 /** Statistics history interval for tuning. */ 136 const uint64_t m_history_interval{5}; 137 138 /** Number of threads to increase in each step. */ 139 const uint64_t m_step{4}; 140 141 /* Previous number of threads. */ 142 uint32_t m_prev_number{}; 143 144 /** Next target number of threads. */ 145 uint32_t m_next_number{}; 146 147 /** Current number of threads. */ 148 uint32_t m_cur_number{}; 149 150 /** Average data transfer MB/sec */ 151 uint64_t m_prev_speed{}; 152 153 /** Average data transfer in last step MB/sec */ 154 uint64_t m_last_step_speed{}; 155 156 /* Saved history index on last tuning. */ 157 uint64_t m_prev_history_index{}; 158 159 /** Current tuning state. */ 160 State m_state{State::INIT}; 161 }; 162 163 /** Client data transfer statistics. */ 164 class Client_Stat { 165 public: 166 /** Update statistics data. 167 @param[in] reset reset all previous history 168 @param[in] threads all concurrent thread information 169 @param[in] num_workers current number of worker threads */ 170 void update(bool reset, const Thread_Vector &threads, uint32_t num_workers); 171 172 /** Tune total number of threads based on stat 173 @param[in] num_threads current number of active threads 174 @param[in] max_threads maximum number of threads 175 @return suggested number of threads. */ 176 uint32_t get_tuned_thread_number(uint32_t num_threads, uint32_t max_threads); 177 178 /** Get target speed, in case user has specified limits. 179 @param[out] data_speed target data transfer in bytes/sec 180 @param[out] net_speed target network transfer in bytes/sec */ get_target(uint64_t & data_speed,uint64_t & net_speed)181 void get_target(uint64_t &data_speed, uint64_t &net_speed) const { 182 data_speed = m_target_data_speed.load(); 183 net_speed = m_target_network_speed.load(); 184 } 185 186 /** Initialize target speed read by all threads. Adjusted later based on 187 maximum bandwidth threads. Zero implies unlimited bandwidth. */ init_target()188 void init_target() { 189 m_target_data_speed.store(0); 190 m_target_network_speed.store(0); 191 } 192 193 /** Save finished byte stat when thread info is released. It is 194 used during clone restart after network failure. 195 @param[in] data_bytes data bytes to save 196 @param[in] net_bytes network bytes to save */ save_at_exit(uint64_t data_bytes,uint64_t net_bytes)197 void save_at_exit(uint64_t data_bytes, uint64_t net_bytes) { 198 m_finished_data_bytes += data_bytes; 199 m_finished_network_bytes += net_bytes; 200 } 201 202 /** Finish automatic tuning for spawning threads. */ finish_tuning()203 void finish_tuning() { m_tune.m_state = Thread_Tune_Auto::State::DONE; } 204 205 /** Reset history elements. 206 @param[in] init true, if called during initialization */ 207 void reset_history(bool init); 208 209 private: 210 /** Calculate target for each task based on current performance. 211 @param[in] target_speed overall target speed in bytes per second 212 @param[in] current_speed overall current speed in bytes per second 213 @param[in] current_target current target for a task in bytes per second 214 @param[in] num_tasks number of clone tasks 215 @return target for a task in bytes per second. */ 216 uint64_t task_target(uint64_t target_speed, uint64_t current_speed, 217 uint64_t current_target, uint32_t num_tasks); 218 219 /** Set target bandwidth for data and network per thread. 220 @param[in] num_workers current number of worker threads 221 @param[in] is_reset if called during stage reset 222 @param[in] data_speed current data speed in bytes per second 223 @param[in] net_speed current network speed in bytes per second */ 224 void set_target_bandwidth(uint32_t num_workers, bool is_reset, 225 uint64_t data_speed, uint64_t net_speed); 226 227 /** @return true if bandwidth limit is already reached. */ 228 bool is_bandwidth_saturated(); 229 230 /** @return true if tuning has improved performance. 231 @param[in] num_threads current number of threads */ 232 bool tune_has_improved(uint32_t num_threads); 233 234 /* Set next target number of threads 235 @param[in] num_threads current number of threads 236 @param[in] max_threads maximum number of threads */ 237 void tune_set_target(uint32_t num_threads, uint32_t max_threads); 238 239 private: 240 /** Statistics update interval - 1 sec*/ 241 const Time_Msec m_interval{1000}; 242 243 /** Minimum data transfer rate per task - 1M */ 244 const uint64_t m_minimum_speed = 1048576; 245 246 /* If stat elements are initialized. */ 247 bool m_initialized{false}; 248 249 /** Starting point for clone data transfer. */ 250 Time_Point m_start_time; 251 252 /** Last evaluation time */ 253 Time_Point m_eval_time; 254 255 /** Data transferred at last evaluation time. */ 256 uint64_t m_eval_data_bytes{}; 257 258 /** All data bytes transferred by threads already finished. */ 259 uint64_t m_finished_data_bytes{}; 260 261 /** Network bytes transferred at last evaluation time. */ 262 uint64_t m_eval_network_bytes{}; 263 264 /** All data bytes transferred by threads already finished. */ 265 uint64_t m_finished_network_bytes{}; 266 267 /** Network speed history. */ 268 std::array<uint64_t, STAT_HISTORY_SIZE> m_network_speed_history{}; 269 270 /** Data speed history. */ 271 std::array<uint64_t, STAT_HISTORY_SIZE> m_data_speed_history{}; 272 273 /** Current index for history data. */ 274 size_t m_current_history_index{}; 275 276 /** Target Network bytes to be transferred per thread per second. */ 277 std::atomic<uint64_t> m_target_network_speed; 278 279 /** Target data bytes to be transferred per thread per second. */ 280 std::atomic<uint64_t> m_target_data_speed; 281 282 /** Thread auto tuning state and information. */ 283 Thread_Tune_Auto m_tune; 284 }; 285 286 /* Shared client information for multi threaded clone */ 287 struct Client_Share { 288 /** Construct clone client share. Initialize storage handle. 289 @param[in] host remote host IP address 290 @param[in] port remote server port 291 @param[in] user remote user name 292 @param[in] passwd remote user's password 293 @param[in] dir target data directory for clone 294 @param[in] mode client SSL mode */ Client_ShareClient_Share295 Client_Share(const char *host, const uint port, const char *user, 296 const char *passwd, const char *dir, int mode) 297 : m_host(host), 298 m_port(port), 299 m_user(user), 300 m_passwd(passwd), 301 m_data_dir(dir), 302 m_ssl_mode(mode), 303 m_max_concurrency(clone_max_concurrency), 304 m_protocol_version(CLONE_PROTOCOL_VERSION) { 305 m_storage_vec.reserve(MAX_CLONE_STORAGE_ENGINE); 306 m_threads.resize(m_max_concurrency); 307 DBUG_ASSERT(m_max_concurrency > 0); 308 m_stat.init_target(); 309 } 310 311 /** Remote host name */ 312 const char *m_host; 313 314 /** Remote port */ 315 const uint32_t m_port; 316 317 /** Remote user name */ 318 const char *m_user; 319 320 /** Remote user password */ 321 const char *m_passwd; 322 323 /** Cloned database directory */ 324 const char *m_data_dir; 325 326 /** Client SSL mode */ 327 const int m_ssl_mode; 328 329 /** Maximum number of concurrent threads for current operation. */ 330 const uint32_t m_max_concurrency; 331 332 /** Negotiated protocol version */ 333 uint32_t m_protocol_version; 334 335 /** Clone storage vector */ 336 Storage_Vector m_storage_vec; 337 338 /** Thread vector for multi threaded clone. */ 339 Thread_Vector m_threads; 340 341 /** Data transfer statistics. */ 342 Client_Stat m_stat; 343 }; 344 345 /** Auxiliary connection to send ACK */ 346 struct Client_Aux { 347 /** Initialize members */ resetClient_Aux348 void reset() { 349 m_buffer = nullptr; 350 m_buf_len = 0; 351 m_cur_index = 0; 352 m_error = 0; 353 } 354 355 /** Clone remote client connection */ 356 MYSQL *m_conn; 357 358 /** ACK descriptor buffer */ 359 const uchar *m_buffer; 360 361 /** ACK descriptor length */ 362 size_t m_buf_len; 363 364 /** Current SE index */ 365 uint m_cur_index; 366 367 /** Saved error */ 368 int m_error; 369 }; 370 371 struct Remote_Parameters { 372 /** Remote character sets with collation */ 373 String_Keys m_plugins; 374 375 /** Remote character sets with collation */ 376 String_Keys m_charsets; 377 378 /** Remote configurations to validate */ 379 Key_Values m_configs; 380 }; 381 382 /** For Remote Clone, "Clone Client" is created at recipient. It receives data 383 over network from remote "Clone Server" and applies to Storage Engines. */ 384 class Client { 385 public: 386 /** Construct clone client. Initialize external handle. 387 @param[in,out] thd server thread handle 388 @param[in] share shared client information 389 @param[in] index current thread index 390 @param[in] is_master if it is master thread */ 391 Client(THD *thd, Client_Share *share, uint32_t index, bool is_master); 392 393 /** Destructor: Free the transfer buffer, if created. */ 394 ~Client(); 395 396 /** Check if it is the master client object. 397 @return true if this is the master object */ is_master()398 bool is_master() const { return (m_is_master); } 399 400 /** @return maximum concurrency for current clone operation. */ get_max_concurrency()401 uint32_t get_max_concurrency() const { 402 DBUG_ASSERT(m_share->m_max_concurrency > 0); 403 return (m_share->m_max_concurrency); 404 } 405 406 /** @return current thread information. */ get_thread_info()407 Thread_Info &get_thread_info() { 408 return (m_share->m_threads[m_thread_index]); 409 } 410 411 /** Check if network error 412 @param[in] err error code 413 @param[in] protocol_error include protocol error 414 @return true if network error */ 415 static bool is_network_error(int err, bool protocol_error); 416 417 /** Update statistics and tune threads 418 @param[in] is_reset reset statistics 419 @return tuned number of worker threads. */ 420 uint32_t update_stat(bool is_reset); 421 422 /** Check transfer speed and throttle. */ 423 void check_and_throttle(); 424 425 /** Get auxiliary connection information 426 @return auxiliary connection data */ get_aux()427 Client_Aux *get_aux() { return (&m_conn_aux); } 428 429 /** Get Shared area for client tasks 430 @return shared client data */ get_share()431 Client_Share *get_share() { return (m_share); } 432 433 /** Get storage handle vector for data transfer. 434 @return storage handle vector */ get_storage_vector()435 Storage_Vector &get_storage_vector() { return (m_share->m_storage_vec); } 436 437 /** Get tasks for different SE 438 @return task vector */ get_task_vector()439 Task_Vector &get_task_vector() { return (m_tasks); } 440 441 /** Get external handle for data transfer. This is file 442 or buffer for local clone and network socket to remote server 443 for remote clone. 444 @param[out] conn connection handle to remote server 445 @return external handle */ get_data_link(MYSQL * & conn)446 Data_Link *get_data_link(MYSQL *&conn) { 447 conn = m_conn; 448 return (&m_ext_link); 449 } 450 451 /** Get server thread handle 452 @return server thread */ get_thd()453 THD *get_thd() { return (m_server_thd); } 454 455 /** Get target clone data directory 456 @return data directory */ get_data_dir()457 const char *get_data_dir() const { return (m_share->m_data_dir); } 458 459 /** Get clone locator for a storage engine at specified index. 460 @param[in] index locator index 461 @param[out] loc_len locator length in bytes 462 @return storage locator */ get_locator(uint index,uint & loc_len)463 const uchar *get_locator(uint index, uint &loc_len) const { 464 DBUG_ASSERT(index < m_share->m_storage_vec.size()); 465 466 loc_len = m_share->m_storage_vec[index].m_loc_len; 467 return (m_share->m_storage_vec[index].m_loc); 468 } 469 470 /** Get aligned intermediate buffer for transferring data. Allocate, 471 when called for first time. 472 @param[in] len length of allocated buffer 473 @return allocated buffer pointer */ 474 uchar *get_aligned_buffer(uint32_t len); 475 476 /** Limit total memory used for clone transfer buffer. 477 @param[in] buffer_size configured buffer size 478 @return actual buffer size to allocate. */ 479 uint32_t limit_buffer(uint32_t buffer_size); 480 481 /** Limit spawning initial number of workers if data or network 482 bandwidth is small. 483 @param[in] num_workers planned number of workers to spawn 484 @return actual number of workers to be spawned. */ 485 uint32_t limit_workers(uint32_t num_workers); 486 487 /* Spawn worker threads. 488 @param[in] num_workers number of worker threads 489 @param[in] func worker function */ 490 template <typename F> spawn_workers(uint32_t num_workers,F func)491 void spawn_workers(uint32_t num_workers, F func) { 492 /* Currently we don't reduce the number of threads. */ 493 if (!is_master() || num_workers <= m_num_active_workers) { 494 return; 495 } 496 auto &thread_vector = m_share->m_threads; 497 498 /* Maximum number of workers are fixed. */ 499 if (num_workers + 1 > get_max_concurrency()) { 500 DBUG_ASSERT(false); /* purecov: inspected */ 501 return; 502 } 503 504 while (m_num_active_workers < num_workers) { 505 ++m_num_active_workers; 506 auto &info = thread_vector[m_num_active_workers]; 507 info.reset(); 508 try { 509 info.m_thread = std::thread(func, m_share, m_num_active_workers); 510 } catch (...) { 511 /* purecov: begin deadcode */ 512 auto &stat = m_share->m_stat; 513 stat.finish_tuning(); 514 515 char info_mesg[64]; 516 snprintf(info_mesg, sizeof(info_mesg), "Failed to spawn worker: %d", 517 m_num_active_workers); 518 LogPluginErr(INFORMATION_LEVEL, ER_CLONE_CLIENT_TRACE, info_mesg); 519 520 --m_num_active_workers; 521 break; 522 /* purecov: end */ 523 } 524 } 525 } 526 527 /** Wait for worker threads to finish. */ 528 void wait_for_workers(); 529 530 /** Get data from remote server and create cloned database by 531 applying to storage engines. 532 @return error code */ 533 int clone(); 534 535 /** Execute RPC clone command on remote server 536 @param[in] com RPC command ID 537 @param[in] use_aux use auxiliary connection 538 @return error if not successful */ 539 int remote_command(Command_RPC com, bool use_aux); 540 541 /** Begin state in PFS table. 542 @return error code. */ 543 int pfs_begin_state(); 544 545 /** Change stage in PFS progress table. */ 546 void pfs_change_stage(uint64_t estimate); 547 548 /** End state in PFS table. 549 @param[in] err_num error number 550 @param[in] err_mesg error message */ 551 void pfs_end_state(uint32_t err_num, const char *err_mesg); 552 553 /** Copy PFS status data safely. 554 @param[out] pfs_data status data. */ 555 static void copy_pfs_data(Status_pfs::Data &pfs_data); 556 557 /** Copy PFS progress data safely. 558 @param[out] pfs_data progress data. */ 559 static void copy_pfs_data(Progress_pfs::Data &pfs_data); 560 561 /** Update data and network consumed. 562 @param[in] data data bytes transferred 563 @param[in] network network bytes transferred 564 @param[in] data_speed data transfer speed in bytes/sec 565 @param[in] net_speed network transfer speed in bytes/sec 566 @param[in] num_workers number of worker threads */ 567 static void update_pfs_data(uint64_t data, uint64_t network, 568 uint32_t data_speed, uint32_t net_speed, 569 uint32_t num_workers); 570 571 /** Init PFS mutex for table. */ 572 static void init_pfs(); 573 574 /** Destroy PFS mutex for table. */ 575 static void uninit_pfs(); 576 577 private: 578 /** Connect to remote server 579 @param[in] is_restart restarting clone after network failure 580 @param[in] use_aux establish auxiliary connection 581 @return error code */ 582 int connect_remote(bool is_restart, bool use_aux); 583 584 /** Initialize storage engine and command buffer. 585 @param[in] mode initialization mode 586 @param[out] cmd_len serialized command length 587 @return error if initialization fails. */ 588 int init_storage(enum Ha_clone_mode mode, size_t &cmd_len); 589 590 /** Prepare command buffer for remote RPC 591 @param[in] com RPC command ID 592 @param[out] buf_len command buffer length 593 @return error if allocation fails */ 594 int prepare_command_buffer(Command_RPC com, size_t &buf_len); 595 596 /** Serialize the buffer for COM_INIT 597 @param[out] buf_len length of serialized buffer */ 598 int serialize_init_cmd(size_t &buf_len); 599 600 /** Serialize the buffer for COM_ACK 601 @param[out] buf_len length of serialized buffer */ 602 int serialize_ack_cmd(size_t &buf_len); 603 604 /** Receive and handle response from remote server 605 @param[in] com RPC command ID 606 @param[in] use_aux use auxiliary connection 607 @return error code */ 608 int receive_response(Command_RPC com, bool use_aux); 609 610 /** Handle response packet from remote server 611 @param[in] packet data packet 612 @param[in] length length of the packet 613 @param[in] in_err skip if error has occurred 614 @param[in] skip_loc skip applying locator 615 @param[out] is_last true if last packet 616 @return error code */ 617 int handle_response(const uchar *packet, size_t length, int in_err, 618 bool skip_loc, bool &is_last); 619 620 /** Handle error and check if needs to exit 621 @param[in] current_err error number 622 @param[in,out] first_error first error that has occurred 623 @param[in,out] first_error_time time for first error in 624 milliseconds 625 @return true if the caller needs to exit */ 626 bool handle_error(int current_err, int &first_error, 627 ulonglong &first_error_time); 628 629 /** Validate all remote parameters. 630 @return error code */ 631 int validate_remote_params(); 632 633 /** Extract string from network buffer. 634 @param[in,out] packet network packet 635 @param[in,out] length packet length 636 @param[out] str extracted string 637 @return error code */ extract_string(const uchar * & packet,size_t & length,String_Key & str)638 int extract_string(const uchar *&packet, size_t &length, String_Key &str) { 639 /* Check length. */ 640 if (length >= 4) { 641 auto name_length = uint4korr(packet); 642 length -= 4; 643 packet += 4; 644 645 /* Check length. */ 646 if (length >= name_length) { 647 str.clear(); 648 if (name_length > 0) { 649 auto char_str = reinterpret_cast<const char *>(packet); 650 auto str_len = static_cast<size_t>(name_length); 651 str.assign(char_str, str_len); 652 653 length -= name_length; 654 packet += name_length; 655 } 656 return (0); 657 } 658 } 659 /* purecov: begin deadcode */ 660 int err = ER_CLONE_PROTOCOL; 661 my_error(err, MYF(0), "Wrong Clone RPC response length for parameters"); 662 return (err); 663 /* purecov: end */ 664 } 665 666 /** Extract and add plugin name from network packet. 667 @param[in] packet network packet 668 @param[in] length packet length 669 @return error code */ add_plugin(const uchar * packet,size_t length)670 int add_plugin(const uchar *packet, size_t length) { 671 /* Get plugin name. */ 672 String_Key plugin_name; 673 auto err = extract_string(packet, length, plugin_name); 674 if (err == 0) { 675 m_parameters.m_plugins.push_back(plugin_name); 676 } 677 return (err); 678 } 679 680 /** Extract and add charset name from network packet. 681 @param[in] packet network packet 682 @param[in] length packet length 683 @return error code */ add_charset(const uchar * packet,size_t length)684 int add_charset(const uchar *packet, size_t length) { 685 /* Get character set collation name. */ 686 String_Key charset_name; 687 auto err = extract_string(packet, length, charset_name); 688 if (err == 0) { 689 m_parameters.m_charsets.push_back(charset_name); 690 } 691 return (err); 692 } 693 694 /** Extract and add remote configuration from network packet. 695 @param[in] packet network packet 696 @param[in] length packet length 697 @return error code */ add_config(const uchar * packet,size_t length)698 int add_config(const uchar *packet, size_t length) { 699 /* Get configuration parameter name. */ 700 String_Key config_name; 701 auto err = extract_string(packet, length, config_name); 702 if (err != 0) { 703 return (err); /* purecov: inspected */ 704 } 705 706 /* Get configuration parameter value */ 707 String_Key config_value; 708 err = extract_string(packet, length, config_value); 709 if (err == 0) { 710 auto key_val = std::make_pair(config_name, config_value); 711 m_parameters.m_configs.push_back(key_val); 712 } 713 return (err); 714 } 715 716 /** Set locators returned by remote server 717 @param[in] buffer serialized locator information 718 @param[in] length length of serialized data 719 @return error code */ 720 int set_locators(const uchar *buffer, size_t length); 721 722 /** Apply descriptor returned by remote server 723 @param[in] buffer serialized data descriptor 724 @param[in] length length of serialized data 725 @return error code */ 726 int set_descriptor(const uchar *buffer, size_t length); 727 728 /** Extract and set error mesg from remote server 729 @param[in] buffer Remote error buffer 730 @param[in] length length of error buffer 731 @return error code */ 732 int set_error(const uchar *buffer, size_t length); 733 734 /** If PFS table and mutex is initialized. */ 735 static bool s_pfs_initialized; 736 737 private: 738 /** Clone status table data. */ 739 static Status_pfs::Data s_status_data; 740 741 /** Clone progress table data. */ 742 static Progress_pfs::Data s_progress_data; 743 744 /** Clone table mutex to protect PFS table data. */ 745 static mysql_mutex_t s_table_mutex; 746 747 /** Number of concurrent clone clients. */ 748 static uint32_t s_num_clones; 749 750 private: 751 /** Server thread object */ 752 THD *m_server_thd; 753 754 /** Auxiliary client connection */ 755 Client_Aux m_conn_aux; 756 757 /** Clone remote client connection */ 758 MYSQL *m_conn; 759 NET_SERVER m_conn_server_extn; 760 761 /** Intermediate buffer for data copy when zero copy is not used. */ 762 Buffer m_copy_buff; 763 764 /** Buffer holding data for RPC command */ 765 Buffer m_cmd_buff; 766 767 /** Clone external handle. Data is transferred from 768 external handle(network) to storage handle. */ 769 Data_Link m_ext_link; 770 771 /** If it is the master thread */ 772 bool m_is_master; 773 774 /** Thread index for multi-threaded clone */ 775 uint32_t m_thread_index; 776 777 /** Number of active worker tasks. */ 778 uint32_t m_num_active_workers; 779 780 /** Task IDs for different SE */ 781 Task_Vector m_tasks; 782 783 /** Storage is initialized */ 784 bool m_storage_initialized; 785 786 /** Storage is active with locators set */ 787 bool m_storage_active; 788 789 /** If backup lock is acquired */ 790 bool m_acquired_backup_lock; 791 792 /** Remote parameters for validation. */ 793 Remote_Parameters m_parameters; 794 795 /** Shared client information */ 796 Client_Share *m_share; 797 }; 798 799 /** Clone client interface to handle callback from Storage Engine */ 800 class Client_Cbk : public Ha_clone_cbk { 801 public: 802 /** Construct Callback. Set clone client object. 803 @param[in] clone clone client object */ Client_Cbk(Client * clone)804 Client_Cbk(Client *clone) : m_clone_client(clone) {} 805 806 /** Get clone object 807 @return clone client object */ get_clone_client()808 Client *get_clone_client() const { return (m_clone_client); } 809 810 /** Clone client file callback: Not used for client. 811 @param[in] from_file source file descriptor 812 @param[in] len data length 813 @return error code */ 814 int file_cbk(Ha_clone_file from_file, uint len) override; 815 816 /** Clone client buffer callback: Not used for client. 817 @param[in] from_buffer source buffer 818 @param[in] buf_len data length 819 @return error code */ 820 int buffer_cbk(uchar *from_buffer, uint buf_len) override; 821 822 /** Clone client apply callback: Copy data to storage 823 engine file from network. 824 @param[in] to_file destination file descriptor 825 @return error code */ 826 int apply_file_cbk(Ha_clone_file to_file) override; 827 828 /** Clone client apply callback: Get data in buffer 829 @param[out] to_buffer data buffer 830 @param[out] len data length 831 @return error code */ 832 int apply_buffer_cbk(uchar *&to_buffer, uint &len) override; 833 834 private: 835 /** Apply data to local file or buffer. 836 @param[in,out] to_file destination file 837 @param[in] apply_file copy data to file 838 @param[out] to_buffer data buffer 839 @param[out] to_len data length 840 @return error code */ 841 int apply_cbk(Ha_clone_file to_file, bool apply_file, uchar *&to_buffer, 842 uint &to_len); 843 844 private: 845 /** Clone client object */ 846 Client *m_clone_client; 847 }; 848 849 } // namespace myclone 850 851 #endif /* CLONE_CLIENT_H */ 852