1 /*
2   Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "client/dump/mysqldump_tool_chain_maker.h"
26 
27 #include <stddef.h>
28 #include <boost/algorithm/string.hpp>
29 #include <functional>
30 
31 #include "client/dump/compression_lz4_writer.h"
32 #include "client/dump/compression_zlib_writer.h"
33 #include "client/dump/file_writer.h"
34 #include "client/dump/i_output_writer.h"
35 #include "client/dump/mysqldump_tool_chain_maker_options.h"
36 #include "client/dump/sql_formatter.h"
37 #include "client/dump/standard_writer.h"
38 #include "client/dump/view.h"
39 #include "m_ctype.h"
40 
41 using namespace Mysql::Tools::Dump;
42 using std::placeholders::_1;
43 
delete_chain(uint64,I_object_reader *)44 void Mysqldump_tool_chain_maker::delete_chain(uint64, I_object_reader *) {}
45 
create_chain(Chain_data *,I_dump_task * dump_task)46 I_object_reader *Mysqldump_tool_chain_maker::create_chain(
47     Chain_data *, I_dump_task *dump_task) {
48   Table_rows_dump_task *rows_task =
49       dynamic_cast<Table_rows_dump_task *>(dump_task);
50   if (rows_task != nullptr &&
51       (m_options->m_skip_rows_data ||
52        rows_task->get_related_table()->get_type() == "FEDERATED" ||
53        rows_task->get_related_table()->get_type() == "MRG_ISAM" ||
54        !this->compare_no_case_latin_with_db_string(
55            "MRG_MyISAM", rows_task->get_related_table()->get_type()))) {
56     return nullptr;
57   }
58 
59   Abstract_data_object *object =
60       dynamic_cast<Abstract_data_object *>(dump_task->get_related_db_object());
61   if (!m_options->is_object_included_in_dump(object)) {
62     return nullptr;
63   }
64   /*
65     View dependency check is moved post filteration. This will ensure that
66     only filtered out views will be checked for their dependecies. This
67     allows mysqlpump to dump a database even when there exsits an invalid
68     view in another database which user is not interested to dump. I_S views
69     are skipped from this check.
70   */
71   if (object && (dynamic_cast<View *>(object) != nullptr) &&
72       my_strcasecmp(&my_charset_latin1, object->get_schema().c_str(),
73                     INFORMATION_SCHEMA_DB_NAME)) {
74     Mysql::Tools::Base::Mysql_query_runner *runner = this->get_runner();
75     /* Check if view dependent objects exists */
76     if (runner->run_query(std::string("LOCK TABLES ") +
77                           this->get_quoted_object_full_name(
78                               object->get_schema(), object->get_name()) +
79                           " READ") != 0)
80       return nullptr;
81     else
82       runner->run_query(std::string("UNLOCK TABLES"));
83     delete runner;
84   }
85 
86   if (m_main_object_reader == nullptr) {
87     I_output_writer *writer;
88     if (m_options->m_result_file.has_value())
89       writer = new File_writer(this->get_message_handler(),
90                                this->get_object_id_generator(),
91                                m_options->m_result_file.value());
92     else
93       writer = new Standard_writer(this->get_message_handler(),
94                                    this->get_object_id_generator());
95     if (writer->init()) {
96       delete writer;
97       return nullptr;
98     }
99     m_all_created_elements.push_back(writer);
100     if (m_options->m_compress_output_algorithm.has_value()) {
101       std::string algorithm_name =
102           m_options->m_compress_output_algorithm.value();
103       boost::to_lower(algorithm_name);
104 
105       Abstract_output_writer_wrapper *compression_writer_as_wrapper = nullptr;
106       I_output_writer *compression_writer_as_writer = nullptr;
107       if (algorithm_name == "lz4") {
108         Compression_lz4_writer *compression_writer = new Compression_lz4_writer(
109             this->get_message_handler(), this->get_object_id_generator());
110         if (compression_writer->init()) {
111           delete compression_writer;
112           return nullptr;
113         }
114         compression_writer_as_wrapper = compression_writer;
115         compression_writer_as_writer = compression_writer;
116       } else if (algorithm_name == "zlib") {
117         Compression_zlib_writer *compression_writer =
118             new Compression_zlib_writer(this->get_message_handler(),
119                                         this->get_object_id_generator(),
120                                         Z_DEFAULT_COMPRESSION);
121         if (compression_writer->init()) {
122           delete compression_writer;
123           return nullptr;
124         }
125         compression_writer_as_wrapper = compression_writer;
126         compression_writer_as_writer = compression_writer;
127       } else {
128         this->pass_message(Mysql::Tools::Base::Message_data(
129             0, "Unknown compression method: " + algorithm_name,
130             Mysql::Tools::Base::Message_type_error));
131         return nullptr;
132       }
133       compression_writer_as_wrapper->register_output_writer(writer);
134       writer = compression_writer_as_writer;
135       m_all_created_elements.push_back(writer);
136     }
137     Sql_formatter *formatter = new Sql_formatter(
138         this->get_connection_provider(), this->get_message_handler(),
139         this->get_object_id_generator(), m_options,
140         m_options->m_formatter_options);
141     this->register_progress_watchers_in_child(formatter);
142     formatter->register_output_writer(writer);
143     m_all_created_elements.push_back(formatter);
144 
145     m_main_object_reader = new Mysql_object_reader(
146         this->get_connection_provider(), this->get_message_handler(),
147         this->get_object_id_generator(), m_options->m_object_reader_options);
148     this->register_progress_watchers_in_child(m_main_object_reader);
149     m_main_object_reader->register_data_formatter(formatter);
150     m_all_created_elements.push_back(m_main_object_reader);
151   }
152   /*
153     run as a single process only if default parallelism is set to 0 and
154     parallel schemas is not set
155   */
156   if (m_options->m_default_parallelism == 0 &&
157       m_options->get_parallel_schemas_thread_count() == 0) {
158     return m_main_object_reader;
159   }
160   Abstract_data_object *data_object =
161       dynamic_cast<Abstract_data_object *>(dump_task->get_related_db_object());
162 
163   int object_queue_id = (data_object != nullptr)
164                             ? (m_options->get_object_queue_id_for_schema(
165                                   data_object->get_schema()))
166                             : 0;
167   std::map<int, Object_queue *>::iterator it =
168       m_object_queues.find(object_queue_id);
169   if (it != m_object_queues.end()) {
170     return it->second;
171   }
172   Object_queue *queue = new Object_queue(
173       this->get_message_handler(), this->get_object_id_generator(),
174       m_options->get_object_queue_threads_count(object_queue_id),
175       new std::function<void(bool)>(std::bind(
176           &Mysqldump_tool_chain_maker::mysql_thread_callback, this, _1)),
177       m_program);
178   this->register_progress_watchers_in_child(queue);
179   queue->register_object_reader(m_main_object_reader);
180   m_all_created_elements.push_back(queue);
181   m_object_queues.insert(std::make_pair(object_queue_id, queue));
182   return queue;
183 }
184 
stop_queues()185 void Mysqldump_tool_chain_maker::stop_queues() {
186   std::map<int, Object_queue *>::const_iterator iter;
187   for (iter = m_object_queues.begin(); iter != m_object_queues.end(); iter++) {
188     iter->second->stop_queue();
189   }
190 }
191 
mysql_thread_callback(bool is_starting)192 void Mysqldump_tool_chain_maker::mysql_thread_callback(bool is_starting) {
193   if (is_starting)
194     mysql_thread_init();
195   else
196     mysql_thread_end();
197 }
198 
~Mysqldump_tool_chain_maker()199 Mysqldump_tool_chain_maker::~Mysqldump_tool_chain_maker() {
200   for (std::vector<I_chain_element *>::reverse_iterator it =
201            m_all_created_elements.rbegin();
202        it != m_all_created_elements.rend(); ++it) {
203     delete *it;
204   }
205 }
206 
Mysqldump_tool_chain_maker(I_connection_provider * connection_provider,std::function<bool (const Mysql::Tools::Base::Message_data &)> * message_handler,Simple_id_generator * object_id_generator,Mysqldump_tool_chain_maker_options * options,Mysql::Tools::Base::Abstract_program * program)207 Mysqldump_tool_chain_maker::Mysqldump_tool_chain_maker(
208     I_connection_provider *connection_provider,
209     std::function<bool(const Mysql::Tools::Base::Message_data &)>
210         *message_handler,
211     Simple_id_generator *object_id_generator,
212     Mysqldump_tool_chain_maker_options *options,
213     Mysql::Tools::Base::Abstract_program *program)
214     : Abstract_chain_element(message_handler, object_id_generator),
215       Abstract_mysql_chain_element_extension(
216           connection_provider, message_handler,
217           options->m_mysql_chain_element_options),
218       m_options(options),
219       m_main_object_reader(nullptr),
220       m_program(program) {}
221