1 /*
2   Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "abstract_progress_watcher.h"
26 #include <algorithm>
27 #include "table_rows_dump_task.h"
28 #include "table_definition_dump_task.h"
29 #include "row_group_dump_task.h"
30 
31 using namespace Mysql::Tools::Dump;
32 
progress_changed()33 void Abstract_progress_watcher::progress_changed()
34 {
35   if (--m_step_countdown == 0)
36   {
37     boost::chrono::system_clock::time_point now=
38       boost::chrono::system_clock::now();
39 
40     double stages_past=
41       std::max(boost::chrono::duration_cast<
42       boost::chrono::duration<double> >(
43       now - m_last_stage_time) /
44       boost::chrono::milliseconds(REPORT_DELAY_MS / STAGES),
45       0.1); //  Do not expand stage by more than 10 times the steps.
46 
47     m_step_countdown= m_last_step_countdown= std::max(1LL,
48       ((int64)(m_last_step_countdown / stages_past)
49       + m_last_step_countdown) / 2);
50     m_last_stage_time= now;
51 
52     uint64 stages_past_int= 1000 * std::min(stages_past, 10.0);
53     uint64 last_stage= m_stage_countdown.fetch_sub(stages_past_int);
54 
55     if (last_stage <= stages_past_int)
56     {
57       Progress_data change= m_progress - m_last_progress;
58       m_last_progress= m_progress;
59 
60       this->process_progress_step(change);
61 
62       m_stage_countdown= STAGES * 1000;
63     }
64   }
65 }
66 
Abstract_progress_watcher(Mysql::I_callable<bool,const Mysql::Tools::Base::Message_data &> * message_handler,Simple_id_generator * object_id_generator)67 Abstract_progress_watcher::Abstract_progress_watcher(
68   Mysql::I_callable <bool, const Mysql::Tools::Base::Message_data&>*
69     message_handler, Simple_id_generator* object_id_generator)
70   : Abstract_chain_element(message_handler, object_id_generator),
71   m_step_countdown(1),
72   m_stage_countdown(STAGES * 1000),
73   m_last_step_countdown(1)
74 {}
75 
crawler_completed(I_crawler * crawler)76 void Abstract_progress_watcher::crawler_completed(I_crawler* crawler)
77 {}
78 
object_processing_ended(Item_processing_data * finished_process_data)79 void Abstract_progress_watcher::object_processing_ended(
80   Item_processing_data* finished_process_data)
81 {
82   // Check if it is last task in the chain.
83   if (finished_process_data->get_parent_item_data() != NULL &&
84     finished_process_data->get_parent_item_data()->get_process_task_object()
85     == finished_process_data->get_process_task_object())
86   {
87     return;
88   }
89   Table_rows_dump_task* processed_table_task=
90     dynamic_cast<Table_rows_dump_task*>(
91     finished_process_data->get_process_task_object());
92   if (processed_table_task != NULL
93     && finished_process_data->had_chain_created())
94   {
95     m_progress.m_table_count++;
96     this->progress_changed();
97     return;
98   }
99 
100   Row_group_dump_task* processed_row_group=
101     dynamic_cast<Row_group_dump_task*>(
102     finished_process_data->get_process_task_object());
103   if (processed_row_group != NULL && processed_row_group->is_completed())
104   {
105     m_progress.m_row_count+= processed_row_group->m_rows.size();
106     this->progress_changed();
107     return;
108   }
109 }
110 
object_processing_started(Item_processing_data * process_data)111 void Abstract_progress_watcher::object_processing_started(
112   Item_processing_data* process_data)
113 {}
114 
new_chain_created(Item_processing_data * new_chain_process_data)115 void Abstract_progress_watcher::new_chain_created(
116   Item_processing_data* new_chain_process_data)
117 {
118   Table_definition_dump_task* new_table_task=
119     dynamic_cast<Table_definition_dump_task*>(
120     new_chain_process_data->get_process_task_object());
121   if (new_table_task != NULL)
122   {
123     Table* new_table= new_table_task->get_related_table();
124 
125     m_total.m_table_count++;
126     m_total.m_row_data+= new_table->get_row_data_lenght();
127     m_total.m_row_count+= new_table->get_row_count();
128   }
129 }
130 
131 Abstract_progress_watcher::Progress_data
operator -(const Progress_data & to_subtract)132   Abstract_progress_watcher::Progress_data::operator-(
133     const Progress_data& to_subtract)
134 {
135   Progress_data res;
136   res.m_table_count= (uint64)m_table_count - (uint64)to_subtract.m_table_count;
137   res.m_row_data= (uint64)m_row_data - (uint64)to_subtract.m_row_data;
138   res.m_row_count= (uint64)m_row_count - (uint64)to_subtract.m_row_count;
139 
140   return res;
141 }
142 
143 Abstract_progress_watcher::Progress_data&
operator =(const Progress_data & to_copy)144   Abstract_progress_watcher::Progress_data::operator=(
145     const Progress_data& to_copy)
146 {
147   m_table_count= to_copy.m_table_count.load();
148   m_row_data= to_copy.m_row_data.load();
149   m_row_count= to_copy.m_row_count.load();
150 
151   return *this;
152 }
153 
/**
  Copy constructor: starts each counter with the value loaded from the
  corresponding counter of the source object.

  @param to_copy  Progress data to copy the counters from.
*/
Abstract_progress_watcher::Progress_data::Progress_data(
  const Abstract_progress_watcher::Progress_data& to_copy)
  : m_table_count(to_copy.m_table_count.load())
  , m_row_data(to_copy.m_row_data.load())
  , m_row_count(to_copy.m_row_count.load())
{
}
159 
Progress_data()160 Abstract_progress_watcher::Progress_data::Progress_data()
161   : m_table_count(0),
162   m_row_data(0),
163   m_row_count(0)
164 {}
165