1 /*
2 Copyright (c) 2005, 2013, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #include "sql_binlog.h"
20 #include "sql_parse.h"
21 #include "sql_acl.h"
22 #include "rpl_rli.h"
23 #include "slave.h"
24 #include "log_event.h"
25
26
27 /**
28 Check if the event type is allowed in a BINLOG statement.
29
30 @retval 0 if the event type is ok.
31 @retval 1 if the event type is not ok.
32 */
check_event_type(int type,Relay_log_info * rli)33 static int check_event_type(int type, Relay_log_info *rli)
34 {
35 Format_description_log_event *fd_event=
36 rli->relay_log.description_event_for_exec;
37
38 /*
39 Convert event type id of certain old versions (see comment in
40 Format_description_log_event::Format_description_log_event(char*,...)).
41 */
42 if (fd_event && fd_event->event_type_permutation)
43 {
44 IF_DBUG({
45 int new_type= fd_event->event_type_permutation[type];
46 DBUG_PRINT("info",
47 ("converting event type %d to %d (%s)",
48 type, new_type,
49 Log_event::get_type_str((Log_event_type)new_type)));
50 },
51 (void)0);
52 type= fd_event->event_type_permutation[type];
53 }
54
55 switch (type)
56 {
57 case START_EVENT_V3:
58 case FORMAT_DESCRIPTION_EVENT:
59 /*
60 We need a preliminary FD event in order to parse the FD event,
61 if we don't already have one.
62 */
63 if (!fd_event)
64 if (!(rli->relay_log.description_event_for_exec=
65 new Format_description_log_event(4)))
66 {
67 my_error(ER_OUTOFMEMORY, MYF(0), 1);
68 return 1;
69 }
70
71 /* It is always allowed to execute FD events. */
72 return 0;
73
74 case TABLE_MAP_EVENT:
75 case WRITE_ROWS_EVENT_V1:
76 case UPDATE_ROWS_EVENT_V1:
77 case DELETE_ROWS_EVENT_V1:
78 case WRITE_ROWS_EVENT:
79 case UPDATE_ROWS_EVENT:
80 case DELETE_ROWS_EVENT:
81 case PRE_GA_WRITE_ROWS_EVENT:
82 case PRE_GA_UPDATE_ROWS_EVENT:
83 case PRE_GA_DELETE_ROWS_EVENT:
84 /*
85 Row events are only allowed if a Format_description_event has
86 already been seen.
87 */
88 if (fd_event)
89 return 0;
90 else
91 {
92 my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
93 MYF(0), Log_event::get_type_str((Log_event_type)type));
94 return 1;
95 }
96 break;
97
98 default:
99 /*
100 It is not meaningful to execute other events than row-events and
101 FD events. It would even be dangerous to execute Stop_log_event
102 and Rotate_log_event since they call Relay_log_info::flush(), which
103 is not allowed to call by other threads than the slave SQL
104 thread when the slave SQL thread is running.
105 */
106 my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
107 MYF(0), Log_event::get_type_str((Log_event_type)type));
108 return 1;
109 }
110 }
111
112 /**
113 Copy fragments into the standard placeholder thd->lex->comment.str.
114
115 Compute the size of the (still) encoded total,
116 allocate and then copy fragments one after another.
117 The size can exceed max(max_allowed_packet) which is not a
118 problem as no String instance is created off this char array.
119
120 @param thd THD handle
121 @return
122 0 at success,
123 -1 otherwise.
124 */
binlog_defragment(THD * thd)125 int binlog_defragment(THD *thd)
126 {
127 user_var_entry *entry[2];
128 LEX_CSTRING name[2]= { thd->lex->comment, thd->lex->ident };
129
130 /* compute the total size */
131 thd->lex->comment.str= NULL;
132 thd->lex->comment.length= 0;
133 for (uint k= 0; k < 2; k++)
134 {
135 entry[k]=
136 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
137 name[k].length);
138 if (!entry[k] || entry[k]->type != STRING_RESULT)
139 {
140 my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
141 return -1;
142 }
143 thd->lex->comment.length += entry[k]->length;
144 }
145
146 thd->lex->comment.str= // to be freed by the caller
147 (char *) my_malloc(PSI_INSTRUMENT_ME, thd->lex->comment.length, MYF(MY_WME));
148 if (!thd->lex->comment.str)
149 {
150 my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
151 return -1;
152 }
153
154 /* fragments are merged into allocated buf while the user var:s get reset */
155 size_t gathered_length= 0;
156 for (uint k=0; k < 2; k++)
157 {
158 memcpy(const_cast<char*>(thd->lex->comment.str) + gathered_length, entry[k]->value,
159 entry[k]->length);
160 gathered_length += entry[k]->length;
161 }
162 for (uint k=0; k < 2; k++)
163 update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0);
164
165 DBUG_ASSERT(gathered_length == thd->lex->comment.length);
166
167 return 0;
168 }
169
170
171 /**
172 Execute a BINLOG statement.
173
174 To execute the BINLOG command properly the server needs to know
175 which format the BINLOG command's event is in. Therefore, the first
176 BINLOG statement seen must be a base64 encoding of the
177 Format_description_log_event, as outputted by mysqlbinlog. This
178 Format_description_log_event is cached in
179 rli->description_event_for_exec.
180
181 @param thd Pointer to THD object for the client thread executing the
182 statement.
183 */
184
mysql_client_binlog_statement(THD * thd)185 void mysql_client_binlog_statement(THD* thd)
186 {
187 DBUG_ENTER("mysql_client_binlog_statement");
188 DBUG_PRINT("info",("binlog base64: '%*s'",
189 (int) (thd->lex->comment.length < 2048 ?
190 thd->lex->comment.length : 2048),
191 thd->lex->comment.str));
192
193 if (check_global_access(thd, PRIV_STMT_BINLOG))
194 DBUG_VOID_RETURN;
195
196 /*
197 option_bits will be changed when applying the event. But we don't expect
198 it be changed permanently after BINLOG statement, so backup it first.
199 It will be restored at the end of this function.
200 */
201 ulonglong thd_options= thd->variables.option_bits;
202
203 /*
204 Allocation
205 */
206
207 int err;
208 Relay_log_info *rli;
209 rpl_group_info *rgi;
210 char *buf= NULL;
211 size_t coded_len= 0, decoded_len= 0;
212
213 rli= thd->rli_fake;
214 if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE, "BINLOG_BASE64_EVENT")))
215 rli->sql_driver_thd= thd;
216 if (!(rgi= thd->rgi_fake))
217 rgi= thd->rgi_fake= new rpl_group_info(rli);
218 rgi->thd= thd;
219
220 const char *error= 0;
221 Log_event *ev = 0;
222 my_bool is_fragmented= FALSE;
223
224 /*
225 Out of memory check
226 */
227 if (!(rli))
228 {
229 my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1); /* needed 1 bytes */
230 goto end;
231 }
232
233 DBUG_ASSERT(rli->belongs_to_client());
234
235 if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
236 if (binlog_defragment(thd))
237 goto end;
238
239 if (!(coded_len= thd->lex->comment.length))
240 {
241 my_error(ER_SYNTAX_ERROR, MYF(0));
242 goto end;
243 }
244
245 decoded_len= my_base64_needed_decoded_length((int)coded_len);
246 if (!(buf= (char *) my_malloc(key_memory_binlog_statement_buffer,
247 decoded_len, MYF(MY_WME))))
248 {
249 my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
250 goto end;
251 }
252
253 for (char const *strptr= thd->lex->comment.str ;
254 strptr < thd->lex->comment.str + thd->lex->comment.length ; )
255 {
256 char const *endptr= 0;
257 int bytes_decoded= my_base64_decode(strptr, coded_len, buf, &endptr,
258 MY_BASE64_DECODE_ALLOW_MULTIPLE_CHUNKS);
259
260 #ifndef HAVE_valgrind
261 /*
262 This debug printout should not be used for valgrind builds
263 since it will read from unassigned memory.
264 */
265 DBUG_PRINT("info",
266 ("bytes_decoded: %d strptr: %p endptr: %p ('%c':%d)",
267 bytes_decoded, strptr, endptr, *endptr,
268 *endptr));
269 #endif
270
271 if (bytes_decoded < 0)
272 {
273 my_error(ER_BASE64_DECODE_ERROR, MYF(0));
274 goto end;
275 }
276 else if (bytes_decoded == 0)
277 break; // If no bytes where read, the string contained only whitespace
278
279 DBUG_ASSERT(bytes_decoded > 0);
280 DBUG_ASSERT(endptr > strptr);
281 coded_len-= endptr - strptr;
282 strptr= endptr;
283
284 /*
285 Now we have one or more events stored in the buffer. The size of
286 the buffer is computed based on how much base64-encoded data
287 there were, so there should be ample space for the data (maybe
288 even too much, since a statement can consist of a considerable
289 number of events).
290
291 TODO: Switch to use a stream-based base64 encoder/decoder in
292 order to be able to read exactly what is necessary.
293 */
294
295 DBUG_PRINT("info",("binlog base64 decoded_len: %lu bytes_decoded: %d",
296 (ulong) decoded_len, bytes_decoded));
297
298 /*
299 Now we start to read events of the buffer, until there are no
300 more.
301 */
302 for (char *bufptr= buf ; bytes_decoded > 0 ; )
303 {
304 /*
305 Checking that the first event in the buffer is not truncated.
306 */
307 ulong event_len;
308 if (bytes_decoded < EVENT_LEN_OFFSET + 4 ||
309 (event_len= uint4korr(bufptr + EVENT_LEN_OFFSET)) >
310 (uint) bytes_decoded)
311 {
312 my_error(ER_SYNTAX_ERROR, MYF(0));
313 goto end;
314 }
315 DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
316 event_len, bytes_decoded));
317
318 if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
319 goto end;
320
321 ev= Log_event::read_log_event(bufptr, event_len, &error,
322 rli->relay_log.description_event_for_exec,
323 0);
324
325 DBUG_PRINT("info",("binlog base64 err=%s", error));
326 if (!ev)
327 {
328 /*
329 This could actually be an out-of-memory, but it is more likely
330 caused by a bad statement
331 */
332 my_error(ER_SYNTAX_ERROR, MYF(0));
333 goto end;
334 }
335
336 bytes_decoded -= event_len;
337 bufptr += event_len;
338
339 DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
340 ev->thd= thd;
341 /*
342 We go directly to the application phase, since we don't need
343 to check if the event shall be skipped or not.
344
345 Neither do we have to update the log positions, since that is
346 not used at all: the rli_fake instance is used only for error
347 reporting.
348 */
349 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
350 ulonglong save_skip_replication=
351 thd->variables.option_bits & OPTION_SKIP_REPLICATION;
352 thd->variables.option_bits=
353 (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
354 (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
355 OPTION_SKIP_REPLICATION : 0);
356
357 err= ev->apply_event(rgi);
358
359 thd->variables.option_bits=
360 (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
361 save_skip_replication;
362 #else
363 err= 0;
364 #endif
365 /*
366 Format_description_log_event should not be deleted because it
367 will be used to read info about the relay log's format; it
368 will be deleted when the SQL thread does not need it,
369 i.e. when this thread terminates.
370 */
371 if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
372 delete ev;
373 ev= 0;
374 if (err)
375 {
376 /*
377 TODO: Maybe a better error message since the BINLOG statement
378 now contains several events.
379 */
380 my_error(ER_UNKNOWN_ERROR, MYF(0));
381 goto end;
382 }
383 }
384 }
385
386
387 DBUG_PRINT("info",("binlog base64 execution finished successfully"));
388 my_ok(thd);
389
390 end:
391 if (unlikely(is_fragmented))
392 my_free(const_cast<char*>(thd->lex->comment.str));
393 thd->variables.option_bits= thd_options;
394 rgi->slave_close_thread_tables(thd);
395 my_free(buf);
396 DBUG_VOID_RETURN;
397 }
398