1 /*
2 Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "object_queue.h"
26 #include "this_thread.h"
27 #include <boost/bind.hpp>
28 #include <boost/date_time.hpp>
29
30 using namespace Mysql::Tools::Dump;
31
add_ready_items_to_queue(std::map<const I_dump_task *,std::vector<Item_processing_data * > * >::iterator it)32 void Object_queue::add_ready_items_to_queue(
33 std::map<const I_dump_task*,
34 std::vector<Item_processing_data*>* > ::iterator it)
35 {
36 for (std::vector<Item_processing_data*>::iterator item_iterator=
37 it->second->begin();
38 item_iterator != it->second->end();
39 ++item_iterator)
40 {
41 m_items_ready_for_processing.push(*item_iterator);
42 }
43 delete it->second;
44 m_tasks_map.erase(it);
45 }
46
task_availability_callback(const Abstract_dump_task * available_task)47 void Object_queue::task_availability_callback(
48 const Abstract_dump_task* available_task)
49 {
50 my_boost::mutex::scoped_lock lock(m_queue_mutex);
51
52 std::map<const I_dump_task*, std::vector<Item_processing_data*>* >
53 ::iterator it= m_tasks_map.find(available_task);
54 if (it != m_tasks_map.end())
55 {
56 this->add_ready_items_to_queue(it);
57 }
58 }
59
queue_thread()60 void Object_queue::queue_thread()
61 {
62 (*this->m_thread_callback)(true);
63 while (true)
64 {
65 /* check if any errors are generated by main or child threads */
66 if(m_program->get_error_code())
67 stop_queue();
68
69 if (m_is_queue_running.load() == false)
70 break;
71
72 Item_processing_data* item_to_process= NULL;
73 {
74 my_boost::mutex::scoped_lock lock(m_queue_mutex);
75 if (m_items_ready_for_processing.size() > 0)
76 {
77 item_to_process= m_items_ready_for_processing.front();
78 m_items_ready_for_processing.pop();
79 }
80 }
81
82 if (item_to_process != NULL)
83 {
84 this->format_object(item_to_process);
85 this->object_processing_ends(item_to_process);
86 }
87
88 /**
89 In case there are no items to be processed from the queue then
90 in such a scenario the current thread will be infinitely hung in
91 this loop without allowing context switch to happen, so that other
92 executing threads can further progress. Thus we need this thread
93 to sleep so that other threads can proceed to complete its execution
94 or allow main thread to insert new items into the queue.
95 */
96 {
97 my_boost::this_thread::sleep(boost::posix_time::milliseconds(1));
98 }
99
100 }
101 (*this->m_thread_callback)(false);
102 }
103
read_object(Item_processing_data * item_to_process)104 void Object_queue::read_object(Item_processing_data* item_to_process)
105 {
106 this->object_processing_starts(item_to_process);
107
108 Abstract_dump_task* dump_task= dynamic_cast<Abstract_dump_task*>(
109 item_to_process->get_process_task_object());
110
111 if (dump_task == NULL)
112 {
113 (*this->get_message_handler())(
114 Mysql::Tools::Base::Message_data(
115 0, "Not supported operation called.",
116 Mysql::Tools::Base::Message_type_error));
117 }
118
119 my_boost::mutex::scoped_lock lock(m_queue_mutex);
120 /*
121 Check if all dependencies are already met, if so, we can directly add
122 this processing item to queue. If no, we will create completion callback
123 to handle addition to queue when ready, but this may be called in the
124 meantime, so we need to check this statement again at the end.
125 */
126 if (dump_task->can_be_executed())
127 {
128 m_items_ready_for_processing.push(item_to_process);
129 return;
130 }
131
132 std::map<const I_dump_task*, std::vector<Item_processing_data*>* >
133 ::iterator it= m_tasks_map.find(dump_task);
134
135 std::vector<Item_processing_data*>* list;
136
137 if (it == m_tasks_map.end())
138 {
139 list= new std::vector<Item_processing_data*>();
140 m_tasks_map.insert(make_pair(dump_task,
141 list));
142 dump_task->register_execution_availability_callback(
143 &m_task_availability_callback);
144 }
145 else
146 list= it->second;
147
148 list->push_back(item_to_process);
149
150 /*
151 Check if executing threads haven't completed this task dependencies in
152 meantime, if so, we must revert what we have done with waiting task list.
153 */
154 if (dump_task->can_be_executed())
155 this->add_ready_items_to_queue(m_tasks_map.find(dump_task));
156 }
157
stop_queue()158 void Object_queue::stop_queue()
159 {
160 /*
161 In case of error we stop all the running queues. Make sure the
162 cleanup of the items is done properly.
163 */
164 if (m_is_queue_running) {
165 Item_processing_data *item_to_process= NULL;
166 do
167 {
168 {
169 my_boost::mutex::scoped_lock lock(m_queue_mutex);
170 if (m_items_ready_for_processing.size() == 0)
171 break;
172 item_to_process = m_items_ready_for_processing.front();
173 m_items_ready_for_processing.pop();
174 }
175 this->object_processing_ends(item_to_process);
176 } while(item_to_process != NULL);
177 m_is_queue_running = false;
178 }
179 }
180
~Object_queue()181 Object_queue::~Object_queue()
182 {
183 m_is_queue_running= false;
184 m_thread_group.join_all();
185 delete m_thread_callback;
186 std::map<const I_dump_task*, std::vector<Item_processing_data*>* >
187 ::iterator it= m_tasks_map.begin();
188 for(;it != m_tasks_map.end(); ++it)
189 delete it->second;
190 }
191
Object_queue(Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator,uint threads_count,Mysql::I_callable<void,bool> * thread_callback,Mysql::Tools::Base::Abstract_program * program)192 Object_queue::Object_queue(
193 Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
194 message_handler, Simple_id_generator* object_id_generator,
195 uint threads_count, Mysql::I_callable<void, bool>* thread_callback,
196 Mysql::Tools::Base::Abstract_program* program)
197 : Abstract_object_reader_wrapper(message_handler, object_id_generator),
198 m_task_availability_callback(
199 this, &Object_queue::task_availability_callback),
200 m_is_queue_running(true),
201 m_thread_callback(thread_callback),
202 m_program(program)
203 {
204 for (int thread= threads_count; thread-- > 0;)
205 {
206 m_thread_group.create_thread(
207 boost::bind(&Object_queue::queue_thread, this));
208 }
209 }
210