1 /*
2   Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "object_queue.h"
26 #include "this_thread.h"
27 #include <boost/bind.hpp>
28 #include <boost/date_time.hpp>
29 
30 using namespace Mysql::Tools::Dump;
31 
add_ready_items_to_queue(std::map<const I_dump_task *,std::vector<Item_processing_data * > * >::iterator it)32 void Object_queue::add_ready_items_to_queue(
33   std::map<const I_dump_task*,
34   std::vector<Item_processing_data*>* > ::iterator it)
35 {
36   for (std::vector<Item_processing_data*>::iterator item_iterator=
37     it->second->begin();
38     item_iterator != it->second->end();
39   ++item_iterator)
40   {
41     m_items_ready_for_processing.push(*item_iterator);
42   }
43   delete it->second;
44   m_tasks_map.erase(it);
45 }
46 
task_availability_callback(const Abstract_dump_task * available_task)47 void Object_queue::task_availability_callback(
48   const Abstract_dump_task* available_task)
49 {
50   my_boost::mutex::scoped_lock lock(m_queue_mutex);
51 
52   std::map<const I_dump_task*, std::vector<Item_processing_data*>* >
53     ::iterator it= m_tasks_map.find(available_task);
54   if (it != m_tasks_map.end())
55   {
56     this->add_ready_items_to_queue(it);
57   }
58 }
59 
queue_thread()60 void Object_queue::queue_thread()
61 {
62   (*this->m_thread_callback)(true);
63   while (true)
64   {
65     /* check if any errors are generated by main or child threads */
66     if(m_program->get_error_code())
67       stop_queue();
68 
69     if (m_is_queue_running.load() == false)
70       break;
71 
72     Item_processing_data* item_to_process= NULL;
73     {
74       my_boost::mutex::scoped_lock lock(m_queue_mutex);
75       if (m_items_ready_for_processing.size() > 0)
76       {
77         item_to_process= m_items_ready_for_processing.front();
78         m_items_ready_for_processing.pop();
79       }
80     }
81 
82     if (item_to_process != NULL)
83     {
84       this->format_object(item_to_process);
85       this->object_processing_ends(item_to_process);
86     }
87 
88     /**
89       In case there are no items to be processed from the queue then
90       in such a scenario the current thread will be infinitely hung in
91       this loop without allowing context switch to happen, so that other
92       executing threads can further progress. Thus we need this thread
93       to sleep so that other threads can proceed to complete its execution
94       or allow main thread to insert new items into the queue.
95     */
96     {
97       my_boost::this_thread::sleep(boost::posix_time::milliseconds(1));
98     }
99 
100   }
101   (*this->m_thread_callback)(false);
102 }
103 
read_object(Item_processing_data * item_to_process)104 void Object_queue::read_object(Item_processing_data* item_to_process)
105 {
106   this->object_processing_starts(item_to_process);
107 
108   Abstract_dump_task* dump_task= dynamic_cast<Abstract_dump_task*>(
109     item_to_process->get_process_task_object());
110 
111   if (dump_task == NULL)
112   {
113     (*this->get_message_handler())(
114       Mysql::Tools::Base::Message_data(
115       0, "Not supported operation called.",
116       Mysql::Tools::Base::Message_type_error));
117   }
118 
119   my_boost::mutex::scoped_lock lock(m_queue_mutex);
120   /*
121     Check if all dependencies are already met, if so, we can directly add
122     this processing item to queue. If no, we will create completion callback
123     to handle addition to queue when ready, but this may be called in the
124     meantime, so we need to check this statement again at the end.
125     */
126   if (dump_task->can_be_executed())
127   {
128     m_items_ready_for_processing.push(item_to_process);
129     return;
130   }
131 
132   std::map<const I_dump_task*, std::vector<Item_processing_data*>* >
133     ::iterator it= m_tasks_map.find(dump_task);
134 
135   std::vector<Item_processing_data*>* list;
136 
137   if (it == m_tasks_map.end())
138   {
139     list= new std::vector<Item_processing_data*>();
140     m_tasks_map.insert(make_pair(dump_task,
141       list));
142     dump_task->register_execution_availability_callback(
143       &m_task_availability_callback);
144   }
145   else
146     list= it->second;
147 
148   list->push_back(item_to_process);
149 
150   /*
151     Check if executing threads haven't completed this task dependencies in
152     meantime, if so, we must revert what we have done with waiting task list.
153     */
154   if (dump_task->can_be_executed())
155     this->add_ready_items_to_queue(m_tasks_map.find(dump_task));
156 }
157 
stop_queue()158 void Object_queue::stop_queue()
159 {
160   /*
161     In case of error we stop all the running queues. Make sure the
162     cleanup of the items is done properly.
163   */
164   if (m_is_queue_running) {
165     Item_processing_data *item_to_process= NULL;
166     do
167     {
168       {
169         my_boost::mutex::scoped_lock lock(m_queue_mutex);
170         if (m_items_ready_for_processing.size() == 0)
171           break;
172         item_to_process = m_items_ready_for_processing.front();
173         m_items_ready_for_processing.pop();
174       }
175       this->object_processing_ends(item_to_process);
176     } while(item_to_process != NULL);
177     m_is_queue_running = false;
178   }
179 }
180 
~Object_queue()181 Object_queue::~Object_queue()
182 {
183   m_is_queue_running= false;
184   m_thread_group.join_all();
185   delete m_thread_callback;
186   std::map<const I_dump_task*, std::vector<Item_processing_data*>* >
187     ::iterator it= m_tasks_map.begin();
188   for(;it != m_tasks_map.end(); ++it)
189     delete it->second;
190 }
191 
Object_queue(Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator,uint threads_count,Mysql::I_callable<void,bool> * thread_callback,Mysql::Tools::Base::Abstract_program * program)192 Object_queue::Object_queue(
193   Mysql::I_callable<bool, const Mysql::Tools::Base::Message_data&>*
194     message_handler, Simple_id_generator* object_id_generator,
195   uint threads_count, Mysql::I_callable<void, bool>* thread_callback,
196   Mysql::Tools::Base::Abstract_program* program)
197   : Abstract_object_reader_wrapper(message_handler, object_id_generator),
198   m_task_availability_callback(
199   this, &Object_queue::task_availability_callback),
200   m_is_queue_running(true),
201   m_thread_callback(thread_callback),
202   m_program(program)
203 {
204   for (int thread= threads_count; thread-- > 0;)
205   {
206     m_thread_group.create_thread(
207       boost::bind(&Object_queue::queue_thread, this));
208   }
209 }
210