1 /* Copyright (c) 2006, 2011, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql_priv.h"
24 #include "unireg.h" // REQUIRED by later includes
25 #include "rpl_injector.h"
26 #include "transaction.h"
27 #include "sql_parse.h" // begin_trans, end_trans, COMMIT
28 #include "sql_base.h" // close_thread_tables
29 #include "log_event.h" // Incident_log_event
30 #include "binlog.h" // mysql_bin_log
31
32 /*
33 injector::transaction - member definitions
34 */
35
36 /* inline since it's called below */
37 inline
transaction(MYSQL_BIN_LOG * log,THD * thd)38 injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
39 : m_state(START_STATE), m_thd(thd)
40 {
41 /*
42 Default initialization of m_start_pos (which initializes it to garbage).
43 We need to fill it in using the code below.
44 */
45 LOG_INFO log_info;
46 log->get_current_log(&log_info);
47 /* !!! binlog_pos does not follow RAII !!! */
48 m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
49 m_start_pos.m_file_pos= log_info.pos;
50
51 if (unlikely(m_start_pos.m_file_name == NULL))
52 {
53 m_thd= NULL;
54 return;
55 }
56
57 /*
58 Next pos is unknown until after commit of the Binlog transaction
59 */
60 m_next_pos.m_file_name= 0;
61 m_next_pos.m_file_pos= 0;
62
63 /*
64 Ensure we don't pick up this thd's last written Binlog pos in
65 empty-transaction-commit cases.
66 This is not ideal, as it zaps this information for any other
67 usage (e.g. WL4047)
68 Potential improvement : save the 'old' next pos prior to
69 commit, and restore on error.
70 */
71 m_thd->clear_next_event_pos();
72
73 trans_begin(m_thd);
74 }
75
~transaction()76 injector::transaction::~transaction()
77 {
78 if (!good())
79 return;
80
81 /* Needed since my_free expects a 'char*' (instead of 'void*'). */
82 char* const start_pos_memory= const_cast<char*>(m_start_pos.m_file_name);
83
84 if (start_pos_memory)
85 {
86 my_free(start_pos_memory);
87 }
88
89 char* const next_pos_memory= const_cast<char*>(m_next_pos.m_file_name);
90 if (next_pos_memory)
91 {
92 my_free(next_pos_memory);
93 }
94 }
95
96 /**
97 @retval 0 transaction committed
98 @retval 1 transaction rolled back
99 */
commit()100 int injector::transaction::commit()
101 {
102 DBUG_ENTER("injector::transaction::commit()");
103 int error= m_thd->binlog_flush_pending_rows_event(true);
104 /*
105 Cluster replication does not preserve statement or
106 transaction boundaries of the master. Instead, a new
107 transaction on replication slave is started when a new GCI
108 (global checkpoint identifier) is issued, and is committed
109 when the last event of the check point has been received and
110 processed. This ensures consistency of each cluster in
111 cluster replication, and there is no requirement for stronger
112 consistency: MySQL replication is asynchronous with other
113 engines as well.
114
115 A practical consequence of that is that row level replication
116 stream passed through the injector thread never contains
117 COMMIT events.
118 Here we should preserve the server invariant that there is no
119 outstanding statement transaction when the normal transaction
120 is committed by committing the statement transaction
121 explicitly.
122 */
123 trans_commit_stmt(m_thd);
124 if (!trans_commit(m_thd))
125 {
126 close_thread_tables(m_thd);
127 m_thd->mdl_context.release_transactional_locks();
128 }
129
130 /* Copy next position out into our next pos member */
131 if ((error == 0) &&
132 (m_thd->binlog_next_event_pos.file_name != NULL) &&
133 ((m_next_pos.m_file_name=
134 my_strdup(m_thd->binlog_next_event_pos.file_name, MYF(0))) != NULL))
135 {
136 m_next_pos.m_file_pos= m_thd->binlog_next_event_pos.pos;
137 }
138 else
139 {
140 /* Error, problem copying etc. */
141 m_next_pos.m_file_name= NULL;
142 m_next_pos.m_file_pos= 0;
143 }
144
145 DBUG_RETURN(error);
146 }
147
148
rollback()149 int injector::transaction::rollback()
150 {
151 DBUG_ENTER("injector::transaction::rollback()");
152 trans_rollback_stmt(m_thd);
153 if (!trans_rollback(m_thd))
154 {
155 close_thread_tables(m_thd);
156 if (!m_thd->locked_tables_mode)
157 m_thd->mdl_context.release_transactional_locks();
158 }
159 DBUG_RETURN(0);
160 }
161
162
use_table(server_id_type sid,table tbl)163 int injector::transaction::use_table(server_id_type sid, table tbl)
164 {
165 DBUG_ENTER("injector::transaction::use_table");
166
167 int error;
168
169 if ((error= check_state(TABLE_STATE)))
170 DBUG_RETURN(error);
171
172 server_id_type save_id= m_thd->server_id;
173 m_thd->set_server_id(sid);
174 error= m_thd->binlog_write_table_map(tbl.get_table(),
175 tbl.is_transactional(), FALSE);
176 m_thd->set_server_id(save_id);
177 DBUG_RETURN(error);
178 }
179
180
write_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record,const uchar * extra_row_info)181 int injector::transaction::write_row (server_id_type sid, table tbl,
182 MY_BITMAP const* cols, size_t colcnt,
183 record_type record,
184 const uchar* extra_row_info)
185 {
186 DBUG_ENTER("injector::transaction::write_row(...)");
187
188 int error= check_state(ROW_STATE);
189 if (error)
190 DBUG_RETURN(error);
191
192 server_id_type save_id= m_thd->server_id;
193 m_thd->set_server_id(sid);
194 table::save_sets saveset(tbl, cols, cols);
195
196 error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
197 record, extra_row_info);
198 m_thd->set_server_id(save_id);
199 DBUG_RETURN(error);
200 }
201
write_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record)202 int injector::transaction::write_row (server_id_type sid, table tbl,
203 MY_BITMAP const* cols, size_t colcnt,
204 record_type record)
205 {
206 return write_row(sid, tbl, cols, colcnt, record, NULL);
207 }
208
209
delete_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record,const uchar * extra_row_info)210 int injector::transaction::delete_row(server_id_type sid, table tbl,
211 MY_BITMAP const* cols, size_t colcnt,
212 record_type record,
213 const uchar* extra_row_info)
214 {
215 DBUG_ENTER("injector::transaction::delete_row(...)");
216
217 int error= check_state(ROW_STATE);
218 if (error)
219 DBUG_RETURN(error);
220
221 server_id_type save_id= m_thd->server_id;
222 m_thd->set_server_id(sid);
223 table::save_sets saveset(tbl, cols, cols);
224 error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
225 record, extra_row_info);
226 m_thd->set_server_id(save_id);
227 DBUG_RETURN(error);
228 }
229
delete_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record)230 int injector::transaction::delete_row(server_id_type sid, table tbl,
231 MY_BITMAP const* cols, size_t colcnt,
232 record_type record)
233 {
234 return delete_row(sid, tbl, cols, colcnt, record, NULL);
235 }
236
237
update_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type before,record_type after,const uchar * extra_row_info)238 int injector::transaction::update_row(server_id_type sid, table tbl,
239 MY_BITMAP const* cols, size_t colcnt,
240 record_type before, record_type after,
241 const uchar* extra_row_info)
242 {
243 DBUG_ENTER("injector::transaction::update_row(...)");
244
245 int error= check_state(ROW_STATE);
246 if (error)
247 DBUG_RETURN(error);
248
249 server_id_type save_id= m_thd->server_id;
250 m_thd->set_server_id(sid);
251 // The read- and write sets with autorestore (in the destructor)
252 table::save_sets saveset(tbl, cols, cols);
253
254 error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
255 before, after, extra_row_info);
256 m_thd->set_server_id(save_id);
257 DBUG_RETURN(error);
258 }
259
update_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type before,record_type after)260 int injector::transaction::update_row(server_id_type sid, table tbl,
261 MY_BITMAP const* cols, size_t colcnt,
262 record_type before, record_type after)
263 {
264 return update_row(sid, tbl, cols, colcnt, before, after, NULL);
265 }
266
start_pos() const267 injector::transaction::binlog_pos injector::transaction::start_pos() const
268 {
269 return m_start_pos;
270 }
271
next_pos() const272 injector::transaction::binlog_pos injector::transaction::next_pos() const
273 {
274 return m_next_pos;
275 }
276
277 /*
278 injector - member definitions
279 */
280
281 /* This constructor is called below */
injector()282 inline injector::injector()
283 {
284 }
285
286 static injector *s_injector= 0;
instance()287 injector *injector::instance()
288 {
289 if (s_injector == 0)
290 s_injector= new injector;
291 /* "There can be only one [instance]" */
292 return s_injector;
293 }
294
free_instance()295 void injector::free_instance()
296 {
297 injector *inj = s_injector;
298
299 if (inj != 0)
300 {
301 s_injector= 0;
302 delete inj;
303 }
304 }
305
new_trans(THD * thd,injector::transaction * ptr)306 void injector::new_trans(THD *thd, injector::transaction *ptr)
307 {
308 DBUG_ENTER("injector::new_trans(THD *, transaction *)");
309 /*
310 Currently, there is no alternative to using 'mysql_bin_log' since that
311 is hardcoded into the way the handler is using the binary log.
312 */
313 transaction trans(&mysql_bin_log, thd);
314 ptr->swap(trans);
315
316 DBUG_VOID_RETURN;
317 }
318
record_incident(THD * thd,Incident incident)319 int injector::record_incident(THD *thd, Incident incident)
320 {
321 Incident_log_event ev(thd, incident);
322 return mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/);
323 }
324
record_incident(THD * thd,Incident incident,LEX_STRING const message)325 int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
326 {
327 Incident_log_event ev(thd, incident, message);
328 return mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/);
329 }
330