1 /* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /**
24 @file clone/src/clone_server.cc
25 Clone Plugin: Server implementation
26 
27 */
28 
29 #include "plugin/clone/include/clone_server.h"
30 #include "plugin/clone/include/clone_status.h"
31 
32 #include "my_byteorder.h"
33 
34 /* Namespace for all clone data types */
35 namespace myclone {
36 
37 /** All configuration parameters to be validated. */
38 Key_Values Server::s_configs = {{"version", ""},
39                                 {"version_compile_machine", ""},
40                                 {"version_compile_os", ""},
41                                 {"character_set_server", ""},
42                                 {"character_set_filesystem", ""},
43                                 {"collation_server", ""},
44                                 {"innodb_page_size", ""}};
45 
Server(THD * thd,MYSQL_SOCKET socket)46 Server::Server(THD *thd, MYSQL_SOCKET socket)
47     : m_server_thd(thd),
48       m_is_master(false),
49       m_storage_initialized(false),
50       m_pfs_initialized(false),
51       m_acquired_backup_lock(false),
52       m_protocol_version(CLONE_PROTOCOL_VERSION),
53       m_client_ddl_timeout() {
54   m_ext_link.set_socket(socket);
55   m_storage_vec.reserve(MAX_CLONE_STORAGE_ENGINE);
56 
57   m_tasks.reserve(MAX_CLONE_STORAGE_ENGINE);
58 
59   m_copy_buff.init();
60   m_res_buff.init();
61 }
62 
~Server()63 Server::~Server() {
64   DBUG_ASSERT(!m_storage_initialized);
65   m_copy_buff.free();
66   m_res_buff.free();
67 }
68 
clone()69 int Server::clone() {
70   int err = 0;
71 
72   while (true) {
73     uchar command;
74     uchar *com_buf;
75     size_t com_len;
76 
77     err = mysql_service_clone_protocol->mysql_clone_get_command(
78         get_thd(), &command, &com_buf, &com_len);
79 
80     bool done = true;
81 
82     if (err == 0) {
83       err = parse_command_buffer(command, com_buf, com_len, done);
84     }
85 
86     if (err == 0 && thd_killed(get_thd())) {
87       my_error(ER_QUERY_INTERRUPTED, MYF(0));
88       err = ER_QUERY_INTERRUPTED;
89     }
90 
91     /* Send status to client */
92     err = send_status(err);
93 
94     if (done || err != 0) {
95       if (m_storage_initialized) {
96         DBUG_ASSERT(err != 0);
97         /* Don't abort clone if worker thread fails during attach. */
98         int in_err = (command == COM_ATTACH) ? 0 : err;
99 
100         hton_clone_end(get_thd(), get_storage_vector(), m_tasks, in_err);
101         m_storage_initialized = false;
102       }
103       /* Release if we have acquired backup lock */
104       if (m_acquired_backup_lock) {
105         DBUG_ASSERT(m_is_master);
106         mysql_service_mysql_backup_lock->release(get_thd());
107       }
108       break;
109     }
110   }
111 
112   log_error(get_thd(), false, err, "Exiting clone protocol");
113   return (err);
114 }
115 
send_status(int err)116 int Server::send_status(int err) {
117   uchar res_cmd;
118   char info_mesg[128];
119 
120   if (err == 0) {
121     /* Send complete response */
122     res_cmd = static_cast<uchar>(COM_RES_COMPLETE);
123 
124     err = mysql_service_clone_protocol->mysql_clone_send_response(
125         get_thd(), false, &res_cmd, sizeof(res_cmd));
126     log_error(get_thd(), false, err, "COM_RES_COMPLETE");
127 
128   } else {
129     /* Send Error Response */
130     res_cmd = static_cast<uchar>(COM_RES_ERROR);
131 
132     snprintf(info_mesg, 128, "Before sending COM_RES_ERROR: %s",
133              is_network_error(err) ? "network " : " ");
134     log_error(get_thd(), false, err, &info_mesg[0]);
135 
136     err = mysql_service_clone_protocol->mysql_clone_send_error(
137         get_thd(), res_cmd, is_network_error(err));
138     log_error(get_thd(), false, err, "After sending COM_RES_ERROR");
139   }
140 
141   return (err);
142 }
143 
init_storage(Ha_clone_mode mode,uchar * com_buf,size_t com_len)144 int Server::init_storage(Ha_clone_mode mode, uchar *com_buf, size_t com_len) {
145   auto thd = get_thd();
146 
147   DBUG_ASSERT(thd != nullptr);
148   DBUG_ASSERT(!m_pfs_initialized);
149 
150   auto err = deserialize_init_buffer(com_buf, com_len);
151 
152   if (err != 0) {
153     return (err);
154   }
155 
156   if (m_is_master) {
157     /* Set statement type for master thread */
158     mysql_service_clone_protocol->mysql_clone_start_statement(
159         thd, PSI_NOT_INSTRUMENTED, clone_stmt_server_key);
160 
161     /* Acquire backup lock */
162     if (m_client_ddl_timeout != 0) {
163       auto failed = mysql_service_mysql_backup_lock->acquire(
164           thd, BACKUP_LOCK_SERVICE_DEFAULT, m_client_ddl_timeout);
165 
166       if (failed) {
167         return (ER_LOCK_WAIT_TIMEOUT);
168       }
169       m_acquired_backup_lock = true;
170       log_error(get_thd(), false, 0, "Acquired backup lock");
171     }
172   }
173   m_pfs_initialized = true;
174 
175   /* Get server locators */
176   err = hton_clone_begin(get_thd(), get_storage_vector(), m_tasks,
177                          HA_CLONE_HYBRID, mode);
178   if (err != 0) {
179     return (err);
180   }
181   m_storage_initialized = true;
182 
183   if (m_is_master && mode == HA_CLONE_MODE_START) {
184     /* Validate local configurations. */
185     err = validate_local_params(get_thd());
186 
187     if (err == 0) {
188       /* Send current server parameters for validation. */
189       err = send_params();
190     }
191 
192     if (err != 0) {
193       return (err);
194     }
195   }
196 
197   /* Send locators back to client */
198   err = send_locators();
199 
200   return (err);
201 }
202 
parse_command_buffer(uchar command,uchar * com_buf,size_t com_len,bool & done)203 int Server::parse_command_buffer(uchar command, uchar *com_buf, size_t com_len,
204                                  bool &done) {
205   int err = 0;
206   auto com = static_cast<Command_RPC>(command);
207   done = false;
208 
209   switch (com) {
210     case COM_REINIT:
211       m_is_master = true;
212       err = init_storage(HA_CLONE_MODE_RESTART, com_buf, com_len);
213       log_error(get_thd(), false, err, "COM_REINIT: Storage Initialize");
214       break;
215 
216     case COM_INIT:
217       m_is_master = true;
218 
219       /* Initialize storage, send locators and validating configurations.  */
220       err = init_storage(HA_CLONE_MODE_START, com_buf, com_len);
221 
222       log_error(get_thd(), false, err, "COM_INIT: Storage Initialize");
223       break;
224 
225     case COM_ATTACH:
226       m_is_master = false;
227       err = init_storage(HA_CLONE_MODE_ADD_TASK, com_buf, com_len);
228       log_error(get_thd(), false, err, "COM_ATTACH: Storage Attach");
229       break;
230 
231     case COM_EXECUTE: {
232       if (!m_storage_initialized) {
233         /* purecov: begin deadcode */
234         err = ER_CLONE_PROTOCOL;
235         my_error(err, MYF(0), "Wrong Clone RPC: Execute request before Init");
236         log_error(get_thd(), false, err, "COM_EXECUTE : Storage ninitialized");
237         break;
238         /* purecov: end */
239       }
240 
241       Server_Cbk clone_callback(this);
242 
243       err = hton_clone_copy(get_thd(), get_storage_vector(), m_tasks,
244                             &clone_callback);
245       log_error(get_thd(), false, err, "COM_EXECUTE: Storage Execute");
246       break;
247     }
248     case COM_ACK: {
249       m_pfs_initialized = true;
250       int err_code = 0;
251       Locator loc = {nullptr, nullptr, 0};
252 
253       Server_Cbk clone_callback(this);
254 
255       err = deserialize_ack_buffer(com_buf, com_len, &clone_callback, err_code,
256                                    &loc);
257 
258       if (err == 0) {
259         auto hton = loc.m_hton;
260 
261         err = hton->clone_interface.clone_ack(hton, get_thd(), loc.m_loc,
262                                               loc.m_loc_len, 0, err_code,
263                                               &clone_callback);
264       }
265       log_error(get_thd(), false, err, "COM_ACK: Storage Ack");
266       break;
267     }
268 
269     case COM_EXIT:
270       if (m_storage_initialized) {
271         hton_clone_end(get_thd(), get_storage_vector(), m_tasks, 0);
272         m_storage_initialized = false;
273       }
274       done = true;
275       log_error(get_thd(), false, err, "COM_EXIT: Storage End");
276       break;
277 
278     case COM_MAX:
279       /* Fall through */
280     default:
281       /* purecov: begin deadcode */
282       err = ER_CLONE_PROTOCOL;
283       my_error(err, MYF(0), "Wrong Clone RPC: Invalid request");
284       break;
285       /* purecov: end */
286   }
287   return (err);
288 }
289 
deserialize_ack_buffer(const uchar * ack_buf,size_t ack_len,Ha_clone_cbk * cbk,int & err_code,Locator * loc)290 int Server::deserialize_ack_buffer(const uchar *ack_buf, size_t ack_len,
291                                    Ha_clone_cbk *cbk, int &err_code,
292                                    Locator *loc) {
293   size_t serialized_length = 0;
294 
295   const uchar *desc_ptr = nullptr;
296   uint desc_len = 0;
297 
298   /*  Should not deserialize if less than the base length */
299   if (ack_len < (4 + loc->serlialized_length())) {
300     goto err_end;
301   }
302 
303   /* Extract error code */
304   err_code = uint4korr(ack_buf);
305   ack_buf += 4;
306   ack_len -= 4;
307 
308   /* Extract Locator */
309   serialized_length = loc->deserialize(get_thd(), ack_buf);
310 
311   if (ack_len < serialized_length) {
312     goto err_end;
313   }
314   ack_buf += serialized_length;
315   ack_len -= serialized_length;
316 
317   /* Extract descriptor */
318   if (ack_len < 4) {
319     goto err_end;
320   }
321 
322   desc_len = uint4korr(ack_buf);
323   ack_buf += 4;
324   ack_len -= 4;
325 
326   if (desc_len > 0) {
327     desc_ptr = ack_buf;
328   }
329 
330   cbk->set_data_desc(desc_ptr, desc_len);
331 
332   ack_len -= desc_len;
333 
334   if (ack_len == 0) {
335     return (0);
336   }
337 
338 err_end:
339   /* purecov: begin deadcode */
340   my_error(ER_CLONE_PROTOCOL, MYF(0), "Wrong Clone RPC: Init ACK length");
341   return (ER_CLONE_PROTOCOL);
342   /* purecov: end */
343 }
344 
deserialize_init_buffer(const uchar * init_buf,size_t init_len)345 int Server::deserialize_init_buffer(const uchar *init_buf, size_t init_len) {
346   if (init_len < 8) {
347     goto err_end;
348   }
349 
350   /* Extract protocol version */
351   m_protocol_version = uint4korr(init_buf);
352   if (m_protocol_version > CLONE_PROTOCOL_VERSION) {
353     m_protocol_version = CLONE_PROTOCOL_VERSION;
354   }
355   init_buf += 4;
356   init_len -= 4;
357 
358   /* Extract DDL timeout */
359   m_client_ddl_timeout = uint4korr(init_buf);
360   init_buf += 4;
361   init_len -= 4;
362 
363   /* Initialize locators */
364   while (init_len > 0) {
365     Locator loc = {nullptr, nullptr, 0};
366 
367     /*  Should not deserialize if less than the base length */
368     if (init_len < loc.serlialized_length()) {
369       goto err_end;
370     }
371 
372     auto serialized_length = loc.deserialize(get_thd(), init_buf);
373 
374     init_buf += serialized_length;
375 
376     if (init_len < serialized_length) {
377       goto err_end;
378     }
379 
380     m_storage_vec.push_back(loc);
381 
382     init_len -= serialized_length;
383   }
384 
385   if (init_len == 0) {
386     return (0);
387   }
388 
389 err_end:
390   my_error(ER_CLONE_PROTOCOL, MYF(0), "Wrong Clone RPC: Init buffer length");
391 
392   return (ER_CLONE_PROTOCOL);
393 }
394 
send_key_value(Command_Response rcmd,String_Key & key_str,String_Key & val_str)395 int Server::send_key_value(Command_Response rcmd, String_Key &key_str,
396                            String_Key &val_str) {
397   /* Add length for key. */
398   auto buf_len = key_str.length();
399   buf_len += 4;
400 
401   /** Add length for value. */
402   if (rcmd == COM_RES_CONFIG) {
403     buf_len += val_str.length();
404     buf_len += 4;
405   }
406   /* Add length for response type. */
407   ++buf_len;
408 
409   /* Allocate for response buffer */
410   auto err = m_res_buff.allocate(buf_len);
411   auto buf_ptr = m_res_buff.m_buffer;
412   if (err != 0) {
413     return (true);
414   }
415 
416   /* Store response command */
417   *buf_ptr = static_cast<uchar>(rcmd);
418   ++buf_ptr;
419 
420   /* Store key */
421   int4store(buf_ptr, key_str.length());
422   buf_ptr += 4;
423   memcpy(buf_ptr, key_str.c_str(), key_str.length());
424   buf_ptr += key_str.length();
425 
426   /* Store Value */
427   if (rcmd == COM_RES_CONFIG) {
428     int4store(buf_ptr, val_str.length());
429     buf_ptr += 4;
430     memcpy(buf_ptr, val_str.c_str(), val_str.length());
431   }
432   err = mysql_service_clone_protocol->mysql_clone_send_response(
433       get_thd(), false, m_res_buff.m_buffer, buf_len);
434 
435   return (err);
436 }
437 
send_params()438 int Server::send_params() {
439   int err = 0;
440 
441   /* Send plugins */
442   auto plugin_cbk = [](THD *, plugin_ref plugin, void *ctx) {
443     auto server = static_cast<Server *>(ctx);
444 
445     if (plugin == nullptr || plugin_state(plugin) == PLUGIN_IS_FREED ||
446         plugin_state(plugin) == PLUGIN_IS_DISABLED) {
447       return (false);
448     }
449     /* Send plugin name string */
450     String_Key pstring(plugin_name(plugin)->str, plugin_name(plugin)->length);
451     auto err = server->send_key_value(COM_RES_PLUGIN, pstring, pstring);
452 
453     return (err != 0);
454   };
455 
456   auto result = plugin_foreach_with_mask(
457       get_thd(), plugin_cbk, MYSQL_ANY_PLUGIN, ~PLUGIN_IS_FREED, this);
458 
459   if (result) {
460     err = ER_INTERNAL_ERROR;
461     my_error(err, MYF(0), "Clone error sending plugin information");
462     return (err);
463   }
464 
465   /* Send character sets and collations */
466   String_Keys char_sets;
467 
468   err = mysql_service_clone_protocol->mysql_clone_get_charsets(get_thd(),
469                                                                char_sets);
470   if (err != 0) {
471     return (err);
472   }
473 
474   for (auto &element : char_sets) {
475     err = send_key_value(COM_RES_COLLATION, element, element);
476     if (err != 0) {
477       return (err);
478     }
479   }
480 
481   /* Send configurations */
482   err = mysql_service_clone_protocol->mysql_clone_get_configs(get_thd(),
483                                                               s_configs);
484   if (err != 0) {
485     return (err);
486   }
487 
488   for (auto &key_val : s_configs) {
489     err = send_key_value(COM_RES_CONFIG, key_val.first, key_val.second);
490     if (err != 0) {
491       break;
492     }
493   }
494   return (err);
495 }
496 
send_locators()497 int Server::send_locators() {
498   /* Add length of protocol Version */
499   auto buf_len = sizeof(m_protocol_version);
500 
501   /* Add length for response type */
502   ++buf_len;
503 
504   /* Add SE and locator length */
505   for (auto &loc : m_storage_vec) {
506     buf_len += loc.serlialized_length();
507   }
508 
509   /* Allocate for response buffer */
510   auto err = m_res_buff.allocate(buf_len);
511   auto buf_ptr = m_res_buff.m_buffer;
512 
513   if (err != 0) {
514     return (err);
515   }
516 
517   /* Store response command */
518   *buf_ptr = static_cast<uchar>(COM_RES_LOCS);
519   ++buf_ptr;
520 
521   /* Store version */
522   int4store(buf_ptr, m_protocol_version);
523   buf_ptr += 4;
524 
525   /* Store SE information and Locators */
526   for (auto &loc : m_storage_vec) {
527     buf_ptr += loc.serialize(buf_ptr);
528   }
529 
530   err = mysql_service_clone_protocol->mysql_clone_send_response(
531       get_thd(), false, m_res_buff.m_buffer, buf_len);
532 
533   return (err);
534 }
535 
send_descriptor(handlerton * hton,bool secure,uint loc_index,const uchar * desc_buf,uint desc_len)536 int Server::send_descriptor(handlerton *hton, bool secure, uint loc_index,
537                             const uchar *desc_buf, uint desc_len) {
538   /* Add data descriptor length */
539   auto buf_len = desc_len;
540 
541   /* Add length for response type */
542   ++buf_len;
543 
544   /* Add length for Storage Engine type */
545   ++buf_len;
546 
547   /* Add length for Locator Index */
548   ++buf_len;
549 
550   /* Allocate for response buffer */
551   auto err = m_res_buff.allocate(buf_len);
552 
553   if (err != 0) {
554     return (err);
555   }
556 
557   auto buf_ptr = m_res_buff.m_buffer;
558 
559   /* Store response command */
560   *buf_ptr = static_cast<uchar>(COM_RES_DATA_DESC);
561   ++buf_ptr;
562 
563   /* Store Storage Engine type */
564   *buf_ptr = static_cast<uchar>(hton->db_type);
565   ++buf_ptr;
566 
567   /* Store Locator Index */
568   *buf_ptr = static_cast<uchar>(loc_index);
569   ++buf_ptr;
570 
571   /* Store Descriptor */
572   memcpy(buf_ptr, desc_buf, desc_len);
573 
574   err = mysql_service_clone_protocol->mysql_clone_send_response(
575       get_thd(), secure, m_res_buff.m_buffer, buf_len);
576 
577   return (err);
578 }
579 
send_descriptor()580 int Server_Cbk::send_descriptor() {
581   auto server = get_clone_server();
582 
583   uint desc_len = 0;
584   auto desc = get_data_desc(&desc_len);
585 
586   auto err = server->send_descriptor(get_hton(), is_secure(), get_loc_index(),
587                                      desc, desc_len);
588   return (err);
589 }
590 
file_cbk(Ha_clone_file from_file,uint len)591 int Server_Cbk::file_cbk(Ha_clone_file from_file, uint len) {
592   auto server = get_clone_server();
593 
594   /* Check if session is interrupted. */
595   if (thd_killed(server->get_thd())) {
596     my_error(ER_QUERY_INTERRUPTED, MYF(0));
597     return (ER_QUERY_INTERRUPTED);
598   }
599 
600   /* Add one byte for descriptor type */
601   auto buf_len = len + 1;
602   auto buf_ptr = server->alloc_copy_buffer(buf_len + CLONE_OS_ALIGN);
603 
604   if (buf_ptr == nullptr) {
605     return (ER_OUTOFMEMORY);
606   }
607 
608   /* Store response command */
609   auto data_ptr = buf_ptr + 1;
610 
611   /* Align buffer to CLONE_OS_ALIGN[4K] for O_DIRECT */
612   data_ptr = clone_os_align(data_ptr);
613   buf_ptr = data_ptr - 1;
614 
615   *buf_ptr = static_cast<uchar>(COM_RES_DATA);
616 
617   auto err =
618       clone_os_copy_file_to_buf(from_file, data_ptr, len, get_source_name());
619   if (err != 0) {
620     return (err);
621   }
622 
623   /* Step 1: Send Descriptor */
624   err = send_descriptor();
625 
626   if (err != 0) {
627     return (err);
628   }
629 
630   /* Step 2: Send Data */
631   err = mysql_service_clone_protocol->mysql_clone_send_response(
632       server->get_thd(), false, buf_ptr, buf_len);
633 
634   return (err);
635 }
636 
buffer_cbk(uchar * from_buffer,uint buf_len)637 int Server_Cbk::buffer_cbk(uchar *from_buffer, uint buf_len) {
638   auto server = get_clone_server();
639 
640   if (thd_killed(server->get_thd())) {
641     my_error(ER_QUERY_INTERRUPTED, MYF(0));
642     return (ER_QUERY_INTERRUPTED);
643   }
644 
645   uchar *buf_ptr = nullptr;
646   uint total_len = 0;
647 
648   if (buf_len > 0) {
649     /* Add one byte for descriptor type */
650     total_len = buf_len + 1;
651     buf_ptr = server->alloc_copy_buffer(total_len);
652 
653     if (buf_ptr == nullptr) {
654       return (ER_OUTOFMEMORY);
655     }
656   }
657 
658   /* Step 1: Send Descriptor */
659   auto err = send_descriptor();
660 
661   if (err != 0 || buf_len == 0) {
662     return (err);
663   }
664 
665   /* Step 2: Send Data */
666   *buf_ptr = static_cast<uchar>(COM_RES_DATA);
667   memcpy(buf_ptr + 1, from_buffer, static_cast<size_t>(buf_len));
668 
669   err = mysql_service_clone_protocol->mysql_clone_send_response(
670       server->get_thd(), false, buf_ptr, total_len);
671 
672   return (err);
673 }
674 
675 /* purecov: begin deadcode */
apply_file_cbk(Ha_clone_file to_file MY_ATTRIBUTE ((unused)))676 int Server_Cbk::apply_file_cbk(Ha_clone_file to_file MY_ATTRIBUTE((unused))) {
677   DBUG_ASSERT(false);
678   my_error(ER_INTERNAL_ERROR, MYF(0), "Apply callback from Clone Server");
679   return (ER_INTERNAL_ERROR);
680 }
681 
apply_buffer_cbk(uchar * & to_buffer MY_ATTRIBUTE ((unused)),uint & len MY_ATTRIBUTE ((unused)))682 int Server_Cbk::apply_buffer_cbk(uchar *&to_buffer MY_ATTRIBUTE((unused)),
683                                  uint &len MY_ATTRIBUTE((unused))) {
684   DBUG_ASSERT(false);
685   my_error(ER_INTERNAL_ERROR, MYF(0), "Apply callback from Clone Server");
686   return (ER_INTERNAL_ERROR);
687 }
688 /* purecov: end */
689 }  // namespace myclone
690