1 /* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "plugin/group_replication/include/plugin_handlers/server_ongoing_transactions_handler.h"
24 #include "mysql/components/services/ongoing_transaction_query_service.h"
25 #include "plugin/group_replication/include/plugin.h"
26 
27 #include <vector>
28 
Server_ongoing_transactions_handler()29 Server_ongoing_transactions_handler::Server_ongoing_transactions_handler()
30     : generic_service(nullptr) {
31   mysql_mutex_init(key_GR_LOCK_server_ongoing_transaction_handler,
32                    &query_wait_lock, MY_MUTEX_INIT_FAST);
33 }
34 
initialize_server_service(Plugin_stage_monitor_handler * stage_handler_arg)35 bool Server_ongoing_transactions_handler::initialize_server_service(
36     Plugin_stage_monitor_handler *stage_handler_arg) {
37   SERVICE_TYPE(registry) *registry = nullptr;
38   if (!registry_module || !(registry = registry_module->get_registry_handle()))
39     return true; /* purecov: inspected */
40   registry->acquire("mysql_ongoing_transactions_query", &generic_service);
41   stage_handler = stage_handler_arg;
42   return false;
43 }
44 
~Server_ongoing_transactions_handler()45 Server_ongoing_transactions_handler::~Server_ongoing_transactions_handler() {
46   mysql_mutex_destroy(&query_wait_lock);
47   SERVICE_TYPE(registry) *registry = nullptr;
48   if (!registry_module ||
49       !(registry = registry_module->get_registry_handle())) {
50     DBUG_ASSERT(0); /* purecov: inspected */
51     return;
52   }
53   registry->release(generic_service);
54 }
55 
get_server_running_transactions(ulong ** ids,ulong * size)56 bool Server_ongoing_transactions_handler::get_server_running_transactions(
57     ulong **ids, ulong *size) {
58   SERVICE_TYPE(mysql_ongoing_transactions_query) * server_transaction_service;
59   server_transaction_service =
60       reinterpret_cast<SERVICE_TYPE(mysql_ongoing_transactions_query) *>(
61           generic_service);
62   if (generic_service)
63     return server_transaction_service->get_ongoing_server_transactions(ids,
64                                                                        size);
65   else
66     return true; /* purecov: inspected */
67 }
68 
69 int Server_ongoing_transactions_handler::
wait_for_current_transaction_load_execution(bool * abort_flag,my_thread_id id_to_ignore)70     wait_for_current_transaction_load_execution(bool *abort_flag,
71                                                 my_thread_id id_to_ignore) {
72   group_transaction_observation_manager->register_transaction_observer(this);
73   unsigned long *thread_id_array = nullptr;
74   unsigned long size = 0;
75   bool error = get_server_running_transactions(&thread_id_array, &size);
76 
77   std::set<my_thread_id> transactions_to_wait;
78   if (!error)
79     transactions_to_wait.insert(thread_id_array, thread_id_array + size);
80   my_free(thread_id_array);
81   thread_id_array = nullptr;
82 
83   if (id_to_ignore) {
84     transactions_to_wait.erase(id_to_ignore);
85     size = transactions_to_wait.size();
86   }
87 
88   ulong transactions_to_wait_size = size;
89   if (stage_handler) stage_handler->set_estimated_work(size);
90 
91   while (!transactions_to_wait.empty() && !(*abort_flag) && !error) {
92     mysql_mutex_lock(&query_wait_lock);
93 
94     while (!thread_ids_finished.empty() && !transactions_to_wait.empty()) {
95       transactions_to_wait.erase(thread_ids_finished.front());
96       thread_ids_finished.pop();
97     }
98     mysql_mutex_unlock(&query_wait_lock);
99 
100     if (stage_handler) {
101       ulong transactions_ended =
102           transactions_to_wait_size - transactions_to_wait.size();
103       stage_handler->set_completed_work(transactions_ended);
104     }
105 
106     // Sleep to give some more transactions time to finish.
107     my_sleep(100);
108 
109     error = get_server_running_transactions(&thread_id_array, &size);
110     std::set<my_thread_id> current_transactions;
111     current_transactions.insert(thread_id_array, thread_id_array + size);
112     my_free(thread_id_array);
113     thread_id_array = nullptr;
114 
115     mysql_mutex_lock(&query_wait_lock);
116     for (my_thread_id thread_id : transactions_to_wait) {
117       if (current_transactions.find(thread_id) == current_transactions.end()) {
118         thread_ids_finished.push(thread_id);
119       }
120     }
121     mysql_mutex_unlock(&query_wait_lock);
122   }
123 
124   group_transaction_observation_manager->unregister_transaction_observer(this);
125   return error;
126 }
127 
128 /*
129   These methods are necessary to fulfil the Group_transaction_listener
130   interface.
131 */
132 /* purecov: begin inspected */
before_transaction_begin(my_thread_id,ulong,ulong,enum_rpl_channel_type)133 int Server_ongoing_transactions_handler::before_transaction_begin(
134     my_thread_id, ulong, ulong, enum_rpl_channel_type) {
135   return 0;
136 }
137 
before_commit(my_thread_id,Group_transaction_listener::enum_transaction_origin)138 int Server_ongoing_transactions_handler::before_commit(
139     my_thread_id, Group_transaction_listener::enum_transaction_origin) {
140   return 0;
141 }
142 
before_rollback(my_thread_id,Group_transaction_listener::enum_transaction_origin)143 int Server_ongoing_transactions_handler::before_rollback(
144     my_thread_id, Group_transaction_listener::enum_transaction_origin) {
145   return 0;
146 }
147 /* purecov: end */
148 
after_rollback(my_thread_id thread_id)149 int Server_ongoing_transactions_handler::after_rollback(
150     my_thread_id thread_id) {
151   mysql_mutex_lock(&query_wait_lock);
152   thread_ids_finished.push(thread_id);
153   mysql_mutex_unlock(&query_wait_lock);
154   return 0;
155 }
after_commit(my_thread_id thread_id,rpl_sidno,rpl_gno)156 int Server_ongoing_transactions_handler::after_commit(my_thread_id thread_id,
157                                                       rpl_sidno, rpl_gno) {
158   mysql_mutex_lock(&query_wait_lock);
159   thread_ids_finished.push(thread_id);
160   mysql_mutex_unlock(&query_wait_lock);
161 
162   return 0;
163 }
164