1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /**
24 @file clone/include/clone_client.h
25 Clone Plugin: Client Interface
26 
27 */
28 
29 #ifndef CLONE_CLIENT_H
30 #define CLONE_CLIENT_H
31 
32 #include "plugin/clone/include/clone.h"
33 #include "plugin/clone/include/clone_hton.h"
34 #include "plugin/clone/include/clone_status.h"
35 
36 #include "mysql/psi/mysql_thread.h"
37 
38 #include <array>
39 #include <atomic>
40 #include <thread>
41 #include <vector>
42 
43 /* Namespace for all clone data types */
44 namespace myclone {
45 
46 using Clock = std::chrono::steady_clock;
47 using Time_Point = std::chrono::time_point<Clock>;
48 
49 using Time_Msec = std::chrono::milliseconds;
50 using Time_Sec = std::chrono::seconds;
51 
52 struct Thread_Info {
53   /** Default constructor */
54   Thread_Info() = default;
55 
56   /** Copy constructor needed for std::vector. */
Thread_InfoThread_Info57   Thread_Info(const Thread_Info &) { reset(); } /* purecov: inspected */
58 
59   /** Reset transferred data bytes. */
resetThread_Info60   void reset() {
61     m_last_update = Clock::now();
62     m_last_data_bytes = 0;
63     m_last_network_bytes = 0;
64 
65     m_data_bytes.store(0);
66     m_network_bytes.store(0);
67   }
68 
69   /** Update transferred data bytes.
70   @param[in]	data_bytes	data bytes transferred
71   @param[in]	net_bytes	network bytes transferred */
updateThread_Info72   void update(uint64_t data_bytes, uint64_t net_bytes) {
73     m_data_bytes.fetch_add(data_bytes);
74     m_network_bytes.fetch_add(net_bytes);
75   }
76 
77   /** Calculate the expected time for transfer based on target.
78   @param[in]	current	current number of transferred data bytes
79   @param[in]	prev	previous number of transferred data bytes
80   @param[in]	target	target data transfer rate in bytes per second
81   @return expected time in milliseconds. */
82   uint64_t get_target_time(uint64_t current, uint64_t prev, uint64_t target);
83 
84   /** Check target transfer speed and throttle if needed. The thread sleeps
85   for appropriate time if the current transfer rate is more than target.
86   @param[in]	data_target	target data bytes transfer per second
87   @param[in]	net_target	target network bytes transfer per second */
88   void throttle(uint64_t data_target, uint64_t net_target);
89 
90   /** Data transfer throttle interval */
91   Time_Msec m_interval{100};
92 
93   /** Current thread */
94   std::thread m_thread;
95 
96   /** Last time information was updated. */
97   Time_Point m_last_update;
98 
99   /** Data bytes at last update. */
100   uint64_t m_last_data_bytes{};
101 
102   /** Network bytes at last update. */
103   uint64_t m_last_network_bytes{};
104 
105   /** Total amount of data transferred. */
106   std::atomic<uint64_t> m_data_bytes;
107 
108   /** Total amount of network bytes transferred. The value differs
109   from data as we use compression in network layer. */
110   std::atomic<uint64_t> m_network_bytes;
111 };
112 
113 /** Thread information vector. */
114 using Thread_Vector = std::vector<Thread_Info>;
115 
116 /** Maximum size of history data */
117 const size_t STAT_HISTORY_SIZE = 16;
118 
119 /** Auto tuning information for threads. */
120 struct Thread_Tune_Auto {
121   /** Auto tuning state */
122   enum class State { INIT, ACTIVE, DONE };
123 
124   /** Reset to initial state. */
resetThread_Tune_Auto125   void reset() {
126     m_prev_number = 0;
127     m_next_number = 0;
128     m_cur_number = 0;
129     m_prev_speed = 0;
130     m_last_step_speed = 0;
131     m_prev_history_index = 0;
132     m_state = State::INIT;
133   }
134 
135   /** Statistics history interval for tuning. */
136   const uint64_t m_history_interval{5};
137 
138   /** Number of threads to increase in each step. */
139   const uint64_t m_step{4};
140 
141   /* Previous number of threads. */
142   uint32_t m_prev_number{};
143 
144   /** Next target number of threads. */
145   uint32_t m_next_number{};
146 
147   /** Current number of threads. */
148   uint32_t m_cur_number{};
149 
150   /** Average data transfer MB/sec */
151   uint64_t m_prev_speed{};
152 
153   /** Average data transfer in last step MB/sec */
154   uint64_t m_last_step_speed{};
155 
156   /* Saved history index on last tuning. */
157   uint64_t m_prev_history_index{};
158 
159   /** Current tuning state. */
160   State m_state{State::INIT};
161 };
162 
163 /** Client data transfer statistics. */
164 class Client_Stat {
165  public:
166   /** Update statistics data.
167   @param[in]	reset		reset all previous history
168   @param[in]	threads		all concurrent thread information
169   @param[in]	num_workers	current number of worker threads */
170   void update(bool reset, const Thread_Vector &threads, uint32_t num_workers);
171 
172   /** Tune total number of threads based on stat
173   @param[in]	num_threads	current number of active threads
174   @param[in]	max_threads	maximum number of threads
175   @return suggested number of threads. */
176   uint32_t get_tuned_thread_number(uint32_t num_threads, uint32_t max_threads);
177 
178   /** Get target speed, in case user has specified limits.
179   @param[out]	data_speed	target data transfer in bytes/sec
180   @param[out]	net_speed	target network transfer in bytes/sec */
get_target(uint64_t & data_speed,uint64_t & net_speed)181   void get_target(uint64_t &data_speed, uint64_t &net_speed) const {
182     data_speed = m_target_data_speed.load();
183     net_speed = m_target_network_speed.load();
184   }
185 
186   /** Initialize target speed read by all threads. Adjusted later based on
187   maximum bandwidth threads. Zero implies unlimited bandwidth. */
init_target()188   void init_target() {
189     m_target_data_speed.store(0);
190     m_target_network_speed.store(0);
191   }
192 
193   /** Save finished byte stat when thread info is released. It is
194   used during clone restart after network failure.
195   @param[in]	data_bytes	data bytes to save
196   @param[in]	net_bytes	network bytes to save */
save_at_exit(uint64_t data_bytes,uint64_t net_bytes)197   void save_at_exit(uint64_t data_bytes, uint64_t net_bytes) {
198     m_finished_data_bytes += data_bytes;
199     m_finished_network_bytes += net_bytes;
200   }
201 
202   /** Finish automatic tuning for spawning threads. */
finish_tuning()203   void finish_tuning() { m_tune.m_state = Thread_Tune_Auto::State::DONE; }
204 
205   /** Reset history elements.
206   @param[in]	init	true, if called during initialization */
207   void reset_history(bool init);
208 
209  private:
210   /** Calculate target for each task based on current performance.
211   @param[in]	target_speed	overall target speed in bytes per second
212   @param[in]	current_speed	overall current speed in bytes per second
213   @param[in]	current_target	current target for a task in bytes per second
214   @param[in]	num_tasks	number of clone tasks
215   @return target for a task in bytes per second. */
216   uint64_t task_target(uint64_t target_speed, uint64_t current_speed,
217                        uint64_t current_target, uint32_t num_tasks);
218 
219   /** Set target bandwidth for data and network per thread.
220   @param[in]	num_workers	current number of worker threads
221   @param[in]	is_reset	if called during stage reset
222   @param[in]	data_speed	current data speed in bytes per second
223   @param[in]	net_speed	current network speed in bytes per second */
224   void set_target_bandwidth(uint32_t num_workers, bool is_reset,
225                             uint64_t data_speed, uint64_t net_speed);
226 
227   /** @return true if bandwidth limit is already reached. */
228   bool is_bandwidth_saturated();
229 
230   /** @return true if tuning has improved performance.
231   @param[in]	num_threads	current number of threads */
232   bool tune_has_improved(uint32_t num_threads);
233 
234   /* Set next target number of threads
235   @param[in]	num_threads	current number of threads
236   @param[in]	max_threads	maximum number of threads */
237   void tune_set_target(uint32_t num_threads, uint32_t max_threads);
238 
239  private:
240   /** Statistics update interval - 1 sec*/
241   const Time_Msec m_interval{1000};
242 
243   /** Minimum data transfer rate per task - 1M */
244   const uint64_t m_minimum_speed = 1048576;
245 
246   /* If stat elements are initialized. */
247   bool m_initialized{false};
248 
249   /** Starting point for clone data transfer. */
250   Time_Point m_start_time;
251 
252   /** Last evaluation time */
253   Time_Point m_eval_time;
254 
255   /** Data transferred at last evaluation time. */
256   uint64_t m_eval_data_bytes{};
257 
258   /** All data bytes transferred by threads already finished. */
259   uint64_t m_finished_data_bytes{};
260 
261   /** Network bytes transferred at last evaluation time. */
262   uint64_t m_eval_network_bytes{};
263 
264   /** All data bytes transferred by threads already finished. */
265   uint64_t m_finished_network_bytes{};
266 
267   /** Network speed history. */
268   std::array<uint64_t, STAT_HISTORY_SIZE> m_network_speed_history{};
269 
270   /** Data speed history. */
271   std::array<uint64_t, STAT_HISTORY_SIZE> m_data_speed_history{};
272 
273   /** Current index for history data. */
274   size_t m_current_history_index{};
275 
276   /** Target Network bytes to be transferred per thread per second. */
277   std::atomic<uint64_t> m_target_network_speed;
278 
279   /** Target data bytes to be transferred per thread per second. */
280   std::atomic<uint64_t> m_target_data_speed;
281 
282   /** Thread auto tuning state and information. */
283   Thread_Tune_Auto m_tune;
284 };
285 
286 /* Shared client information for multi threaded clone */
287 struct Client_Share {
288   /** Construct clone client share. Initialize storage handle.
289   @param[in]	host	remote host IP address
290   @param[in]	port	remote server port
291   @param[in]	user	remote user name
292   @param[in]	passwd	remote user's password
293   @param[in]	dir	target data directory for clone
294   @param[in]	mode	client SSL mode */
Client_ShareClient_Share295   Client_Share(const char *host, const uint port, const char *user,
296                const char *passwd, const char *dir, int mode)
297       : m_host(host),
298         m_port(port),
299         m_user(user),
300         m_passwd(passwd),
301         m_data_dir(dir),
302         m_ssl_mode(mode),
303         m_max_concurrency(clone_max_concurrency),
304         m_protocol_version(CLONE_PROTOCOL_VERSION) {
305     m_storage_vec.reserve(MAX_CLONE_STORAGE_ENGINE);
306     m_threads.resize(m_max_concurrency);
307     DBUG_ASSERT(m_max_concurrency > 0);
308     m_stat.init_target();
309   }
310 
311   /** Remote host name */
312   const char *m_host;
313 
314   /** Remote port */
315   const uint32_t m_port;
316 
317   /** Remote user name */
318   const char *m_user;
319 
320   /** Remote user password */
321   const char *m_passwd;
322 
323   /** Cloned database directory */
324   const char *m_data_dir;
325 
326   /** Client SSL mode */
327   const int m_ssl_mode;
328 
329   /** Maximum number of concurrent threads for current operation. */
330   const uint32_t m_max_concurrency;
331 
332   /** Negotiated protocol version */
333   uint32_t m_protocol_version;
334 
335   /** Clone storage vector */
336   Storage_Vector m_storage_vec;
337 
338   /** Thread vector for multi threaded clone. */
339   Thread_Vector m_threads;
340 
341   /** Data transfer statistics. */
342   Client_Stat m_stat;
343 };
344 
345 /** Auxiliary connection to send ACK */
346 struct Client_Aux {
347   /** Initialize members */
resetClient_Aux348   void reset() {
349     m_buffer = nullptr;
350     m_buf_len = 0;
351     m_cur_index = 0;
352     m_error = 0;
353   }
354 
355   /** Clone remote client connection */
356   MYSQL *m_conn;
357 
358   /** ACK descriptor buffer */
359   const uchar *m_buffer;
360 
361   /** ACK descriptor length */
362   size_t m_buf_len;
363 
364   /** Current SE index */
365   uint m_cur_index;
366 
367   /** Saved error */
368   int m_error;
369 };
370 
371 struct Remote_Parameters {
372   /** Remote character sets with collation */
373   String_Keys m_plugins;
374 
375   /** Remote character sets with collation */
376   String_Keys m_charsets;
377 
378   /** Remote configurations to validate */
379   Key_Values m_configs;
380 };
381 
382 /** For Remote Clone, "Clone Client" is created at recipient. It receives data
383 over network from remote "Clone Server" and applies to Storage Engines. */
384 class Client {
385  public:
386   /** Construct clone client. Initialize external handle.
387   @param[in,out]	thd		server thread handle
388   @param[in]		share		shared client information
389   @param[in]		index		current thread index
390   @param[in]		is_master	if it is master thread */
391   Client(THD *thd, Client_Share *share, uint32_t index, bool is_master);
392 
393   /** Destructor: Free the transfer buffer, if created. */
394   ~Client();
395 
396   /** Check if it is the master client object.
397   @return true if this is the master object */
is_master()398   bool is_master() const { return (m_is_master); }
399 
400   /** @return maximum concurrency for current clone operation. */
get_max_concurrency()401   uint32_t get_max_concurrency() const {
402     DBUG_ASSERT(m_share->m_max_concurrency > 0);
403     return (m_share->m_max_concurrency);
404   }
405 
406   /** @return current thread information. */
get_thread_info()407   Thread_Info &get_thread_info() {
408     return (m_share->m_threads[m_thread_index]);
409   }
410 
411   /** Check if network error
412   @param[in]	err		error code
413   @param[in]	protocol_error	include protocol error
414   @return true if network error */
415   static bool is_network_error(int err, bool protocol_error);
416 
417   /** Update statistics and tune threads
418   @param[in]	is_reset	reset statistics
419   @return tuned number of worker threads. */
420   uint32_t update_stat(bool is_reset);
421 
422   /** Check transfer speed and throttle. */
423   void check_and_throttle();
424 
425   /** Get auxiliary connection information
426   @return auxiliary connection data */
get_aux()427   Client_Aux *get_aux() { return (&m_conn_aux); }
428 
429   /** Get Shared area for client tasks
430   @return shared client data */
get_share()431   Client_Share *get_share() { return (m_share); }
432 
433   /** Get storage handle vector for data transfer.
434   @return storage handle vector */
get_storage_vector()435   Storage_Vector &get_storage_vector() { return (m_share->m_storage_vec); }
436 
437   /** Get tasks for different SE
438   @return task vector */
get_task_vector()439   Task_Vector &get_task_vector() { return (m_tasks); }
440 
441   /** Get external handle for data transfer. This is file
442   or buffer for local clone and network socket to remote server
443   for remote clone.
444   @param[out]	conn	connection handle to remote server
445   @return external handle */
get_data_link(MYSQL * & conn)446   Data_Link *get_data_link(MYSQL *&conn) {
447     conn = m_conn;
448     return (&m_ext_link);
449   }
450 
451   /** Get server thread handle
452   @return server thread */
get_thd()453   THD *get_thd() { return (m_server_thd); }
454 
455   /** Get target clone data directory
456   @return data directory */
get_data_dir()457   const char *get_data_dir() const { return (m_share->m_data_dir); }
458 
459   /** Get clone locator for a storage engine at specified index.
460   @param[in]	index	locator index
461   @param[out]	loc_len	locator length in bytes
462   @return storage locator */
get_locator(uint index,uint & loc_len)463   const uchar *get_locator(uint index, uint &loc_len) const {
464     DBUG_ASSERT(index < m_share->m_storage_vec.size());
465 
466     loc_len = m_share->m_storage_vec[index].m_loc_len;
467     return (m_share->m_storage_vec[index].m_loc);
468   }
469 
470   /** Get aligned intermediate buffer for transferring data. Allocate,
471   when called for first time.
472   @param[in]	len	length of allocated buffer
473   @return allocated buffer pointer */
474   uchar *get_aligned_buffer(uint32_t len);
475 
476   /** Limit total memory used for clone transfer buffer.
477   @param[in]	buffer_size	configured buffer size
478   @return actual buffer size to allocate. */
479   uint32_t limit_buffer(uint32_t buffer_size);
480 
481   /** Limit spawning initial number of workers if data or network
482   bandwidth is small.
483   @param[in]	num_workers	planned number of workers to spawn
484   @return actual number of workers to be spawned. */
485   uint32_t limit_workers(uint32_t num_workers);
486 
487   /* Spawn worker threads.
488   @param[in]	num_workers	number of worker threads
489   @param[in]	func		worker function */
490   template <typename F>
spawn_workers(uint32_t num_workers,F func)491   void spawn_workers(uint32_t num_workers, F func) {
492     /* Currently we don't reduce the number of threads. */
493     if (!is_master() || num_workers <= m_num_active_workers) {
494       return;
495     }
496     auto &thread_vector = m_share->m_threads;
497 
498     /* Maximum number of workers are fixed. */
499     if (num_workers + 1 > get_max_concurrency()) {
500       DBUG_ASSERT(false); /* purecov: inspected */
501       return;
502     }
503 
504     while (m_num_active_workers < num_workers) {
505       ++m_num_active_workers;
506       auto &info = thread_vector[m_num_active_workers];
507       info.reset();
508       try {
509         info.m_thread = std::thread(func, m_share, m_num_active_workers);
510       } catch (...) {
511         /* purecov: begin deadcode */
512         auto &stat = m_share->m_stat;
513         stat.finish_tuning();
514 
515         char info_mesg[64];
516         snprintf(info_mesg, sizeof(info_mesg), "Failed to spawn worker: %d",
517                  m_num_active_workers);
518         LogPluginErr(INFORMATION_LEVEL, ER_CLONE_CLIENT_TRACE, info_mesg);
519 
520         --m_num_active_workers;
521         break;
522         /* purecov: end */
523       }
524     }
525   }
526 
527   /** Wait for worker threads to finish. */
528   void wait_for_workers();
529 
530   /** Get data from remote server and create cloned database by
531   applying to storage engines.
532   @return error code */
533   int clone();
534 
535   /** Execute RPC clone command on remote server
536   @param[in]	com	RPC command ID
537   @param[in]	use_aux	use auxiliary connection
538   @return error if not successful */
539   int remote_command(Command_RPC com, bool use_aux);
540 
541   /** Begin state in PFS table.
542   @return error code. */
543   int pfs_begin_state();
544 
545   /** Change stage in PFS progress table. */
546   void pfs_change_stage(uint64_t estimate);
547 
548   /** End state in PFS table.
549   @param[in]	err_num		error number
550   @param[in]	err_mesg	error message */
551   void pfs_end_state(uint32_t err_num, const char *err_mesg);
552 
553   /** Copy PFS status data safely.
554   @param[out]	pfs_data	status data. */
555   static void copy_pfs_data(Status_pfs::Data &pfs_data);
556 
557   /** Copy PFS progress data safely.
558   @param[out]	pfs_data	progress data. */
559   static void copy_pfs_data(Progress_pfs::Data &pfs_data);
560 
561   /** Update data and network consumed.
562   @param[in]	data		data bytes transferred
563   @param[in]	network		network bytes transferred
564   @param[in]	data_speed	data transfer speed in bytes/sec
565   @param[in]	net_speed	network transfer speed in bytes/sec
566   @param[in]	num_workers	number of worker threads */
567   static void update_pfs_data(uint64_t data, uint64_t network,
568                               uint32_t data_speed, uint32_t net_speed,
569                               uint32_t num_workers);
570 
571   /** Init PFS mutex for table. */
572   static void init_pfs();
573 
574   /** Destroy PFS mutex for table. */
575   static void uninit_pfs();
576 
577  private:
578   /** Connect to remote server
579   @param[in]	is_restart	restarting clone after network failure
580   @param[in]	use_aux		establish auxiliary connection
581   @return	error code */
582   int connect_remote(bool is_restart, bool use_aux);
583 
584   /** Initialize storage engine and command buffer.
585   @param[in]	mode	initialization mode
586   @param[out]	cmd_len	serialized command length
587   @return error if initialization fails. */
588   int init_storage(enum Ha_clone_mode mode, size_t &cmd_len);
589 
590   /** Prepare command buffer for remote RPC
591   @param[in]	com	RPC command ID
592   @param[out]	buf_len	command buffer length
593   @return error if allocation fails */
594   int prepare_command_buffer(Command_RPC com, size_t &buf_len);
595 
596   /** Serialize the buffer for COM_INIT
597   @param[out]	buf_len	length of serialized buffer */
598   int serialize_init_cmd(size_t &buf_len);
599 
600   /** Serialize the buffer for COM_ACK
601   @param[out]	buf_len	length of serialized buffer */
602   int serialize_ack_cmd(size_t &buf_len);
603 
604   /** Receive and handle response from remote server
605   @param[in]	com		RPC command ID
606   @param[in]	use_aux		use auxiliary connection
607   @return error code */
608   int receive_response(Command_RPC com, bool use_aux);
609 
610   /** Handle response packet from remote server
611   @param[in]	packet		data packet
612   @param[in]	length		length of the packet
613   @param[in]	in_err		skip if error has occurred
614   @param[in]	skip_loc	skip applying locator
615   @param[out]	is_last		true if last packet
616   @return error code */
617   int handle_response(const uchar *packet, size_t length, int in_err,
618                       bool skip_loc, bool &is_last);
619 
620   /** Handle error and check if needs to exit
621   @param[in]	current_err			error number
622   @param[in,out]	first_error		first error that has occurred
623   @param[in,out]	first_error_time	time for first error in
624   milliseconds
625   @return true if the caller needs to exit */
626   bool handle_error(int current_err, int &first_error,
627                     ulonglong &first_error_time);
628 
629   /** Validate all remote parameters.
630   @return error code */
631   int validate_remote_params();
632 
633   /** Extract string from network buffer.
634   @param[in,out]	packet	network packet
635   @param[in,out]	length	packet length
636   @param[out]		str	extracted string
637   @return error code */
extract_string(const uchar * & packet,size_t & length,String_Key & str)638   int extract_string(const uchar *&packet, size_t &length, String_Key &str) {
639     /* Check length. */
640     if (length >= 4) {
641       auto name_length = uint4korr(packet);
642       length -= 4;
643       packet += 4;
644 
645       /* Check length. */
646       if (length >= name_length) {
647         str.clear();
648         if (name_length > 0) {
649           auto char_str = reinterpret_cast<const char *>(packet);
650           auto str_len = static_cast<size_t>(name_length);
651           str.assign(char_str, str_len);
652 
653           length -= name_length;
654           packet += name_length;
655         }
656         return (0);
657       }
658     }
659     /* purecov: begin deadcode */
660     int err = ER_CLONE_PROTOCOL;
661     my_error(err, MYF(0), "Wrong Clone RPC response length for parameters");
662     return (err);
663     /* purecov: end */
664   }
665 
666   /** Extract and add plugin name from network packet.
667   @param[in]	packet	network packet
668   @param[in]	length	packet length
669   @return error code */
add_plugin(const uchar * packet,size_t length)670   int add_plugin(const uchar *packet, size_t length) {
671     /* Get plugin name. */
672     String_Key plugin_name;
673     auto err = extract_string(packet, length, plugin_name);
674     if (err == 0) {
675       m_parameters.m_plugins.push_back(plugin_name);
676     }
677     return (err);
678   }
679 
680   /** Extract and add charset name from network packet.
681   @param[in]	packet	network packet
682   @param[in]	length	packet length
683   @return error code */
add_charset(const uchar * packet,size_t length)684   int add_charset(const uchar *packet, size_t length) {
685     /* Get character set collation name. */
686     String_Key charset_name;
687     auto err = extract_string(packet, length, charset_name);
688     if (err == 0) {
689       m_parameters.m_charsets.push_back(charset_name);
690     }
691     return (err);
692   }
693 
694   /** Extract and add remote configuration from network packet.
695   @param[in]	packet	network packet
696   @param[in]	length	packet length
697   @return error code */
add_config(const uchar * packet,size_t length)698   int add_config(const uchar *packet, size_t length) {
699     /* Get configuration parameter name. */
700     String_Key config_name;
701     auto err = extract_string(packet, length, config_name);
702     if (err != 0) {
703       return (err); /* purecov: inspected */
704     }
705 
706     /* Get configuration parameter value */
707     String_Key config_value;
708     err = extract_string(packet, length, config_value);
709     if (err == 0) {
710       auto key_val = std::make_pair(config_name, config_value);
711       m_parameters.m_configs.push_back(key_val);
712     }
713     return (err);
714   }
715 
716   /** Set locators returned by remote server
717   @param[in]	buffer	serialized locator information
718   @param[in]	length	length of serialized data
719   @return error code */
720   int set_locators(const uchar *buffer, size_t length);
721 
722   /** Apply descriptor returned by remote server
723   @param[in]	buffer	serialized data descriptor
724   @param[in]	length	length of serialized data
725   @return error code */
726   int set_descriptor(const uchar *buffer, size_t length);
727 
728   /** Extract and set error mesg from remote server
729   @param[in]	buffer	Remote error buffer
730   @param[in]	length	length of error buffer
731   @return error code */
732   int set_error(const uchar *buffer, size_t length);
733 
734   /** If PFS table and mutex is initialized. */
735   static bool s_pfs_initialized;
736 
737  private:
738   /** Clone status table data. */
739   static Status_pfs::Data s_status_data;
740 
741   /** Clone progress table data. */
742   static Progress_pfs::Data s_progress_data;
743 
744   /** Clone table mutex to protect PFS table data. */
745   static mysql_mutex_t s_table_mutex;
746 
747   /** Number of concurrent clone clients. */
748   static uint32_t s_num_clones;
749 
750  private:
751   /** Server thread object */
752   THD *m_server_thd;
753 
754   /** Auxiliary client connection */
755   Client_Aux m_conn_aux;
756 
757   /** Clone remote client connection */
758   MYSQL *m_conn;
759   NET_SERVER m_conn_server_extn;
760 
761   /** Intermediate buffer for data copy when zero copy is not used. */
762   Buffer m_copy_buff;
763 
764   /** Buffer holding data for RPC command */
765   Buffer m_cmd_buff;
766 
767   /** Clone external handle. Data is transferred from
768   external handle(network) to storage handle. */
769   Data_Link m_ext_link;
770 
771   /** If it is the master thread */
772   bool m_is_master;
773 
774   /** Thread index for multi-threaded clone */
775   uint32_t m_thread_index;
776 
777   /** Number of active worker tasks. */
778   uint32_t m_num_active_workers;
779 
780   /** Task IDs for different SE */
781   Task_Vector m_tasks;
782 
783   /** Storage is initialized */
784   bool m_storage_initialized;
785 
786   /** Storage is active with locators set */
787   bool m_storage_active;
788 
789   /** If backup lock is acquired */
790   bool m_acquired_backup_lock;
791 
792   /** Remote parameters for validation. */
793   Remote_Parameters m_parameters;
794 
795   /** Shared client information */
796   Client_Share *m_share;
797 };
798 
799 /** Clone client interface to handle callback from Storage Engine */
800 class Client_Cbk : public Ha_clone_cbk {
801  public:
802   /** Construct Callback. Set clone client object.
803   @param[in]	clone	clone client object */
Client_Cbk(Client * clone)804   Client_Cbk(Client *clone) : m_clone_client(clone) {}
805 
806   /** Get clone object
807   @return clone client object */
get_clone_client()808   Client *get_clone_client() const { return (m_clone_client); }
809 
810   /** Clone client file callback: Not used for client.
811   @param[in]	from_file	source file descriptor
812   @param[in]	len		data length
813   @return error code */
814   int file_cbk(Ha_clone_file from_file, uint len) override;
815 
816   /** Clone client buffer callback: Not used for client.
817   @param[in]	from_buffer	source buffer
818   @param[in]	buf_len		data length
819   @return error code */
820   int buffer_cbk(uchar *from_buffer, uint buf_len) override;
821 
822   /** Clone client apply callback: Copy data to storage
823   engine file from network.
824   @param[in]	to_file destination file descriptor
825   @return error code */
826   int apply_file_cbk(Ha_clone_file to_file) override;
827 
828   /** Clone client apply callback: Get data in buffer
829   @param[out]  to_buffer  data buffer
830   @param[out]  len        data length
831   @return error code */
832   int apply_buffer_cbk(uchar *&to_buffer, uint &len) override;
833 
834  private:
835   /** Apply data to local file or buffer.
836   @param[in,out]        to_file         destination file
837   @param[in]            apply_file      copy data to file
838   @param[out]           to_buffer       data buffer
839   @param[out]           to_len          data length
840   @return error code */
841   int apply_cbk(Ha_clone_file to_file, bool apply_file, uchar *&to_buffer,
842                 uint &to_len);
843 
844  private:
845   /** Clone client object */
846   Client *m_clone_client;
847 };
848 
849 }  // namespace myclone
850 
851 #endif /* CLONE_CLIENT_H */
852