1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "rpl_transaction_write_set_ctx.h"
24 
25 #include "mysql/service_rpl_transaction_write_set.h" // Transaction_write_set
26 #include "mysqld_thd_manager.h"                      // Global_THD_manager
27 #include "sql_class.h"                               // THD
28 #include "sql_parse.h"                               // Find_thd_with_id
29 #include "debug_sync.h"                              // debug_sync_set_action
30 #include "binlog.h"                              // get_opt_max_history_size
31 
32 int32 Rpl_transaction_write_set_ctx::m_global_component_requires_write_sets(0);
33 int64 Rpl_transaction_write_set_ctx::m_global_write_set_memory_size_limit(0);
34 
Rpl_transaction_write_set_ctx()35 Rpl_transaction_write_set_ctx::Rpl_transaction_write_set_ctx()
36     : m_has_missing_keys(false),
37       m_has_related_foreign_keys(false),
38       m_ignore_write_set_memory_limit(false),
39       m_local_allow_drop_write_set(false),
40       m_local_has_reached_write_set_limit(false)
41 {
42   DBUG_ENTER("Rpl_transaction_write_set_ctx::Rpl_transaction_write_set_ctx");
43   /*
44     In order to speed-up small transactions write-set extraction,
45     we preallocate 24 elements.
46     24 is a sufficient number to hold write-sets for single
47     statement transactions, even on tables with foreign keys.
48   */
49   write_set.reserve(24);
50   DBUG_VOID_RETURN;
51 }
52 
add_write_set(uint64 hash)53 bool Rpl_transaction_write_set_ctx::add_write_set(uint64 hash)
54 {
55   DBUG_EXECUTE_IF("add_write_set_no_memory", throw std::bad_alloc(););
56 
57   if (!m_local_has_reached_write_set_limit) {
58     ulong binlog_trx_dependency_history_size =
59         mysql_bin_log.m_dependency_tracker.get_writeset()
60             ->get_opt_max_history_size();
61     bool is_full_writeset_required =
62         m_global_component_requires_write_sets && !m_local_allow_drop_write_set;
63 
64     if (!is_full_writeset_required) {
65       if (write_set.size() >= binlog_trx_dependency_history_size) {
66         m_local_has_reached_write_set_limit = true;
67         clear_write_set();
68         return false;
69       }
70     }
71 
72     uint64 mem_limit = m_global_write_set_memory_size_limit;
73     if (mem_limit && !m_ignore_write_set_memory_limit) {
74       // Check if adding a new element goes over the limit
75       if (sizeof(uint64) + write_set_memory_size() > mem_limit) {
76         my_error(ER_WRITE_SET_EXCEEDS_LIMIT, MYF(0));
77         return true;
78       }
79     }
80 
81     write_set.push_back(hash);
82     write_set_unique.insert(hash);
83   }
84 
85   return false;
86 }
87 
get_write_set()88 std::set<uint64>* Rpl_transaction_write_set_ctx::get_write_set()
89 {
90   DBUG_ENTER("Rpl_transaction_write_set_ctx::get_write_set");
91   DBUG_RETURN(&write_set_unique);
92 }
93 
reset_state()94 void Rpl_transaction_write_set_ctx::reset_state() {
95   clear_write_set();
96   m_has_missing_keys = m_has_related_foreign_keys = false;
97   m_local_has_reached_write_set_limit = false;
98 }
99 
clear_write_set()100 void Rpl_transaction_write_set_ctx::clear_write_set() {
101   write_set.clear();
102   write_set_unique.clear();
103   savepoint.clear();
104   savepoint_list.clear();
105 }
106 
set_has_missing_keys()107 void Rpl_transaction_write_set_ctx::set_has_missing_keys()
108 {
109   DBUG_ENTER("Transaction_context_log_event::set_has_missing_keys");
110   m_has_missing_keys= true;
111   DBUG_VOID_RETURN;
112 }
113 
get_has_missing_keys()114 bool Rpl_transaction_write_set_ctx::get_has_missing_keys()
115 {
116   DBUG_ENTER("Transaction_context_log_event::get_has_missing_keys");
117   DBUG_RETURN(m_has_missing_keys);
118 }
119 
set_has_related_foreign_keys()120 void Rpl_transaction_write_set_ctx::set_has_related_foreign_keys()
121 {
122   DBUG_ENTER("Transaction_context_log_event::set_has_related_foreign_keys");
123   m_has_related_foreign_keys= true;
124   DBUG_VOID_RETURN;
125 }
126 
get_has_related_foreign_keys()127 bool Rpl_transaction_write_set_ctx::get_has_related_foreign_keys()
128 {
129   DBUG_ENTER("Transaction_context_log_event::get_has_related_foreign_keys");
130   DBUG_RETURN(m_has_related_foreign_keys);
131 }
132 
was_write_set_limit_reached()133 bool Rpl_transaction_write_set_ctx::was_write_set_limit_reached() {
134   return m_local_has_reached_write_set_limit;
135 }
136 
write_set_memory_size()137 size_t Rpl_transaction_write_set_ctx::write_set_memory_size() {
138   return sizeof(uint64) * write_set.size();
139 }
140 
set_global_require_full_write_set(bool requires_ws)141 void Rpl_transaction_write_set_ctx::set_global_require_full_write_set(
142     bool requires_ws) {
143   assert(!requires_ws || !m_global_component_requires_write_sets);
144   if (requires_ws)
145     my_atomic_store32(&m_global_component_requires_write_sets, 1);
146   else
147     my_atomic_store32(&m_global_component_requires_write_sets, 0);
148 }
149 
require_full_write_set(int requires_ws)150 void require_full_write_set(int requires_ws) {
151   Rpl_transaction_write_set_ctx::set_global_require_full_write_set(requires_ws);
152 }
153 
set_global_write_set_memory_size_limit(int64 limit)154 void Rpl_transaction_write_set_ctx::set_global_write_set_memory_size_limit(int64 limit) {
155   assert(m_global_write_set_memory_size_limit == 0);
156   m_global_write_set_memory_size_limit = limit;
157 }
158 
update_global_write_set_memory_size_limit(int64 limit)159 void Rpl_transaction_write_set_ctx::update_global_write_set_memory_size_limit(
160     int64 limit) {
161   m_global_write_set_memory_size_limit = limit;
162 }
163 
set_write_set_memory_size_limit(long long size_limit)164 void set_write_set_memory_size_limit(long long size_limit) {
165   Rpl_transaction_write_set_ctx::set_global_write_set_memory_size_limit(
166       size_limit);
167 }
168 
update_write_set_memory_size_limit(long long size_limit)169 void update_write_set_memory_size_limit(long long size_limit) {
170   Rpl_transaction_write_set_ctx::update_global_write_set_memory_size_limit(
171       size_limit);
172 }
173 
set_local_ignore_write_set_memory_limit(bool ignore_limit)174 void Rpl_transaction_write_set_ctx::set_local_ignore_write_set_memory_limit(
175     bool ignore_limit) {
176   m_ignore_write_set_memory_limit = ignore_limit;
177 }
178 
set_local_allow_drop_write_set(bool allow_drop_write_set)179 void Rpl_transaction_write_set_ctx::set_local_allow_drop_write_set(
180     bool allow_drop_write_set) {
181   m_local_allow_drop_write_set = allow_drop_write_set;
182 }
183 
184 /**
185   Implementation of service_rpl_transaction_write_set, see
186   @file include/mysql/service_rpl_transaction_write_set.h
187 */
188 
get_transaction_write_set(unsigned long m_thread_id)189 Transaction_write_set* get_transaction_write_set(unsigned long m_thread_id)
190 {
191   DBUG_ENTER("get_transaction_write_set");
192   THD *thd= NULL;
193   Transaction_write_set *result_set= NULL;
194   Find_thd_with_id find_thd_with_id(m_thread_id, false);
195 
196   thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
197   if (thd)
198   {
199     std::set<uint64> *write_set= thd->get_transaction()
200         ->get_transaction_write_set_ctx()->get_write_set();
201     unsigned long write_set_size= write_set->size();
202     if (write_set_size == 0)
203     {
204       mysql_mutex_unlock(&thd->LOCK_thd_data);
205       DBUG_RETURN(NULL);
206     }
207 
208     result_set= (Transaction_write_set*)my_malloc(key_memory_write_set_extraction,
209                                                   sizeof(Transaction_write_set),
210                                                   MYF(0));
211     result_set->write_set_size= write_set_size;
212     result_set->write_set=
213         (unsigned long long*)my_malloc(key_memory_write_set_extraction,
214                                        write_set_size *
215                                        sizeof(unsigned long long),
216                                        MYF(0));
217     int result_set_index= 0;
218     for (std::set<uint64>::iterator it= write_set->begin();
219          it != write_set->end();
220          ++it)
221     {
222       uint64 temp= *it;
223       result_set->write_set[result_set_index++]=temp;
224     }
225     mysql_mutex_unlock(&thd->LOCK_thd_data);
226   }
227   DBUG_RETURN(result_set);
228 }
229 
add_savepoint(char * name)230 void Rpl_transaction_write_set_ctx::add_savepoint(char* name)
231 {
232   DBUG_ENTER("Rpl_transaction_write_set_ctx::add_savepoint");
233   std::string identifier(name);
234 
235   DBUG_EXECUTE_IF("transaction_write_set_savepoint_clear_on_commit_rollback",
236                   {
237                     assert(savepoint.size() == 0);
238                     assert(write_set.size() == 0);
239                     assert(write_set_unique.size() == 0);
240                     assert(savepoint_list.size() == 0);
241                   });
242 
243   DBUG_EXECUTE_IF("transaction_write_set_savepoint_level",
244                   assert(savepoint.size() == 0););
245 
246   std::map<std::string, size_t>::iterator it;
247 
248   /*
249     Savepoint with the same name, the old savepoint is deleted and a new one
250     is set
251   */
252   if ((it= savepoint.find(name)) != savepoint.end())
253       savepoint.erase(it);
254 
255   savepoint.insert(std::pair<std::string, size_t>(identifier,
256                                                   write_set.size()));
257 
258   DBUG_EXECUTE_IF("transaction_write_set_savepoint_add_savepoint",
259                   assert(savepoint.find(identifier)->second == write_set.size()););
260 
261   DBUG_VOID_RETURN;
262 }
263 
del_savepoint(char * name)264 void Rpl_transaction_write_set_ctx::del_savepoint(char* name)
265 {
266   DBUG_ENTER("Rpl_transaction_write_set_ctx::del_savepoint");
267   std::string identifier(name);
268 
269   DBUG_EXECUTE_IF("transaction_write_set_savepoint_block_before_release",
270                     {
271                       const char act[]= "now wait_for signal.unblock_release";
272                       assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
273                     });
274 
275   savepoint.erase(identifier);
276 
277   DBUG_VOID_RETURN;
278 }
279 
rollback_to_savepoint(char * name)280 void Rpl_transaction_write_set_ctx::rollback_to_savepoint(char* name)
281 {
282   DBUG_ENTER("Rpl_transaction_write_set_ctx::rollback_to_savepoint");
283   size_t position= 0;
284   std::string identifier(name);
285   std::map<std::string, size_t>::iterator elem;
286 
287   if ((elem = savepoint.find(identifier)) != savepoint.end())
288   {
289     assert(elem->second <= write_set.size());
290 
291     DBUG_EXECUTE_IF("transaction_write_set_savepoint_block_before_rollback",
292                     {
293                       const char act[]= "now wait_for signal.unblock_rollback";
294                       assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act)));
295                     });
296 
297     position= elem->second;
298 
299     // Remove all savepoints created after the savepoint identifier given as
300     // parameter
301     std::map<std::string, size_t>::iterator it= savepoint.begin();
302     while (it != savepoint.end())
303     {
304       if (it->second > position)
305         savepoint.erase(it++);
306       else
307         ++it;
308     }
309 
310     /*
311       We need to check that:
312        - starting index of the range we want to erase does exist.
313        - write_set size have elements to be removed
314     */
315     if (write_set.size() > 0 && position < write_set.size())
316     {
317       // Clear all elements after savepoint
318       write_set.erase(write_set.begin() + position, write_set.end());
319       // Since the write_set_unique set does not have insert order, the
320       // elements are ordered according its value, we need to rebuild it.
321       write_set_unique.clear();
322       write_set_unique.insert(write_set.begin(), write_set.end());
323     }
324 
325     DBUG_EXECUTE_IF("transaction_write_set_savepoint_add_savepoint", {
326         assert(write_set.size() == 2);
327         assert(write_set_unique.size() == 2);});
328 
329     DBUG_EXECUTE_IF("transaction_write_set_size_2", {
330         assert(write_set.size() == 4);
331         assert(write_set_unique.size() == 4);});
332   }
333 
334   DBUG_VOID_RETURN;
335 }
336 
337 
reset_savepoint_list()338 void Rpl_transaction_write_set_ctx::reset_savepoint_list()
339 {
340   DBUG_ENTER("Rpl_transaction_write_set_ctx::reset_savepoint_list");
341 
342   savepoint_list.push_back(savepoint);
343   savepoint.clear();
344 
345   DBUG_VOID_RETURN;
346 }
347 
restore_savepoint_list()348 void Rpl_transaction_write_set_ctx::restore_savepoint_list()
349 {
350   DBUG_ENTER("Rpl_transaction_write_set_ctx::restore_savepoint_list");
351 
352   if (!savepoint_list.empty())
353   {
354     savepoint = savepoint_list.back();
355     savepoint_list.pop_back();
356   }
357 
358   DBUG_VOID_RETURN;
359 }
360