1 /*
2   Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "mysqldump_tool_chain_maker.h"
26 #include "i_output_writer.h"
27 #include "file_writer.h"
28 #include "standard_writer.h"
29 #include "compression_lz4_writer.h"
30 #include "compression_zlib_writer.h"
31 #include "sql_formatter.h"
32 #include "mysqldump_tool_chain_maker_options.h"
33 #include <boost/algorithm/string.hpp>
34 #include "m_ctype.h"
35 
36 using namespace Mysql::Tools::Dump;
37 
delete_chain(uint64 chain_id,I_object_reader * chain)38 void Mysqldump_tool_chain_maker::delete_chain(
39   uint64 chain_id, I_object_reader* chain)
40 {}
41 
create_chain(Chain_data * chain_data,I_dump_task * dump_task)42 I_object_reader* Mysqldump_tool_chain_maker::create_chain(
43   Chain_data* chain_data, I_dump_task* dump_task)
44 {
45   Table_rows_dump_task* rows_task= dynamic_cast<Table_rows_dump_task*>(
46     dump_task);
47   if (rows_task != NULL
48     && (m_options->m_skip_rows_data
49     || rows_task->get_related_table()->get_type() == "FEDERATED"
50     || rows_task->get_related_table()->get_type() == "MRG_ISAM"
51     || !this->compare_no_case_latin_with_db_string(
52     "MRG_MyISAM", rows_task->get_related_table()->get_type())))
53   {
54     return NULL;
55   }
56 
57   if (!m_options->is_object_included_in_dump(
58     dynamic_cast<Abstract_data_object*>(
59     dump_task->get_related_db_object())))
60   {
61     return NULL;
62   }
63 
64   if (m_main_object_reader == NULL)
65   {
66     I_output_writer* writer;
67     if (m_options->m_result_file.has_value())
68       writer= new File_writer(
69       this->get_message_handler(), this->get_object_id_generator(),
70       m_options->m_result_file.value());
71     else
72       writer= new Standard_writer(
73       this->get_message_handler(), this->get_object_id_generator());
74     m_all_created_elements.push_back(writer);
75     if (m_options->m_compress_output_algorithm.has_value())
76     {
77       std::string algorithm_name=
78         m_options->m_compress_output_algorithm.value();
79       boost::to_lower(algorithm_name);
80 
81       Abstract_output_writer_wrapper* compression_writer_as_wrapper= NULL;
82       I_output_writer* compression_writer_as_writer= NULL;
83       if (algorithm_name == "lz4")
84       {
85         Compression_lz4_writer* compression_writer=
86           new Compression_lz4_writer(
87             this->get_message_handler(), this->get_object_id_generator());
88         compression_writer_as_wrapper= compression_writer;
89           compression_writer_as_writer= compression_writer;
90       }
91       else if (algorithm_name =="zlib")
92       {
93         Compression_zlib_writer* compression_writer=
94           new Compression_zlib_writer(
95             this->get_message_handler(), this->get_object_id_generator(),
96             Z_DEFAULT_COMPRESSION);
97         compression_writer_as_wrapper= compression_writer;
98         compression_writer_as_writer= compression_writer;
99       }
100       else
101         this->pass_message(Mysql::Tools::Base::Message_data(
102           0, "Unknown compression method: " + algorithm_name,
103           Mysql::Tools::Base::Message_type_error));
104 
105       compression_writer_as_wrapper->register_output_writer(writer);
106       writer= compression_writer_as_writer;
107       m_all_created_elements.push_back(writer);
108     }
109     Sql_formatter* formatter= new Sql_formatter(
110       this->get_connection_provider(),
111       this->get_message_handler(), this->get_object_id_generator(),
112       m_options,
113       m_options->m_formatter_options);
114     this->register_progress_watchers_in_child(formatter);
115     formatter->register_output_writer(writer);
116     m_all_created_elements.push_back(formatter);
117 
118     m_main_object_reader= new Mysql_object_reader(
119       this->get_connection_provider(),
120       this->get_message_handler(), this->get_object_id_generator(),
121       m_options->m_object_reader_options);
122     this->register_progress_watchers_in_child(m_main_object_reader);
123     m_main_object_reader->register_data_formatter(formatter);
124     m_all_created_elements.push_back(m_main_object_reader);
125   }
126   /*
127     run as a single process only if default parallelism is set to 0 and
128     parallel schemas is not set
129   */
130   if (m_options->m_default_parallelism == 0 &&
131       m_options->get_parallel_schemas_thread_count() == 0)
132   {
133     return m_main_object_reader;
134   }
135   Abstract_data_object* data_object= dynamic_cast<Abstract_data_object*>(
136     dump_task->get_related_db_object());
137 
138   int object_queue_id= (data_object != NULL)
139     ? (m_options->get_object_queue_id_for_schema(data_object->get_schema()))
140     : 0;
141   std::map<int, Object_queue*>::iterator it=
142     m_object_queues.find(object_queue_id);
143   if (it != m_object_queues.end())
144   {
145     return it->second;
146   }
147   Object_queue* queue= new Object_queue(
148     this->get_message_handler(), this->get_object_id_generator(),
149     m_options->get_object_queue_threads_count(object_queue_id),
150     new Mysql::Instance_callback<void, bool, Mysqldump_tool_chain_maker>(
151       this, &Mysqldump_tool_chain_maker::mysql_thread_callback), m_program);
152   this->register_progress_watchers_in_child(queue);
153   queue->register_object_reader(m_main_object_reader);
154   m_all_created_elements.push_back(queue);
155   m_object_queues.insert(std::make_pair(object_queue_id, queue));
156   return queue;
157 }
158 
stop_queues()159 void Mysqldump_tool_chain_maker::stop_queues()
160 {
161   std::map<int, Object_queue*>::const_iterator iter;
162   for (iter = m_object_queues.begin(); iter != m_object_queues.end(); iter++)
163   {
164     iter->second->stop_queue();
165   }
166 }
167 
mysql_thread_callback(bool is_starting)168 void Mysqldump_tool_chain_maker::mysql_thread_callback(bool is_starting)
169 {
170   if (is_starting)
171     mysql_thread_init();
172   else
173     mysql_thread_end();
174 }
175 
~Mysqldump_tool_chain_maker()176 Mysqldump_tool_chain_maker::~Mysqldump_tool_chain_maker()
177 {
178   for (std::vector<I_chain_element*>::reverse_iterator it=
179     m_all_created_elements.rbegin(); it != m_all_created_elements.rend();
180     ++it)
181   {
182     delete *it;
183   }
184 }
185 
Mysqldump_tool_chain_maker(I_connection_provider * connection_provider,Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator,Mysqldump_tool_chain_maker_options * options,Mysql::Tools::Base::Abstract_program * program)186 Mysqldump_tool_chain_maker::Mysqldump_tool_chain_maker(
187   I_connection_provider* connection_provider,
188   Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
189     message_handler, Simple_id_generator* object_id_generator,
190   Mysqldump_tool_chain_maker_options* options,
191   Mysql::Tools::Base::Abstract_program* program)
192   : Abstract_chain_element(message_handler, object_id_generator),
193   Abstract_mysql_chain_element_extension(
194     connection_provider, message_handler,
195     options->m_mysql_chain_element_options),
196   m_options(options),
197   m_main_object_reader(NULL),
198   m_program(program)
199 {}
200