1 /*
2   Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "mysql_object_reader.h"
26 #include <boost/algorithm/string.hpp>
27 
28 using namespace Mysql::Tools::Dump;
29 
acquire_fields_information(MYSQL_RES * mysql_result)30 void Mysql_object_reader::Rows_fetching_context::acquire_fields_information(
31   MYSQL_RES* mysql_result)
32 {
33   MYSQL_FIELD* fields= mysql_fetch_fields(mysql_result);
34   uint field_count= mysql_num_fields(mysql_result);
35   m_fields.reserve(field_count);
36   for (uint i= 0; i < field_count; ++i)
37     m_fields.push_back(Mysql_field(&fields[i]));
38 }
39 
process_buffer()40 void Mysql_object_reader::Rows_fetching_context::process_buffer()
41 {
42   if (m_row_group.m_rows.size() == 0)
43     return;
44   m_parent->format_rows(m_item_processing, &m_row_group);
45 
46   m_row_group.m_rows.clear();
47 }
48 
result_callback(const Mysql::Tools::Base::Mysql_query_runner::Row & row_data)49 int64 Mysql_object_reader::Rows_fetching_context::result_callback(
50   const Mysql::Tools::Base::Mysql_query_runner::Row& row_data)
51 {
52   if (unlikely(m_fields.size() == 0))
53   {
54     this->acquire_fields_information(
55       row_data.get_mysql_result_info());
56   }
57   m_row_group.m_rows.push_back(new Row(row_data));
58 
59   if (m_row_group.m_rows.size() >= m_parent->m_options->m_row_group_size)
60   {
61     this->process_buffer();
62   }
63 
64   return 0;
65 }
66 
Rows_fetching_context(Mysql_object_reader * parent,Item_processing_data * item_processing,bool has_generated_column)67 Mysql_object_reader::Rows_fetching_context::Rows_fetching_context(
68   Mysql_object_reader* parent, Item_processing_data* item_processing,
69   bool has_generated_column)
70   : m_parent(parent),
71   m_item_processing(item_processing),
72   m_row_group((Table*)item_processing
73     ->get_process_task_object()->get_related_db_object(), m_fields,
74     has_generated_column)
75 {
76   m_row_group.m_rows.reserve(
77     (size_t)m_parent->m_options->m_row_group_size);
78 }
79 
is_all_rows_processed()80 bool Mysql_object_reader::Rows_fetching_context::is_all_rows_processed()
81 {
82   return m_row_group.m_rows.size() == 0;
83 }
84 
read_table_rows_task(Table_rows_dump_task * table_rows_dump_task,Item_processing_data * item_to_process)85 void Mysql_object_reader::read_table_rows_task(
86   Table_rows_dump_task* table_rows_dump_task,
87   Item_processing_data* item_to_process)
88 {
89   bool has_generated_columns= 0;
90   Mysql::Tools::Base::Mysql_query_runner* runner= this->get_runner();
91   Table* table= table_rows_dump_task->get_related_table();
92 
93   std::vector<const Mysql::Tools::Base::Mysql_query_runner::Row*> columns;
94   std::vector<std::string> field_names;
95 
96   runner->run_query_store(
97     "SELECT `COLUMN_NAME`, `EXTRA` FROM " +
98     this->get_quoted_object_full_name("INFORMATION_SCHEMA", "COLUMNS") +
99     "WHERE TABLE_SCHEMA ='" + runner->escape_string(table->get_schema()) +
100     "' AND TABLE_NAME ='" + runner->escape_string(table->get_name()) + "'",
101     &columns);
102 
103   std::string column_names;
104   for (std::vector<const Mysql::Tools::Base::Mysql_query_runner::Row*>::iterator
105     it= columns.begin(); it != columns.end(); ++it)
106   {
107     const Mysql::Tools::Base::Mysql_query_runner::Row& column_data= **it;
108     if (column_data[1] == "STORED GENERATED" ||
109         column_data[1] == "VIRTUAL GENERATED")
110       has_generated_columns= 1;
111     else
112       column_names+= this->quote_name(column_data[0]) + ",";
113   }
114   /* remove last comma from column_names */
115   column_names= boost::algorithm::replace_last_copy(column_names, ",", "");
116 
117   Mysql::Tools::Base::Mysql_query_runner::cleanup_result(&columns);
118 
119   Rows_fetching_context* row_fetching_context=
120     new Rows_fetching_context(this, item_to_process, has_generated_columns);
121 
122   runner->run_query(
123     "SELECT " + column_names + "  FROM " +
124     this->get_quoted_object_full_name(table),
125     new Mysql::Instance_callback<
126       int64, const Mysql::Tools::Base::Mysql_query_runner::Row&,
127         Rows_fetching_context>(
128           row_fetching_context, &Rows_fetching_context::result_callback));
129 
130   row_fetching_context->process_buffer();
131   if (row_fetching_context->is_all_rows_processed())
132     delete row_fetching_context;
133   delete runner;
134 }
135 
format_rows(Item_processing_data * item_to_process,Row_group_dump_task * row_group)136 void Mysql_object_reader::format_rows(Item_processing_data* item_to_process,
137   Row_group_dump_task* row_group)
138 {
139   this->format_object(this->new_chain_created(
140     item_to_process, row_group));
141 }
142 
read_object(Item_processing_data * item_to_process)143 void Mysql_object_reader::read_object(Item_processing_data* item_to_process)
144 {
145   this->object_processing_starts(item_to_process);
146 
147   if (!(this->try_process_task<Table_rows_dump_task>
148     (item_to_process, &Mysql_object_reader::read_table_rows_task)))
149   {
150     this->format_object(item_to_process);
151   }
152 
153   this->object_processing_ends(item_to_process);
154 }
155 
Mysql_object_reader(I_connection_provider * connection_provider,Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator,const Mysql_object_reader_options * options)156 Mysql_object_reader::Mysql_object_reader(
157   I_connection_provider* connection_provider,
158   Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
159     message_handler, Simple_id_generator* object_id_generator,
160   const Mysql_object_reader_options* options)
161   : Abstract_data_formatter_wrapper(message_handler, object_id_generator),
162   Abstract_mysql_chain_element_extension(connection_provider,
163   message_handler, options->m_mysql_chain_element_options),
164   m_options(options)
165 {}
166