1 /*
2   Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "client/dump/abstract_progress_watcher.h"
26 
27 #include <stddef.h>
28 #include <algorithm>
29 #include <chrono>
30 #include <functional>
31 
32 #include "client/dump/row_group_dump_task.h"
33 #include "client/dump/table_definition_dump_task.h"
34 #include "client/dump/table_rows_dump_task.h"
35 
36 using namespace Mysql::Tools::Dump;
37 
progress_changed()38 void Abstract_progress_watcher::progress_changed() {
39   if (--m_step_countdown == 0) {
40     std::chrono::system_clock::time_point now =
41         std::chrono::system_clock::now();
42 
43     double stages_past =
44         std::max(std::chrono::duration_cast<std::chrono::duration<double>>(
45                      now - m_last_stage_time) /
46                      std::chrono::milliseconds(REPORT_DELAY_MS / STAGES),
47                  0.1);  //  Do not expand stage by more than 10 times the steps.
48 
49     m_step_countdown = m_last_step_countdown = std::max<int64>(
50         1,
51         ((int64)(m_last_step_countdown / stages_past) + m_last_step_countdown) /
52             2);
53     m_last_stage_time = now;
54 
55     uint64 stages_past_int = 1000 * std::min(stages_past, 10.0);
56     uint64 last_stage = m_stage_countdown.fetch_sub(stages_past_int);
57 
58     if (last_stage <= stages_past_int) {
59       Progress_data change = m_progress - m_last_progress;
60       m_last_progress = m_progress;
61 
62       this->process_progress_step(change);
63 
64       m_stage_countdown = STAGES * 1000;
65     }
66   }
67 }
68 
Abstract_progress_watcher(std::function<bool (const Mysql::Tools::Base::Message_data &)> * message_handler,Simple_id_generator * object_id_generator)69 Abstract_progress_watcher::Abstract_progress_watcher(
70     std::function<bool(const Mysql::Tools::Base::Message_data &)>
71         *message_handler,
72     Simple_id_generator *object_id_generator)
73     : Abstract_chain_element(message_handler, object_id_generator),
74       m_step_countdown(1),
75       m_stage_countdown(STAGES * 1000),
76       m_last_step_countdown(1) {}
77 
crawler_completed(I_crawler *)78 void Abstract_progress_watcher::crawler_completed(I_crawler *) {}
79 
object_processing_ended(Item_processing_data * finished_process_data)80 void Abstract_progress_watcher::object_processing_ended(
81     Item_processing_data *finished_process_data) {
82   // Check if it is last task in the chain.
83   if (finished_process_data->get_parent_item_data() != nullptr &&
84       finished_process_data->get_parent_item_data()
85               ->get_process_task_object() ==
86           finished_process_data->get_process_task_object()) {
87     return;
88   }
89   Table_rows_dump_task *processed_table_task =
90       dynamic_cast<Table_rows_dump_task *>(
91           finished_process_data->get_process_task_object());
92   if (processed_table_task != nullptr &&
93       finished_process_data->had_chain_created()) {
94     m_progress.m_table_count++;
95     this->progress_changed();
96     return;
97   }
98 
99   Row_group_dump_task *processed_row_group =
100       dynamic_cast<Row_group_dump_task *>(
101           finished_process_data->get_process_task_object());
102   if (processed_row_group != nullptr && processed_row_group->is_completed()) {
103     m_progress.m_row_count += processed_row_group->m_rows.size();
104     this->progress_changed();
105     return;
106   }
107 }
108 
object_processing_started(Item_processing_data *)109 void Abstract_progress_watcher::object_processing_started(
110     Item_processing_data *) {}
111 
new_chain_created(Item_processing_data * new_chain_process_data)112 void Abstract_progress_watcher::new_chain_created(
113     Item_processing_data *new_chain_process_data) {
114   Table_definition_dump_task *new_table_task =
115       dynamic_cast<Table_definition_dump_task *>(
116           new_chain_process_data->get_process_task_object());
117   if (new_table_task != nullptr) {
118     Table *new_table = new_table_task->get_related_table();
119 
120     m_total.m_table_count++;
121     m_total.m_row_data += new_table->get_row_data_lenght();
122     m_total.m_row_count += new_table->get_row_count();
123   }
124 }
125 
126 Abstract_progress_watcher::Progress_data
operator -(const Progress_data & to_subtract)127 Abstract_progress_watcher::Progress_data::operator-(
128     const Progress_data &to_subtract) {
129   Progress_data res;
130   res.m_table_count = (uint64)m_table_count - (uint64)to_subtract.m_table_count;
131   res.m_row_data = (uint64)m_row_data - (uint64)to_subtract.m_row_data;
132   res.m_row_count = (uint64)m_row_count - (uint64)to_subtract.m_row_count;
133 
134   return res;
135 }
136 
137 Abstract_progress_watcher::Progress_data &
operator =(const Progress_data & to_copy)138 Abstract_progress_watcher::Progress_data::operator=(
139     const Progress_data &to_copy) {
140   m_table_count = to_copy.m_table_count.load();
141   m_row_data = to_copy.m_row_data.load();
142   m_row_count = to_copy.m_row_count.load();
143 
144   return *this;
145 }
146 
/**
  Copy constructor: initializes each counter with the value loaded from the
  corresponding counter of the source object.

  The counters are initialized directly in the member initializer list
  rather than being default-constructed and then assigned, so no counter is
  ever operated on before it holds a value. The counters are read one after
  another, in the same order as in operator=.

  @param to_copy  Object to copy the counter values from.
*/
Abstract_progress_watcher::Progress_data::Progress_data(
    const Abstract_progress_watcher::Progress_data &to_copy)
    : m_table_count(to_copy.m_table_count.load()),
      m_row_data(to_copy.m_row_data.load()),
      m_row_count(to_copy.m_row_count.load()) {}
151 
Progress_data()152 Abstract_progress_watcher::Progress_data::Progress_data()
153     : m_table_count(0), m_row_data(0), m_row_count(0) {}
154