1 /*
2 Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "mysql_object_reader.h"
26 #include <boost/algorithm/string.hpp>
27
28 using namespace Mysql::Tools::Dump;
29
acquire_fields_information(MYSQL_RES * mysql_result)30 void Mysql_object_reader::Rows_fetching_context::acquire_fields_information(
31 MYSQL_RES* mysql_result)
32 {
33 MYSQL_FIELD* fields= mysql_fetch_fields(mysql_result);
34 uint field_count= mysql_num_fields(mysql_result);
35 m_fields.reserve(field_count);
36 for (uint i= 0; i < field_count; ++i)
37 m_fields.push_back(Mysql_field(&fields[i]));
38 }
39
process_buffer()40 void Mysql_object_reader::Rows_fetching_context::process_buffer()
41 {
42 if (m_row_group.m_rows.size() == 0)
43 return;
44 m_parent->format_rows(m_item_processing, &m_row_group);
45
46 m_row_group.m_rows.clear();
47 }
48
result_callback(const Mysql::Tools::Base::Mysql_query_runner::Row & row_data)49 int64 Mysql_object_reader::Rows_fetching_context::result_callback(
50 const Mysql::Tools::Base::Mysql_query_runner::Row& row_data)
51 {
52 if (unlikely(m_fields.size() == 0))
53 {
54 this->acquire_fields_information(
55 row_data.get_mysql_result_info());
56 }
57 m_row_group.m_rows.push_back(new Row(row_data));
58
59 if (m_row_group.m_rows.size() >= m_parent->m_options->m_row_group_size)
60 {
61 this->process_buffer();
62 }
63
64 return 0;
65 }
66
Rows_fetching_context(Mysql_object_reader * parent,Item_processing_data * item_processing,bool has_generated_column)67 Mysql_object_reader::Rows_fetching_context::Rows_fetching_context(
68 Mysql_object_reader* parent, Item_processing_data* item_processing,
69 bool has_generated_column)
70 : m_parent(parent),
71 m_item_processing(item_processing),
72 m_row_group((Table*)item_processing
73 ->get_process_task_object()->get_related_db_object(), m_fields,
74 has_generated_column)
75 {
76 m_row_group.m_rows.reserve(
77 (size_t)m_parent->m_options->m_row_group_size);
78 }
79
is_all_rows_processed()80 bool Mysql_object_reader::Rows_fetching_context::is_all_rows_processed()
81 {
82 return m_row_group.m_rows.size() == 0;
83 }
84
read_table_rows_task(Table_rows_dump_task * table_rows_dump_task,Item_processing_data * item_to_process)85 void Mysql_object_reader::read_table_rows_task(
86 Table_rows_dump_task* table_rows_dump_task,
87 Item_processing_data* item_to_process)
88 {
89 bool has_generated_columns= 0;
90 Mysql::Tools::Base::Mysql_query_runner* runner= this->get_runner();
91 Table* table= table_rows_dump_task->get_related_table();
92
93 std::vector<const Mysql::Tools::Base::Mysql_query_runner::Row*> columns;
94 std::vector<std::string> field_names;
95
96 runner->run_query_store(
97 "SELECT `COLUMN_NAME`, `EXTRA` FROM " +
98 this->get_quoted_object_full_name("INFORMATION_SCHEMA", "COLUMNS") +
99 "WHERE TABLE_SCHEMA ='" + runner->escape_string(table->get_schema()) +
100 "' AND TABLE_NAME ='" + runner->escape_string(table->get_name()) + "'",
101 &columns);
102
103 std::string column_names;
104 for (std::vector<const Mysql::Tools::Base::Mysql_query_runner::Row*>::iterator
105 it= columns.begin(); it != columns.end(); ++it)
106 {
107 const Mysql::Tools::Base::Mysql_query_runner::Row& column_data= **it;
108 if (column_data[1] == "STORED GENERATED" ||
109 column_data[1] == "VIRTUAL GENERATED")
110 has_generated_columns= 1;
111 else
112 column_names+= this->quote_name(column_data[0]) + ",";
113 }
114 /* remove last comma from column_names */
115 column_names= boost::algorithm::replace_last_copy(column_names, ",", "");
116
117 Mysql::Tools::Base::Mysql_query_runner::cleanup_result(&columns);
118
119 Rows_fetching_context* row_fetching_context=
120 new Rows_fetching_context(this, item_to_process, has_generated_columns);
121
122 runner->run_query(
123 "SELECT " + column_names + " FROM " +
124 this->get_quoted_object_full_name(table),
125 new Mysql::Instance_callback<
126 int64, const Mysql::Tools::Base::Mysql_query_runner::Row&,
127 Rows_fetching_context>(
128 row_fetching_context, &Rows_fetching_context::result_callback));
129
130 row_fetching_context->process_buffer();
131 if (row_fetching_context->is_all_rows_processed())
132 delete row_fetching_context;
133 delete runner;
134 }
135
format_rows(Item_processing_data * item_to_process,Row_group_dump_task * row_group)136 void Mysql_object_reader::format_rows(Item_processing_data* item_to_process,
137 Row_group_dump_task* row_group)
138 {
139 this->format_object(this->new_chain_created(
140 item_to_process, row_group));
141 }
142
read_object(Item_processing_data * item_to_process)143 void Mysql_object_reader::read_object(Item_processing_data* item_to_process)
144 {
145 this->object_processing_starts(item_to_process);
146
147 if (!(this->try_process_task<Table_rows_dump_task>
148 (item_to_process, &Mysql_object_reader::read_table_rows_task)))
149 {
150 this->format_object(item_to_process);
151 }
152
153 this->object_processing_ends(item_to_process);
154 }
155
Mysql_object_reader(I_connection_provider * connection_provider,Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator,const Mysql_object_reader_options * options)156 Mysql_object_reader::Mysql_object_reader(
157 I_connection_provider* connection_provider,
158 Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
159 message_handler, Simple_id_generator* object_id_generator,
160 const Mysql_object_reader_options* options)
161 : Abstract_data_formatter_wrapper(message_handler, object_id_generator),
162 Abstract_mysql_chain_element_extension(connection_provider,
163 message_handler, options->m_mysql_chain_element_options),
164 m_options(options)
165 {}
166