1 /* Copyright (c) 2006, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
22 
23 #include "sql_priv.h"
24 #include "unireg.h"                             // REQUIRED by later includes
25 #include "rpl_injector.h"
26 #include "transaction.h"
27 #include "sql_parse.h"                          // begin_trans, end_trans, COMMIT
28 #include "sql_base.h"                           // close_thread_tables
29 #include "log_event.h"                          // Incident_log_event
30 #include "binlog.h"                             // mysql_bin_log
31 
32 /*
33   injector::transaction - member definitions
34 */
35 
36 /* inline since it's called below */
37 inline
transaction(MYSQL_BIN_LOG * log,THD * thd)38 injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
39   : m_state(START_STATE), m_thd(thd)
40 {
41   /*
42      Default initialization of m_start_pos (which initializes it to garbage).
43      We need to fill it in using the code below.
44   */
45   LOG_INFO log_info;
46   log->get_current_log(&log_info);
47   /* !!! binlog_pos does not follow RAII !!! */
48   m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
49   m_start_pos.m_file_pos= log_info.pos;
50 
51   if (unlikely(m_start_pos.m_file_name == NULL))
52   {
53     m_thd= NULL;
54     return;
55   }
56 
57   /*
58      Next pos is unknown until after commit of the Binlog transaction
59   */
60   m_next_pos.m_file_name= 0;
61   m_next_pos.m_file_pos= 0;
62 
63   /*
64     Ensure we don't pick up this thd's last written Binlog pos in
65     empty-transaction-commit cases.
66     This is not ideal, as it zaps this information for any other
67     usage (e.g. WL4047)
68     Potential improvement : save the 'old' next pos prior to
69     commit, and restore on error.
70   */
71   m_thd->clear_next_event_pos();
72 
73   trans_begin(m_thd);
74 }
75 
~transaction()76 injector::transaction::~transaction()
77 {
78   if (!good())
79     return;
80 
81   /* Needed since my_free expects a 'char*' (instead of 'void*'). */
82   char* const start_pos_memory= const_cast<char*>(m_start_pos.m_file_name);
83 
84   if (start_pos_memory)
85   {
86     my_free(start_pos_memory);
87   }
88 
89   char* const next_pos_memory= const_cast<char*>(m_next_pos.m_file_name);
90   if (next_pos_memory)
91   {
92     my_free(next_pos_memory);
93   }
94 }
95 
96 /**
97    @retval 0 transaction committed
98    @retval 1 transaction rolled back
99  */
commit()100 int injector::transaction::commit()
101 {
102    DBUG_ENTER("injector::transaction::commit()");
103    int error= m_thd->binlog_flush_pending_rows_event(true);
104    /*
105      Cluster replication does not preserve statement or
106      transaction boundaries of the master.  Instead, a new
107      transaction on replication slave is started when a new GCI
108      (global checkpoint identifier) is issued, and is committed
109      when the last event of the check point has been received and
110      processed. This ensures consistency of each cluster in
111      cluster replication, and there is no requirement for stronger
112      consistency: MySQL replication is asynchronous with other
113      engines as well.
114 
115      A practical consequence of that is that row level replication
116      stream passed through the injector thread never contains
117      COMMIT events.
118      Here we should preserve the server invariant that there is no
119      outstanding statement transaction when the normal transaction
120      is committed by committing the statement transaction
121      explicitly.
122    */
123    trans_commit_stmt(m_thd);
124    if (!trans_commit(m_thd))
125    {
126      close_thread_tables(m_thd);
127      m_thd->mdl_context.release_transactional_locks();
128    }
129 
130    /* Copy next position out into our next pos member */
131    if ((error == 0) &&
132        (m_thd->binlog_next_event_pos.file_name != NULL) &&
133        ((m_next_pos.m_file_name=
134          my_strdup(m_thd->binlog_next_event_pos.file_name, MYF(0))) != NULL))
135    {
136      m_next_pos.m_file_pos= m_thd->binlog_next_event_pos.pos;
137    }
138    else
139    {
140      /* Error, problem copying etc. */
141      m_next_pos.m_file_name= NULL;
142      m_next_pos.m_file_pos= 0;
143    }
144 
145    DBUG_RETURN(error);
146 }
147 
148 
rollback()149 int injector::transaction::rollback()
150 {
151    DBUG_ENTER("injector::transaction::rollback()");
152    trans_rollback_stmt(m_thd);
153    if (!trans_rollback(m_thd))
154    {
155      close_thread_tables(m_thd);
156      if (!m_thd->locked_tables_mode)
157        m_thd->mdl_context.release_transactional_locks();
158    }
159    DBUG_RETURN(0);
160 }
161 
162 
use_table(server_id_type sid,table tbl)163 int injector::transaction::use_table(server_id_type sid, table tbl)
164 {
165   DBUG_ENTER("injector::transaction::use_table");
166 
167   int error;
168 
169   if ((error= check_state(TABLE_STATE)))
170     DBUG_RETURN(error);
171 
172   server_id_type save_id= m_thd->server_id;
173   m_thd->set_server_id(sid);
174   error= m_thd->binlog_write_table_map(tbl.get_table(),
175                                        tbl.is_transactional(), FALSE);
176   m_thd->set_server_id(save_id);
177   DBUG_RETURN(error);
178 }
179 
180 
write_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record,const uchar * extra_row_info)181 int injector::transaction::write_row (server_id_type sid, table tbl,
182 				      MY_BITMAP const* cols, size_t colcnt,
183 				      record_type record,
184                                       const uchar* extra_row_info)
185 {
186    DBUG_ENTER("injector::transaction::write_row(...)");
187 
188    int error= check_state(ROW_STATE);
189    if (error)
190      DBUG_RETURN(error);
191 
192    server_id_type save_id= m_thd->server_id;
193    m_thd->set_server_id(sid);
194    table::save_sets saveset(tbl, cols, cols);
195 
196    error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
197                                   record, extra_row_info);
198    m_thd->set_server_id(save_id);
199    DBUG_RETURN(error);
200 }
201 
write_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record)202 int injector::transaction::write_row (server_id_type sid, table tbl,
203 				      MY_BITMAP const* cols, size_t colcnt,
204 				      record_type record)
205 {
206   return write_row(sid, tbl, cols, colcnt, record, NULL);
207 }
208 
209 
delete_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record,const uchar * extra_row_info)210 int injector::transaction::delete_row(server_id_type sid, table tbl,
211 				      MY_BITMAP const* cols, size_t colcnt,
212 				      record_type record,
213                                       const uchar* extra_row_info)
214 {
215    DBUG_ENTER("injector::transaction::delete_row(...)");
216 
217    int error= check_state(ROW_STATE);
218    if (error)
219      DBUG_RETURN(error);
220 
221    server_id_type save_id= m_thd->server_id;
222    m_thd->set_server_id(sid);
223    table::save_sets saveset(tbl, cols, cols);
224    error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
225                                    record, extra_row_info);
226    m_thd->set_server_id(save_id);
227    DBUG_RETURN(error);
228 }
229 
delete_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type record)230 int injector::transaction::delete_row(server_id_type sid, table tbl,
231 				      MY_BITMAP const* cols, size_t colcnt,
232 				      record_type record)
233 {
234   return delete_row(sid, tbl, cols, colcnt, record, NULL);
235 }
236 
237 
update_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type before,record_type after,const uchar * extra_row_info)238 int injector::transaction::update_row(server_id_type sid, table tbl,
239 				      MY_BITMAP const* cols, size_t colcnt,
240 				      record_type before, record_type after,
241                                       const uchar* extra_row_info)
242 {
243    DBUG_ENTER("injector::transaction::update_row(...)");
244 
245    int error= check_state(ROW_STATE);
246    if (error)
247      DBUG_RETURN(error);
248 
249    server_id_type save_id= m_thd->server_id;
250    m_thd->set_server_id(sid);
251    // The read- and write sets with autorestore (in the destructor)
252    table::save_sets saveset(tbl, cols, cols);
253 
254    error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
255                                    before, after, extra_row_info);
256    m_thd->set_server_id(save_id);
257    DBUG_RETURN(error);
258 }
259 
update_row(server_id_type sid,table tbl,MY_BITMAP const * cols,size_t colcnt,record_type before,record_type after)260 int injector::transaction::update_row(server_id_type sid, table tbl,
261 				      MY_BITMAP const* cols, size_t colcnt,
262 				      record_type before, record_type after)
263 {
264   return update_row(sid, tbl, cols, colcnt, before, after, NULL);
265 }
266 
start_pos() const267 injector::transaction::binlog_pos injector::transaction::start_pos() const
268 {
269    return m_start_pos;
270 }
271 
next_pos() const272 injector::transaction::binlog_pos injector::transaction::next_pos() const
273 {
274    return m_next_pos;
275 }
276 
277 /*
278   injector - member definitions
279 */
280 
281 /* This constructor is called below */
injector()282 inline injector::injector()
283 {
284 }
285 
286 static injector *s_injector= 0;
instance()287 injector *injector::instance()
288 {
289   if (s_injector == 0)
290     s_injector= new injector;
291   /* "There can be only one [instance]" */
292   return s_injector;
293 }
294 
free_instance()295 void injector::free_instance()
296 {
297   injector *inj = s_injector;
298 
299   if (inj != 0)
300   {
301     s_injector= 0;
302     delete inj;
303   }
304 }
305 
new_trans(THD * thd,injector::transaction * ptr)306 void injector::new_trans(THD *thd, injector::transaction *ptr)
307 {
308    DBUG_ENTER("injector::new_trans(THD *, transaction *)");
309    /*
310      Currently, there is no alternative to using 'mysql_bin_log' since that
311      is hardcoded into the way the handler is using the binary log.
312    */
313    transaction trans(&mysql_bin_log, thd);
314    ptr->swap(trans);
315 
316    DBUG_VOID_RETURN;
317 }
318 
record_incident(THD * thd,Incident incident)319 int injector::record_incident(THD *thd, Incident incident)
320 {
321   Incident_log_event ev(thd, incident);
322   return mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/);
323 }
324 
record_incident(THD * thd,Incident incident,LEX_STRING const message)325 int injector::record_incident(THD *thd, Incident incident, LEX_STRING const message)
326 {
327   Incident_log_event ev(thd, incident, message);
328   return mysql_bin_log.write_incident(&ev, true/*need_lock_log=true*/);
329 }
330