1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /**
24 @file clone/include/clone_server.h
25 Clone Plugin: Server interface
26 
27 */
28 
29 #ifndef CLONE_SERVER_H
30 #define CLONE_SERVER_H
31 
32 #include "plugin/clone/include/clone.h"
33 #include "plugin/clone/include/clone_hton.h"
34 #include "plugin/clone/include/clone_os.h"
35 
36 /* Namespace for all clone data types */
37 namespace myclone {
38 /** For Remote Clone, "Clone Server" is created at donor. It retrieves data
39 from Storage Engines and transfers over network to remote "Clone Client". */
40 class Server {
41  public:
42   /** Construct clone server. Initialize storage and external handle
43   @param[in,out]	thd	server thread handle
44   @param[in]	socket	network socket to remote client */
45   Server(THD *thd, MYSQL_SOCKET socket);
46 
47   /** Destructor: Free the transfer buffer, if created. */
48   ~Server();
49 
50   /** Get storage handle vector for data transfer.
51   @return storage handle vector */
get_storage_vector()52   Storage_Vector &get_storage_vector() { return (m_storage_vec); }
53 
54   /** Get clone locator for a storage engine at specified index.
55   @param[in]	index	locator index
56   @param[out]	loc_len	locator length in bytes
57   @return storage locator */
get_locator(uint index,uint & loc_len)58   const uchar *get_locator(uint index, uint &loc_len) const {
59     DBUG_ASSERT(index < m_storage_vec.size());
60     loc_len = m_storage_vec[index].m_loc_len;
61     return (m_storage_vec[index].m_loc);
62   }
63 
64   /** Get tasks for different SE
65   @return task vector */
get_task_vector()66   Task_Vector &get_task_vector() { return (m_tasks); }
67 
68   /** Get external handle for data transfer. This is the
69   network socket to remote client.
70   @return external handle */
get_data_link()71   Data_Link *get_data_link() { return (&m_ext_link); }
72 
73   /** Get server thread handle
74   @return server thread */
get_thd()75   THD *get_thd() { return (m_server_thd); }
76 
77   /** Allocate and return buufer for data copy
78   @param[in]	len	buffer length
79   @return allocated pointer */
alloc_copy_buffer(uint len)80   uchar *alloc_copy_buffer(uint len) {
81     auto err = m_copy_buff.allocate(len);
82 
83     if (err != 0) {
84       return (nullptr);
85 
86     } else {
87       DBUG_ASSERT(m_copy_buff.m_length >= len);
88       return (m_copy_buff.m_buffer);
89     }
90   }
91 
92   /** Clone database and send data to remote client.
93   @return error code */
94   int clone();
95 
96   /** Send descriptor data to remote client
97   @param[in,out]	hton		SE handlerton
98   @param[in]	secure		validate secure connection
99   @param[in]	loc_index	current locator index
100   @param[in]	desc_buf	descriptor buffer
101   @param[in]	desc_len	buffer length
102   @return error code */
103   int send_descriptor(handlerton *hton, bool secure, uint loc_index,
104                       const uchar *desc_buf, uint desc_len);
105 
106   /** Send one string value.
107   @param[in]	rcmd	response command
108   @param[in]	key_str	string key
109   @param[in]	val_str	string value
110   @return error code */
111   int send_key_value(Command_Response rcmd, String_Key &key_str,
112                      String_Key &val_str);
113 
114   /** Configuration parameters to be validated by remote. */
115   static Key_Values s_configs;
116 
117  private:
118   /** Check if network error
119   @param[in]	err	error code
120   @return true if network error */
is_network_error(int err)121   static bool is_network_error(int err) {
122     if (err == ER_NET_ERROR_ON_WRITE || err == ER_NET_READ_ERROR ||
123         err == ER_NET_WRITE_INTERRUPTED || err == ER_NET_READ_INTERRUPTED ||
124         err == ER_NET_WAIT_ERROR) {
125       return (true);
126     }
127 
128     /* Check for protocol error */
129     if (err == ER_NET_PACKETS_OUT_OF_ORDER || err == ER_NET_UNCOMPRESS_ERROR ||
130         err == ER_NET_PACKET_TOO_LARGE || err == ER_CLONE_PROTOCOL) {
131       return (true);
132     }
133 
134     return (false);
135   }
136 
137   /** Send status back to client
138   @param[in]	err	error code
139   @return error code */
140   int send_status(int err);
141 
142   /** Initialize storage engine using command buffer.
143   @param[in]	mode	clone start mode
144   @param[in]	com_buf	command buffer
145   @param[in]	com_len	command buffer length
146   @return error code */
147   int init_storage(Ha_clone_mode mode, uchar *com_buf, size_t com_len);
148 
149   /** Parse command buffer and execute
150   @param[in]	command	command type
151   @param[in]	com_buf	buffer to parse
152   @param[in]	com_len	buffer length
153   @param[out]	done	true if all clone commands are done
154   @return error code */
155   int parse_command_buffer(uchar command, uchar *com_buf, size_t com_len,
156                            bool &done);
157 
158   /** Deserialize COM_INIT command buffer to extract version and locators
159   @param[in]	init_buf	INIT command buffer
160   @param[in]	init_len	buffer length
161   @return error code */
162   int deserialize_init_buffer(const uchar *init_buf, size_t init_len);
163 
164   /** Deserialize COM_ACK command buffer to extract descriptor
165   @param[in]	ack_buf		ACK command buffer
166   @param[in]	ack_len		buffer length
167   @param[in,out]	cbk		callback object
168   @param[out]	err_code	remote error
169   @param[out]	loc		Locator object
170   @return error code */
171   int deserialize_ack_buffer(const uchar *ack_buf, size_t ack_len,
172                              Ha_clone_cbk *cbk, int &err_code, Locator *loc);
173 
174   /** Send back the locators
175   @return error code */
176   int send_locators();
177 
178   /** Send mysql server parameters
179   @return error code */
180   int send_params();
181 
182  private:
183   /** Server thread object */
184   THD *m_server_thd;
185 
186   /** If this is the master task */
187   bool m_is_master;
188 
189   /** Intermediate buffer for data copy when zero copy is not used. */
190   Buffer m_copy_buff;
191 
192   /** Buffer holding data for RPC response */
193   Buffer m_res_buff;
194 
195   /** Clone external handle. Data is transferred from
196   storage handle to external handle(network). */
197   Data_Link m_ext_link;
198 
199   /** Clone storage handle */
200   Storage_Vector m_storage_vec;
201 
202   /** Task IDs for different SE */
203   Task_Vector m_tasks;
204 
205   /** Storage vector is initialized */
206   bool m_storage_initialized;
207 
208   /** PFS statement is initialized */
209   bool m_pfs_initialized;
210 
211   /** If backup lock is acquired */
212   bool m_acquired_backup_lock;
213 
214   /** Negotiated protocol version */
215   uint32_t m_protocol_version;
216 
217   /** DDL timeout from client */
218   uint32_t m_client_ddl_timeout;
219 };
220 
221 /** Clone server interface to handle callback from Storage Engine */
222 class Server_Cbk : public Ha_clone_cbk {
223  public:
224   /** Construct Callback. Set clone server object.
225   @param[in]	clone	clone server object */
Server_Cbk(Server * clone)226   Server_Cbk(Server *clone) : m_clone_server(clone) {}
227 
228   /** Get clone object
229   @return clone server object */
get_clone_server()230   Server *get_clone_server() const { return (m_clone_server); }
231 
232   /** Send descriptor data to remote client */
233   int send_descriptor();
234 
235   /** Clone server file callback: Send data from file to remote client
236   @param[in]	from_file	source file descriptor
237   @param[in]	len		data length
238   @return error code */
239   int file_cbk(Ha_clone_file from_file, uint len) override;
240 
241   /** Clone server buffer callback: Send data from buffer to remote client
242   @param[in]	from_buffer	source buffer
243   @param[in]	buf_len		data length
244   @return error code */
245   int buffer_cbk(uchar *from_buffer, uint buf_len) override;
246 
247   /** Clone server apply callback: Not used for server.
248   @param[in]	to_file	destination file descriptor
249   @return error code */
250   int apply_file_cbk(Ha_clone_file to_file) override;
251 
252   /** Clone server apply callback: Not used for server.
253   @param[out]  to_buffer  data buffer
254   @param[out]  len        data length
255   @return error code */
256   int apply_buffer_cbk(uchar *&to_buffer, uint &len) override;
257 
258  private:
259   /** Clone server object */
260   Server *m_clone_server;
261 };
262 
263 }  // namespace myclone
264 
265 #endif /* CLONE_SERVER_H */
266