1 /* Copyright (c) 2006, 2011, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1335  USA */
15 
16 #include "mariadb.h"
17 #include "sql_priv.h"
18 #include "rpl_injector.h"
19 #include "transaction.h"
20 #include "sql_parse.h"                          // begin_trans, end_trans, COMMIT
21 #include "sql_base.h"                           // close_thread_tables
22 #include "log_event.h"                          // Incident_log_event
23 
24 /*
25   injector::transaction - member definitions
26 */
27 
28 /* inline since it's called below */
29 inline
transaction(MYSQL_BIN_LOG * log,THD * thd)30 injector::transaction::transaction(MYSQL_BIN_LOG *log, THD *thd)
31   : m_state(START_STATE), m_thd(thd)
32 {
33   /*
34      Default initialization of m_start_pos (which initializes it to garbage).
35      We need to fill it in using the code below.
36   */
37   LOG_INFO log_info;
38   log->get_current_log(&log_info);
39   /* !!! binlog_pos does not follow RAII !!! */
40   m_start_pos.m_file_name= my_strdup(log_info.log_file_name, MYF(0));
41   m_start_pos.m_file_pos= log_info.pos;
42 
43   m_thd->lex->start_transaction_opt= 0; /* for begin_trans() */
44   trans_begin(m_thd);
45 }
46 
~transaction()47 injector::transaction::~transaction()
48 {
49   if (!good())
50     return;
51 
52   /* Needed since my_free expects a 'char*' (instead of 'void*'). */
53   char* const the_memory= const_cast<char*>(m_start_pos.m_file_name);
54 
55   /*
56     We set the first character to null just to give all the copies of the
57     start position a (minimal) chance of seening that the memory is lost.
58     All assuming the my_free does not step over the memory, of course.
59   */
60   *the_memory= '\0';
61 
62   my_free(the_memory);
63 }
64 
65 /**
66    @retval 0 transaction committed
67    @retval 1 transaction rolled back
68  */
commit()69 int injector::transaction::commit()
70 {
71   DBUG_ENTER("injector::transaction::commit()");
72   int error= m_thd->binlog_flush_pending_rows_event(true);
73   /*
74     Cluster replication does not preserve statement or
75     transaction boundaries of the master.  Instead, a new
76     transaction on replication slave is started when a new GCI
77     (global checkpoint identifier) is issued, and is committed
78     when the last event of the check point has been received and
79     processed. This ensures consistency of each cluster in
80     cluster replication, and there is no requirement for stronger
81     consistency: MySQL replication is asynchronous with other
82     engines as well.
83 
84     A practical consequence of that is that row level replication
85     stream passed through the injector thread never contains
86     COMMIT events.
87     Here we should preserve the server invariant that there is no
88     outstanding statement transaction when the normal transaction
89     is committed by committing the statement transaction
90     explicitly.
91   */
92   trans_commit_stmt(m_thd);
93   if (!trans_commit(m_thd))
94   {
95     close_thread_tables(m_thd);
96     m_thd->release_transactional_locks();
97   }
98   DBUG_RETURN(error);
99 }
100 
101 
use_table(server_id_type sid,table tbl)102 int injector::transaction::use_table(server_id_type sid, table tbl)
103 {
104   DBUG_ENTER("injector::transaction::use_table");
105 
106   int error;
107 
108   if (unlikely((error= check_state(TABLE_STATE))))
109     DBUG_RETURN(error);
110 
111   server_id_type save_id= m_thd->variables.server_id;
112   m_thd->set_server_id(sid);
113   error= m_thd->binlog_write_table_map(tbl.get_table(),
114                                        tbl.is_transactional());
115   m_thd->set_server_id(save_id);
116   DBUG_RETURN(error);
117 }
118 
119 
120 
start_pos() const121 injector::transaction::binlog_pos injector::transaction::start_pos() const
122 {
123    return m_start_pos;
124 }
125 
126 
127 /*
128   injector - member definitions
129 */
130 
131 /* This constructor is called below */
injector()132 inline injector::injector()
133 {
134 }
135 
136 static injector *s_injector= 0;
instance()137 injector *injector::instance()
138 {
139   if (s_injector == 0)
140     s_injector= new injector;
141   /* "There can be only one [instance]" */
142   return s_injector;
143 }
144 
free_instance()145 void injector::free_instance()
146 {
147   injector *inj = s_injector;
148 
149   if (inj != 0)
150   {
151     s_injector= 0;
152     delete inj;
153   }
154 }
155 
156 
new_trans(THD * thd)157 injector::transaction injector::new_trans(THD *thd)
158 {
159    DBUG_ENTER("injector::new_trans(THD*)");
160    /*
161      Currently, there is no alternative to using 'mysql_bin_log' since that
162      is hardcoded into the way the handler is using the binary log.
163    */
164    DBUG_RETURN(transaction(&mysql_bin_log, thd));
165 }
166 
new_trans(THD * thd,injector::transaction * ptr)167 void injector::new_trans(THD *thd, injector::transaction *ptr)
168 {
169    DBUG_ENTER("injector::new_trans(THD *, transaction *)");
170    /*
171      Currently, there is no alternative to using 'mysql_bin_log' since that
172      is hardcoded into the way the handler is using the binary log.
173    */
174    transaction trans(&mysql_bin_log, thd);
175    ptr->swap(trans);
176 
177    DBUG_VOID_RETURN;
178 }
179 
record_incident(THD * thd,Incident incident)180 int injector::record_incident(THD *thd, Incident incident)
181 {
182   Incident_log_event ev(thd, incident);
183   int error;
184   if (unlikely((error= mysql_bin_log.write(&ev))))
185     return error;
186   return mysql_bin_log.rotate_and_purge(true);
187 }
188 
record_incident(THD * thd,Incident incident,const LEX_CSTRING * message)189 int injector::record_incident(THD *thd, Incident incident,
190                               const LEX_CSTRING *message)
191 {
192   Incident_log_event ev(thd, incident, message);
193   int error;
194   if (unlikely((error= mysql_bin_log.write(&ev))))
195     return error;
196   return mysql_bin_log.rotate_and_purge(true);
197 }
198