1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License, version 2.0, 5 as published by the Free Software Foundation. 6 7 This program is also distributed with certain software (including 8 but not limited to OpenSSL) that is licensed under separate terms, 9 as designated in a particular file or component or in included license 10 documentation. The authors of MySQL hereby grant you an additional 11 permission to link the program and your derivative works with the 12 separately licensed software that they have included with MySQL. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License, version 2.0, for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; if not, write to the Free Software 21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ 22 23 /** 24 @file clone/include/clone_server.h 25 Clone Plugin: Server interface 26 27 */ 28 29 #ifndef CLONE_SERVER_H 30 #define CLONE_SERVER_H 31 32 #include "plugin/clone/include/clone.h" 33 #include "plugin/clone/include/clone_hton.h" 34 #include "plugin/clone/include/clone_os.h" 35 36 /* Namespace for all clone data types */ 37 namespace myclone { 38 /** For Remote Clone, "Clone Server" is created at donor. It retrieves data 39 from Storage Engines and transfers over network to remote "Clone Client". */ 40 class Server { 41 public: 42 /** Construct clone server. Initialize storage and external handle 43 @param[in,out] thd server thread handle 44 @param[in] socket network socket to remote client */ 45 Server(THD *thd, MYSQL_SOCKET socket); 46 47 /** Destructor: Free the transfer buffer, if created. */ 48 ~Server(); 49 50 /** Get storage handle vector for data transfer. 51 @return storage handle vector */ get_storage_vector()52 Storage_Vector &get_storage_vector() { return (m_storage_vec); } 53 54 /** Get clone locator for a storage engine at specified index. 55 @param[in] index locator index 56 @param[out] loc_len locator length in bytes 57 @return storage locator */ get_locator(uint index,uint & loc_len)58 const uchar *get_locator(uint index, uint &loc_len) const { 59 DBUG_ASSERT(index < m_storage_vec.size()); 60 loc_len = m_storage_vec[index].m_loc_len; 61 return (m_storage_vec[index].m_loc); 62 } 63 64 /** Get tasks for different SE 65 @return task vector */ get_task_vector()66 Task_Vector &get_task_vector() { return (m_tasks); } 67 68 /** Get external handle for data transfer. This is the 69 network socket to remote client. 70 @return external handle */ get_data_link()71 Data_Link *get_data_link() { return (&m_ext_link); } 72 73 /** Get server thread handle 74 @return server thread */ get_thd()75 THD *get_thd() { return (m_server_thd); } 76 77 /** Allocate and return buufer for data copy 78 @param[in] len buffer length 79 @return allocated pointer */ alloc_copy_buffer(uint len)80 uchar *alloc_copy_buffer(uint len) { 81 auto err = m_copy_buff.allocate(len); 82 83 if (err != 0) { 84 return (nullptr); 85 86 } else { 87 DBUG_ASSERT(m_copy_buff.m_length >= len); 88 return (m_copy_buff.m_buffer); 89 } 90 } 91 92 /** Clone database and send data to remote client. 93 @return error code */ 94 int clone(); 95 96 /** Send descriptor data to remote client 97 @param[in,out] hton SE handlerton 98 @param[in] secure validate secure connection 99 @param[in] loc_index current locator index 100 @param[in] desc_buf descriptor buffer 101 @param[in] desc_len buffer length 102 @return error code */ 103 int send_descriptor(handlerton *hton, bool secure, uint loc_index, 104 const uchar *desc_buf, uint desc_len); 105 106 /** Send one string value. 107 @param[in] rcmd response command 108 @param[in] key_str string key 109 @param[in] val_str string value 110 @return error code */ 111 int send_key_value(Command_Response rcmd, String_Key &key_str, 112 String_Key &val_str); 113 114 /** Configuration parameters to be validated by remote. */ 115 static Key_Values s_configs; 116 117 private: 118 /** Check if network error 119 @param[in] err error code 120 @return true if network error */ is_network_error(int err)121 static bool is_network_error(int err) { 122 if (err == ER_NET_ERROR_ON_WRITE || err == ER_NET_READ_ERROR || 123 err == ER_NET_WRITE_INTERRUPTED || err == ER_NET_READ_INTERRUPTED || 124 err == ER_NET_WAIT_ERROR) { 125 return (true); 126 } 127 128 /* Check for protocol error */ 129 if (err == ER_NET_PACKETS_OUT_OF_ORDER || err == ER_NET_UNCOMPRESS_ERROR || 130 err == ER_NET_PACKET_TOO_LARGE || err == ER_CLONE_PROTOCOL) { 131 return (true); 132 } 133 134 return (false); 135 } 136 137 /** Send status back to client 138 @param[in] err error code 139 @return error code */ 140 int send_status(int err); 141 142 /** Initialize storage engine using command buffer. 143 @param[in] mode clone start mode 144 @param[in] com_buf command buffer 145 @param[in] com_len command buffer length 146 @return error code */ 147 int init_storage(Ha_clone_mode mode, uchar *com_buf, size_t com_len); 148 149 /** Parse command buffer and execute 150 @param[in] command command type 151 @param[in] com_buf buffer to parse 152 @param[in] com_len buffer length 153 @param[out] done true if all clone commands are done 154 @return error code */ 155 int parse_command_buffer(uchar command, uchar *com_buf, size_t com_len, 156 bool &done); 157 158 /** Deserialize COM_INIT command buffer to extract version and locators 159 @param[in] init_buf INIT command buffer 160 @param[in] init_len buffer length 161 @return error code */ 162 int deserialize_init_buffer(const uchar *init_buf, size_t init_len); 163 164 /** Deserialize COM_ACK command buffer to extract descriptor 165 @param[in] ack_buf ACK command buffer 166 @param[in] ack_len buffer length 167 @param[in,out] cbk callback object 168 @param[out] err_code remote error 169 @param[out] loc Locator object 170 @return error code */ 171 int deserialize_ack_buffer(const uchar *ack_buf, size_t ack_len, 172 Ha_clone_cbk *cbk, int &err_code, Locator *loc); 173 174 /** Send back the locators 175 @return error code */ 176 int send_locators(); 177 178 /** Send mysql server parameters 179 @return error code */ 180 int send_params(); 181 182 private: 183 /** Server thread object */ 184 THD *m_server_thd; 185 186 /** If this is the master task */ 187 bool m_is_master; 188 189 /** Intermediate buffer for data copy when zero copy is not used. */ 190 Buffer m_copy_buff; 191 192 /** Buffer holding data for RPC response */ 193 Buffer m_res_buff; 194 195 /** Clone external handle. Data is transferred from 196 storage handle to external handle(network). */ 197 Data_Link m_ext_link; 198 199 /** Clone storage handle */ 200 Storage_Vector m_storage_vec; 201 202 /** Task IDs for different SE */ 203 Task_Vector m_tasks; 204 205 /** Storage vector is initialized */ 206 bool m_storage_initialized; 207 208 /** PFS statement is initialized */ 209 bool m_pfs_initialized; 210 211 /** If backup lock is acquired */ 212 bool m_acquired_backup_lock; 213 214 /** Negotiated protocol version */ 215 uint32_t m_protocol_version; 216 217 /** DDL timeout from client */ 218 uint32_t m_client_ddl_timeout; 219 }; 220 221 /** Clone server interface to handle callback from Storage Engine */ 222 class Server_Cbk : public Ha_clone_cbk { 223 public: 224 /** Construct Callback. Set clone server object. 225 @param[in] clone clone server object */ Server_Cbk(Server * clone)226 Server_Cbk(Server *clone) : m_clone_server(clone) {} 227 228 /** Get clone object 229 @return clone server object */ get_clone_server()230 Server *get_clone_server() const { return (m_clone_server); } 231 232 /** Send descriptor data to remote client */ 233 int send_descriptor(); 234 235 /** Clone server file callback: Send data from file to remote client 236 @param[in] from_file source file descriptor 237 @param[in] len data length 238 @return error code */ 239 int file_cbk(Ha_clone_file from_file, uint len) override; 240 241 /** Clone server buffer callback: Send data from buffer to remote client 242 @param[in] from_buffer source buffer 243 @param[in] buf_len data length 244 @return error code */ 245 int buffer_cbk(uchar *from_buffer, uint buf_len) override; 246 247 /** Clone server apply callback: Not used for server. 248 @param[in] to_file destination file descriptor 249 @return error code */ 250 int apply_file_cbk(Ha_clone_file to_file) override; 251 252 /** Clone server apply callback: Not used for server. 253 @param[out] to_buffer data buffer 254 @param[out] len data length 255 @return error code */ 256 int apply_buffer_cbk(uchar *&to_buffer, uint &len) override; 257 258 private: 259 /** Clone server object */ 260 Server *m_clone_server; 261 }; 262 263 } // namespace myclone 264 265 #endif /* CLONE_SERVER_H */ 266