1 /*
2 Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "mysqldump_tool_chain_maker.h"
26 #include "i_output_writer.h"
27 #include "file_writer.h"
28 #include "standard_writer.h"
29 #include "compression_lz4_writer.h"
30 #include "compression_zlib_writer.h"
31 #include "sql_formatter.h"
32 #include "mysqldump_tool_chain_maker_options.h"
33 #include <boost/algorithm/string.hpp>
34 #include "m_ctype.h"
35
36 using namespace Mysql::Tools::Dump;
37
delete_chain(uint64 chain_id,I_object_reader * chain)38 void Mysqldump_tool_chain_maker::delete_chain(
39 uint64 chain_id, I_object_reader* chain)
40 {}
41
create_chain(Chain_data * chain_data,I_dump_task * dump_task)42 I_object_reader* Mysqldump_tool_chain_maker::create_chain(
43 Chain_data* chain_data, I_dump_task* dump_task)
44 {
45 Table_rows_dump_task* rows_task= dynamic_cast<Table_rows_dump_task*>(
46 dump_task);
47 if (rows_task != NULL
48 && (m_options->m_skip_rows_data
49 || rows_task->get_related_table()->get_type() == "FEDERATED"
50 || rows_task->get_related_table()->get_type() == "MRG_ISAM"
51 || !this->compare_no_case_latin_with_db_string(
52 "MRG_MyISAM", rows_task->get_related_table()->get_type())))
53 {
54 return NULL;
55 }
56
57 if (!m_options->is_object_included_in_dump(
58 dynamic_cast<Abstract_data_object*>(
59 dump_task->get_related_db_object())))
60 {
61 return NULL;
62 }
63
64 if (m_main_object_reader == NULL)
65 {
66 I_output_writer* writer;
67 if (m_options->m_result_file.has_value())
68 writer= new File_writer(
69 this->get_message_handler(), this->get_object_id_generator(),
70 m_options->m_result_file.value());
71 else
72 writer= new Standard_writer(
73 this->get_message_handler(), this->get_object_id_generator());
74 m_all_created_elements.push_back(writer);
75 if (m_options->m_compress_output_algorithm.has_value())
76 {
77 std::string algorithm_name=
78 m_options->m_compress_output_algorithm.value();
79 boost::to_lower(algorithm_name);
80
81 Abstract_output_writer_wrapper* compression_writer_as_wrapper= NULL;
82 I_output_writer* compression_writer_as_writer= NULL;
83 if (algorithm_name == "lz4")
84 {
85 Compression_lz4_writer* compression_writer=
86 new Compression_lz4_writer(
87 this->get_message_handler(), this->get_object_id_generator());
88 compression_writer_as_wrapper= compression_writer;
89 compression_writer_as_writer= compression_writer;
90 }
91 else if (algorithm_name =="zlib")
92 {
93 Compression_zlib_writer* compression_writer=
94 new Compression_zlib_writer(
95 this->get_message_handler(), this->get_object_id_generator(),
96 Z_DEFAULT_COMPRESSION);
97 compression_writer_as_wrapper= compression_writer;
98 compression_writer_as_writer= compression_writer;
99 }
100 else
101 this->pass_message(Mysql::Tools::Base::Message_data(
102 0, "Unknown compression method: " + algorithm_name,
103 Mysql::Tools::Base::Message_type_error));
104
105 compression_writer_as_wrapper->register_output_writer(writer);
106 writer= compression_writer_as_writer;
107 m_all_created_elements.push_back(writer);
108 }
109 Sql_formatter* formatter= new Sql_formatter(
110 this->get_connection_provider(),
111 this->get_message_handler(), this->get_object_id_generator(),
112 m_options,
113 m_options->m_formatter_options);
114 this->register_progress_watchers_in_child(formatter);
115 formatter->register_output_writer(writer);
116 m_all_created_elements.push_back(formatter);
117
118 m_main_object_reader= new Mysql_object_reader(
119 this->get_connection_provider(),
120 this->get_message_handler(), this->get_object_id_generator(),
121 m_options->m_object_reader_options);
122 this->register_progress_watchers_in_child(m_main_object_reader);
123 m_main_object_reader->register_data_formatter(formatter);
124 m_all_created_elements.push_back(m_main_object_reader);
125 }
126 /*
127 run as a single process only if default parallelism is set to 0 and
128 parallel schemas is not set
129 */
130 if (m_options->m_default_parallelism == 0 &&
131 m_options->get_parallel_schemas_thread_count() == 0)
132 {
133 return m_main_object_reader;
134 }
135 Abstract_data_object* data_object= dynamic_cast<Abstract_data_object*>(
136 dump_task->get_related_db_object());
137
138 int object_queue_id= (data_object != NULL)
139 ? (m_options->get_object_queue_id_for_schema(data_object->get_schema()))
140 : 0;
141 std::map<int, Object_queue*>::iterator it=
142 m_object_queues.find(object_queue_id);
143 if (it != m_object_queues.end())
144 {
145 return it->second;
146 }
147 Object_queue* queue= new Object_queue(
148 this->get_message_handler(), this->get_object_id_generator(),
149 m_options->get_object_queue_threads_count(object_queue_id),
150 new Mysql::Instance_callback<void, bool, Mysqldump_tool_chain_maker>(
151 this, &Mysqldump_tool_chain_maker::mysql_thread_callback), m_program);
152 this->register_progress_watchers_in_child(queue);
153 queue->register_object_reader(m_main_object_reader);
154 m_all_created_elements.push_back(queue);
155 m_object_queues.insert(std::make_pair(object_queue_id, queue));
156 return queue;
157 }
158
stop_queues()159 void Mysqldump_tool_chain_maker::stop_queues()
160 {
161 std::map<int, Object_queue*>::const_iterator iter;
162 for (iter = m_object_queues.begin(); iter != m_object_queues.end(); iter++)
163 {
164 iter->second->stop_queue();
165 }
166 }
167
mysql_thread_callback(bool is_starting)168 void Mysqldump_tool_chain_maker::mysql_thread_callback(bool is_starting)
169 {
170 if (is_starting)
171 mysql_thread_init();
172 else
173 mysql_thread_end();
174 }
175
~Mysqldump_tool_chain_maker()176 Mysqldump_tool_chain_maker::~Mysqldump_tool_chain_maker()
177 {
178 for (std::vector<I_chain_element*>::reverse_iterator it=
179 m_all_created_elements.rbegin(); it != m_all_created_elements.rend();
180 ++it)
181 {
182 delete *it;
183 }
184 }
185
Mysqldump_tool_chain_maker(I_connection_provider * connection_provider,Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator,Mysqldump_tool_chain_maker_options * options,Mysql::Tools::Base::Abstract_program * program)186 Mysqldump_tool_chain_maker::Mysqldump_tool_chain_maker(
187 I_connection_provider* connection_provider,
188 Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
189 message_handler, Simple_id_generator* object_id_generator,
190 Mysqldump_tool_chain_maker_options* options,
191 Mysql::Tools::Base::Abstract_program* program)
192 : Abstract_chain_element(message_handler, object_id_generator),
193 Abstract_mysql_chain_element_extension(
194 connection_provider, message_handler,
195 options->m_mysql_chain_element_options),
196 m_options(options),
197 m_main_object_reader(NULL),
198 m_program(program)
199 {}
200