1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "rpl_transaction_write_set_ctx.h"
24
25 #include "mysql/service_rpl_transaction_write_set.h" // Transaction_write_set
26 #include "mysqld_thd_manager.h" // Global_THD_manager
27 #include "sql_class.h" // THD
28 #include "sql_parse.h" // Find_thd_with_id
29 #include "debug_sync.h" // debug_sync_set_action
30 #include "binlog.h" // get_opt_max_history_size
31
32 int32 Rpl_transaction_write_set_ctx::m_global_component_requires_write_sets(0);
33 int64 Rpl_transaction_write_set_ctx::m_global_write_set_memory_size_limit(0);
34
Rpl_transaction_write_set_ctx()35 Rpl_transaction_write_set_ctx::Rpl_transaction_write_set_ctx()
36 : m_has_missing_keys(false),
37 m_has_related_foreign_keys(false),
38 m_ignore_write_set_memory_limit(false),
39 m_local_allow_drop_write_set(false),
40 m_local_has_reached_write_set_limit(false)
41 {
42 DBUG_ENTER("Rpl_transaction_write_set_ctx::Rpl_transaction_write_set_ctx");
43 /*
44 In order to speed-up small transactions write-set extraction,
45 we preallocate 24 elements.
46 24 is a sufficient number to hold write-sets for single
47 statement transactions, even on tables with foreign keys.
48 */
49 write_set.reserve(24);
50 DBUG_VOID_RETURN;
51 }
52
add_write_set(uint64 hash)53 bool Rpl_transaction_write_set_ctx::add_write_set(uint64 hash)
54 {
55 DBUG_EXECUTE_IF("add_write_set_no_memory", throw std::bad_alloc(););
56
57 if (!m_local_has_reached_write_set_limit) {
58 ulong binlog_trx_dependency_history_size =
59 mysql_bin_log.m_dependency_tracker.get_writeset()
60 ->get_opt_max_history_size();
61 bool is_full_writeset_required =
62 m_global_component_requires_write_sets && !m_local_allow_drop_write_set;
63
64 if (!is_full_writeset_required) {
65 if (write_set.size() >= binlog_trx_dependency_history_size) {
66 m_local_has_reached_write_set_limit = true;
67 clear_write_set();
68 return false;
69 }
70 }
71
72 uint64 mem_limit = m_global_write_set_memory_size_limit;
73 if (mem_limit && !m_ignore_write_set_memory_limit) {
74 // Check if adding a new element goes over the limit
75 if (sizeof(uint64) + write_set_memory_size() > mem_limit) {
76 my_error(ER_WRITE_SET_EXCEEDS_LIMIT, MYF(0));
77 return true;
78 }
79 }
80
81 write_set.push_back(hash);
82 write_set_unique.insert(hash);
83 }
84
85 return false;
86 }
87
get_write_set()88 std::set<uint64>* Rpl_transaction_write_set_ctx::get_write_set()
89 {
90 DBUG_ENTER("Rpl_transaction_write_set_ctx::get_write_set");
91 DBUG_RETURN(&write_set_unique);
92 }
93
reset_state()94 void Rpl_transaction_write_set_ctx::reset_state() {
95 clear_write_set();
96 m_has_missing_keys = m_has_related_foreign_keys = false;
97 m_local_has_reached_write_set_limit = false;
98 }
99
clear_write_set()100 void Rpl_transaction_write_set_ctx::clear_write_set() {
101 write_set.clear();
102 write_set_unique.clear();
103 savepoint.clear();
104 savepoint_list.clear();
105 }
106
set_has_missing_keys()107 void Rpl_transaction_write_set_ctx::set_has_missing_keys()
108 {
109 DBUG_ENTER("Transaction_context_log_event::set_has_missing_keys");
110 m_has_missing_keys= true;
111 DBUG_VOID_RETURN;
112 }
113
get_has_missing_keys()114 bool Rpl_transaction_write_set_ctx::get_has_missing_keys()
115 {
116 DBUG_ENTER("Transaction_context_log_event::get_has_missing_keys");
117 DBUG_RETURN(m_has_missing_keys);
118 }
119
set_has_related_foreign_keys()120 void Rpl_transaction_write_set_ctx::set_has_related_foreign_keys()
121 {
122 DBUG_ENTER("Transaction_context_log_event::set_has_related_foreign_keys");
123 m_has_related_foreign_keys= true;
124 DBUG_VOID_RETURN;
125 }
126
get_has_related_foreign_keys()127 bool Rpl_transaction_write_set_ctx::get_has_related_foreign_keys()
128 {
129 DBUG_ENTER("Transaction_context_log_event::get_has_related_foreign_keys");
130 DBUG_RETURN(m_has_related_foreign_keys);
131 }
132
was_write_set_limit_reached()133 bool Rpl_transaction_write_set_ctx::was_write_set_limit_reached() {
134 return m_local_has_reached_write_set_limit;
135 }
136
write_set_memory_size()137 size_t Rpl_transaction_write_set_ctx::write_set_memory_size() {
138 return sizeof(uint64) * write_set.size();
139 }
140
set_global_require_full_write_set(bool requires_ws)141 void Rpl_transaction_write_set_ctx::set_global_require_full_write_set(
142 bool requires_ws) {
143 assert(!requires_ws || !m_global_component_requires_write_sets);
144 if (requires_ws)
145 my_atomic_store32(&m_global_component_requires_write_sets, 1);
146 else
147 my_atomic_store32(&m_global_component_requires_write_sets, 0);
148 }
149
require_full_write_set(int requires_ws)150 void require_full_write_set(int requires_ws) {
151 Rpl_transaction_write_set_ctx::set_global_require_full_write_set(requires_ws);
152 }
153
set_global_write_set_memory_size_limit(int64 limit)154 void Rpl_transaction_write_set_ctx::set_global_write_set_memory_size_limit(int64 limit) {
155 assert(m_global_write_set_memory_size_limit == 0);
156 m_global_write_set_memory_size_limit = limit;
157 }
158
update_global_write_set_memory_size_limit(int64 limit)159 void Rpl_transaction_write_set_ctx::update_global_write_set_memory_size_limit(
160 int64 limit) {
161 m_global_write_set_memory_size_limit = limit;
162 }
163
set_write_set_memory_size_limit(long long size_limit)164 void set_write_set_memory_size_limit(long long size_limit) {
165 Rpl_transaction_write_set_ctx::set_global_write_set_memory_size_limit(
166 size_limit);
167 }
168
update_write_set_memory_size_limit(long long size_limit)169 void update_write_set_memory_size_limit(long long size_limit) {
170 Rpl_transaction_write_set_ctx::update_global_write_set_memory_size_limit(
171 size_limit);
172 }
173
set_local_ignore_write_set_memory_limit(bool ignore_limit)174 void Rpl_transaction_write_set_ctx::set_local_ignore_write_set_memory_limit(
175 bool ignore_limit) {
176 m_ignore_write_set_memory_limit = ignore_limit;
177 }
178
set_local_allow_drop_write_set(bool allow_drop_write_set)179 void Rpl_transaction_write_set_ctx::set_local_allow_drop_write_set(
180 bool allow_drop_write_set) {
181 m_local_allow_drop_write_set = allow_drop_write_set;
182 }
183
184 /**
185 Implementation of service_rpl_transaction_write_set, see
186 @file include/mysql/service_rpl_transaction_write_set.h
187 */
188
get_transaction_write_set(unsigned long m_thread_id)189 Transaction_write_set* get_transaction_write_set(unsigned long m_thread_id)
190 {
191 DBUG_ENTER("get_transaction_write_set");
192 THD *thd= NULL;
193 Transaction_write_set *result_set= NULL;
194 Find_thd_with_id find_thd_with_id(m_thread_id, false);
195
196 thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
197 if (thd)
198 {
199 std::set<uint64> *write_set= thd->get_transaction()
200 ->get_transaction_write_set_ctx()->get_write_set();
201 unsigned long write_set_size= write_set->size();
202 if (write_set_size == 0)
203 {
204 mysql_mutex_unlock(&thd->LOCK_thd_data);
205 DBUG_RETURN(NULL);
206 }
207
208 result_set= (Transaction_write_set*)my_malloc(key_memory_write_set_extraction,
209 sizeof(Transaction_write_set),
210 MYF(0));
211 result_set->write_set_size= write_set_size;
212 result_set->write_set=
213 (unsigned long long*)my_malloc(key_memory_write_set_extraction,
214 write_set_size *
215 sizeof(unsigned long long),
216 MYF(0));
217 int result_set_index= 0;
218 for (std::set<uint64>::iterator it= write_set->begin();
219 it != write_set->end();
220 ++it)
221 {
222 uint64 temp= *it;
223 result_set->write_set[result_set_index++]=temp;
224 }
225 mysql_mutex_unlock(&thd->LOCK_thd_data);
226 }
227 DBUG_RETURN(result_set);
228 }
229
add_savepoint(char * name)230 void Rpl_transaction_write_set_ctx::add_savepoint(char* name)
231 {
232 DBUG_ENTER("Rpl_transaction_write_set_ctx::add_savepoint");
233 std::string identifier(name);
234
235 DBUG_EXECUTE_IF("transaction_write_set_savepoint_clear_on_commit_rollback",
236 {
237 assert(savepoint.size() == 0);
238 assert(write_set.size() == 0);
239 assert(write_set_unique.size() == 0);
240 assert(savepoint_list.size() == 0);
241 });
242
243 DBUG_EXECUTE_IF("transaction_write_set_savepoint_level",
244 assert(savepoint.size() == 0););
245
246 std::map<std::string, size_t>::iterator it;
247
248 /*
249 Savepoint with the same name, the old savepoint is deleted and a new one
250 is set
251 */
252 if ((it= savepoint.find(name)) != savepoint.end())
253 savepoint.erase(it);
254
255 savepoint.insert(std::pair<std::string, size_t>(identifier,
256 write_set.size()));
257
258 DBUG_EXECUTE_IF("transaction_write_set_savepoint_add_savepoint",
259 assert(savepoint.find(identifier)->second == write_set.size()););
260
261 DBUG_VOID_RETURN;
262 }
263
del_savepoint(char * name)264 void Rpl_transaction_write_set_ctx::del_savepoint(char* name)
265 {
266 DBUG_ENTER("Rpl_transaction_write_set_ctx::del_savepoint");
267 std::string identifier(name);
268
269 DBUG_EXECUTE_IF("transaction_write_set_savepoint_block_before_release",
270 {
271 const char act[]= "now wait_for signal.unblock_release";
272 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
273 });
274
275 savepoint.erase(identifier);
276
277 DBUG_VOID_RETURN;
278 }
279
rollback_to_savepoint(char * name)280 void Rpl_transaction_write_set_ctx::rollback_to_savepoint(char* name)
281 {
282 DBUG_ENTER("Rpl_transaction_write_set_ctx::rollback_to_savepoint");
283 size_t position= 0;
284 std::string identifier(name);
285 std::map<std::string, size_t>::iterator elem;
286
287 if ((elem = savepoint.find(identifier)) != savepoint.end())
288 {
289 assert(elem->second <= write_set.size());
290
291 DBUG_EXECUTE_IF("transaction_write_set_savepoint_block_before_rollback",
292 {
293 const char act[]= "now wait_for signal.unblock_rollback";
294 assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
295 });
296
297 position= elem->second;
298
299 // Remove all savepoints created after the savepoint identifier given as
300 // parameter
301 std::map<std::string, size_t>::iterator it= savepoint.begin();
302 while (it != savepoint.end())
303 {
304 if (it->second > position)
305 savepoint.erase(it++);
306 else
307 ++it;
308 }
309
310 /*
311 We need to check that:
312 - starting index of the range we want to erase does exist.
313 - write_set size have elements to be removed
314 */
315 if (write_set.size() > 0 && position < write_set.size())
316 {
317 // Clear all elements after savepoint
318 write_set.erase(write_set.begin() + position, write_set.end());
319 // Since the write_set_unique set does not have insert order, the
320 // elements are ordered according its value, we need to rebuild it.
321 write_set_unique.clear();
322 write_set_unique.insert(write_set.begin(), write_set.end());
323 }
324
325 DBUG_EXECUTE_IF("transaction_write_set_savepoint_add_savepoint", {
326 assert(write_set.size() == 2);
327 assert(write_set_unique.size() == 2);});
328
329 DBUG_EXECUTE_IF("transaction_write_set_size_2", {
330 assert(write_set.size() == 4);
331 assert(write_set_unique.size() == 4);});
332 }
333
334 DBUG_VOID_RETURN;
335 }
336
337
reset_savepoint_list()338 void Rpl_transaction_write_set_ctx::reset_savepoint_list()
339 {
340 DBUG_ENTER("Rpl_transaction_write_set_ctx::reset_savepoint_list");
341
342 savepoint_list.push_back(savepoint);
343 savepoint.clear();
344
345 DBUG_VOID_RETURN;
346 }
347
restore_savepoint_list()348 void Rpl_transaction_write_set_ctx::restore_savepoint_list()
349 {
350 DBUG_ENTER("Rpl_transaction_write_set_ctx::restore_savepoint_list");
351
352 if (!savepoint_list.empty())
353 {
354 savepoint = savepoint_list.back();
355 savepoint_list.pop_back();
356 }
357
358 DBUG_VOID_RETURN;
359 }
360