1 /*
2 Copyright (c) 2005, 2013, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #include "sql_binlog.h"
20 #include "sql_parse.h"
21 #include "sql_acl.h"
22 #include "rpl_rli.h"
23 #include "slave.h"
24 #include "log_event.h"
25
26
27 /**
28 Check if the event type is allowed in a BINLOG statement.
29
30 @retval 0 if the event type is ok.
31 @retval 1 if the event type is not ok.
32 */
check_event_type(int type,Relay_log_info * rli)33 static int check_event_type(int type, Relay_log_info *rli)
34 {
35 Format_description_log_event *fd_event=
36 rli->relay_log.description_event_for_exec;
37
38 /*
39 Convert event type id of certain old versions (see comment in
40 Format_description_log_event::Format_description_log_event(char*,...)).
41 */
42 if (fd_event && fd_event->event_type_permutation)
43 {
44 IF_DBUG({
45 int new_type= fd_event->event_type_permutation[type];
46 DBUG_PRINT("info",
47 ("converting event type %d to %d (%s)",
48 type, new_type,
49 Log_event::get_type_str((Log_event_type)new_type)));
50 },
51 (void)0);
52 type= fd_event->event_type_permutation[type];
53 }
54
55 switch (type)
56 {
57 case START_EVENT_V3:
58 case FORMAT_DESCRIPTION_EVENT:
59 /*
60 We need a preliminary FD event in order to parse the FD event,
61 if we don't already have one.
62 */
63 if (!fd_event)
64 if (!(rli->relay_log.description_event_for_exec=
65 new Format_description_log_event(4)))
66 {
67 my_error(ER_OUTOFMEMORY, MYF(0), 1);
68 return 1;
69 }
70
71 /* It is always allowed to execute FD events. */
72 return 0;
73
74 case TABLE_MAP_EVENT:
75 case WRITE_ROWS_EVENT_V1:
76 case UPDATE_ROWS_EVENT_V1:
77 case DELETE_ROWS_EVENT_V1:
78 case WRITE_ROWS_EVENT:
79 case UPDATE_ROWS_EVENT:
80 case DELETE_ROWS_EVENT:
81 case PRE_GA_WRITE_ROWS_EVENT:
82 case PRE_GA_UPDATE_ROWS_EVENT:
83 case PRE_GA_DELETE_ROWS_EVENT:
84 /*
85 Row events are only allowed if a Format_description_event has
86 already been seen.
87 */
88 if (fd_event)
89 return 0;
90 else
91 {
92 my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
93 MYF(0), Log_event::get_type_str((Log_event_type)type));
94 return 1;
95 }
96 break;
97
98 default:
99 /*
100 It is not meaningful to execute other events than row-events and
101 FD events. It would even be dangerous to execute Stop_log_event
102 and Rotate_log_event since they call Relay_log_info::flush(), which
103 is not allowed to call by other threads than the slave SQL
104 thread when the slave SQL thread is running.
105 */
106 my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
107 MYF(0), Log_event::get_type_str((Log_event_type)type));
108 return 1;
109 }
110 }
111
112 /**
113 Copy fragments into the standard placeholder thd->lex->comment.str.
114
115 Compute the size of the (still) encoded total,
116 allocate and then copy fragments one after another.
117 The size can exceed max(max_allowed_packet) which is not a
118 problem as no String instance is created off this char array.
119
120 @param thd THD handle
121 @return
122 0 at success,
123 -1 otherwise.
124 */
binlog_defragment(THD * thd)125 int binlog_defragment(THD *thd)
126 {
127 user_var_entry *entry[2];
128 LEX_CSTRING name[2]= { thd->lex->comment, thd->lex->ident };
129
130 /* compute the total size */
131 thd->lex->comment.str= NULL;
132 thd->lex->comment.length= 0;
133 for (uint k= 0; k < 2; k++)
134 {
135 entry[k]=
136 (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
137 name[k].length);
138 if (!entry[k] || entry[k]->type != STRING_RESULT)
139 {
140 my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
141 return -1;
142 }
143 thd->lex->comment.length += entry[k]->length;
144 }
145
146 thd->lex->comment.str= // to be freed by the caller
147 (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
148 if (!thd->lex->comment.str)
149 {
150 my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
151 return -1;
152 }
153
154 /* fragments are merged into allocated buf while the user var:s get reset */
155 size_t gathered_length= 0;
156 for (uint k=0; k < 2; k++)
157 {
158 memcpy(const_cast<char*>(thd->lex->comment.str) + gathered_length, entry[k]->value,
159 entry[k]->length);
160 gathered_length += entry[k]->length;
161 }
162 for (uint k=0; k < 2; k++)
163 update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0);
164
165 DBUG_ASSERT(gathered_length == thd->lex->comment.length);
166
167 return 0;
168 }
169
170
171 /**
172 Execute a BINLOG statement.
173
174 To execute the BINLOG command properly the server needs to know
175 which format the BINLOG command's event is in. Therefore, the first
176 BINLOG statement seen must be a base64 encoding of the
177 Format_description_log_event, as outputted by mysqlbinlog. This
178 Format_description_log_event is cached in
179 rli->description_event_for_exec.
180
181 @param thd Pointer to THD object for the client thread executing the
182 statement.
183 */
184
mysql_client_binlog_statement(THD * thd)185 void mysql_client_binlog_statement(THD* thd)
186 {
187 DBUG_ENTER("mysql_client_binlog_statement");
188 DBUG_PRINT("info",("binlog base64: '%*s'",
189 (int) (thd->lex->comment.length < 2048 ?
190 thd->lex->comment.length : 2048),
191 thd->lex->comment.str));
192
193 if (check_global_access(thd, SUPER_ACL))
194 DBUG_VOID_RETURN;
195
196 /*
197 option_bits will be changed when applying the event. But we don't expect
198 it be changed permanently after BINLOG statement, so backup it first.
199 It will be restored at the end of this function.
200 */
201 ulonglong thd_options= thd->variables.option_bits;
202
203 /*
204 Allocation
205 */
206
207 int err;
208 Relay_log_info *rli;
209 rpl_group_info *rgi;
210 char *buf= NULL;
211 size_t coded_len= 0, decoded_len= 0;
212
213 rli= thd->rli_fake;
214 if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE)))
215 rli->sql_driver_thd= thd;
216 if (!(rgi= thd->rgi_fake))
217 rgi= thd->rgi_fake= new rpl_group_info(rli);
218 rgi->thd= thd;
219
220 const char *error= 0;
221 Log_event *ev = 0;
222 my_bool is_fragmented= FALSE;
223
224 /*
225 Out of memory check
226 */
227 if (!(rli))
228 {
229 my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); /* needed 1 bytes */
230 goto end;
231 }
232
233 DBUG_ASSERT(rli->belongs_to_client());
234
235 if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
236 if (binlog_defragment(thd))
237 goto end;
238
239 if (!(coded_len= thd->lex->comment.length))
240 {
241 my_error(ER_SYNTAX_ERROR, MYF(0));
242 goto end;
243 }
244
245 decoded_len= my_base64_needed_decoded_length((int)coded_len);
246 if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME))))
247 {
248 my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
249 goto end;
250 }
251
252 for (char const *strptr= thd->lex->comment.str ;
253 strptr < thd->lex->comment.str + thd->lex->comment.length ; )
254 {
255 char const *endptr= 0;
256 int bytes_decoded= my_base64_decode(strptr, coded_len, buf, &endptr,
257 MY_BASE64_DECODE_ALLOW_MULTIPLE_CHUNKS);
258
259 #ifndef HAVE_valgrind
260 /*
261 This debug printout should not be used for valgrind builds
262 since it will read from unassigned memory.
263 */
264 DBUG_PRINT("info",
265 ("bytes_decoded: %d strptr: %p endptr: %p ('%c':%d)",
266 bytes_decoded, strptr, endptr, *endptr,
267 *endptr));
268 #endif
269
270 if (bytes_decoded < 0)
271 {
272 my_error(ER_BASE64_DECODE_ERROR, MYF(0));
273 goto end;
274 }
275 else if (bytes_decoded == 0)
276 break; // If no bytes where read, the string contained only whitespace
277
278 DBUG_ASSERT(bytes_decoded > 0);
279 DBUG_ASSERT(endptr > strptr);
280 coded_len-= endptr - strptr;
281 strptr= endptr;
282
283 /*
284 Now we have one or more events stored in the buffer. The size of
285 the buffer is computed based on how much base64-encoded data
286 there were, so there should be ample space for the data (maybe
287 even too much, since a statement can consist of a considerable
288 number of events).
289
290 TODO: Switch to use a stream-based base64 encoder/decoder in
291 order to be able to read exactly what is necessary.
292 */
293
294 DBUG_PRINT("info",("binlog base64 decoded_len: %lu bytes_decoded: %d",
295 (ulong) decoded_len, bytes_decoded));
296
297 /*
298 Now we start to read events of the buffer, until there are no
299 more.
300 */
301 for (char *bufptr= buf ; bytes_decoded > 0 ; )
302 {
303 /*
304 Checking that the first event in the buffer is not truncated.
305 */
306 ulong event_len;
307 if (bytes_decoded < EVENT_LEN_OFFSET + 4 ||
308 (event_len= uint4korr(bufptr + EVENT_LEN_OFFSET)) >
309 (uint) bytes_decoded)
310 {
311 my_error(ER_SYNTAX_ERROR, MYF(0));
312 goto end;
313 }
314 DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
315 event_len, bytes_decoded));
316
317 if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
318 goto end;
319
320 ev= Log_event::read_log_event(bufptr, event_len, &error,
321 rli->relay_log.description_event_for_exec,
322 0);
323
324 DBUG_PRINT("info",("binlog base64 err=%s", error));
325 if (!ev)
326 {
327 /*
328 This could actually be an out-of-memory, but it is more likely
329 caused by a bad statement
330 */
331 my_error(ER_SYNTAX_ERROR, MYF(0));
332 goto end;
333 }
334
335 bytes_decoded -= event_len;
336 bufptr += event_len;
337
338 DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
339 ev->thd= thd;
340 /*
341 We go directly to the application phase, since we don't need
342 to check if the event shall be skipped or not.
343
344 Neither do we have to update the log positions, since that is
345 not used at all: the rli_fake instance is used only for error
346 reporting.
347 */
348 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
349 ulonglong save_skip_replication=
350 thd->variables.option_bits & OPTION_SKIP_REPLICATION;
351 thd->variables.option_bits=
352 (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
353 (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
354 OPTION_SKIP_REPLICATION : 0);
355
356 err= ev->apply_event(rgi);
357
358 thd->variables.option_bits=
359 (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
360 save_skip_replication;
361 #else
362 err= 0;
363 #endif
364 /*
365 Format_description_log_event should not be deleted because it
366 will be used to read info about the relay log's format; it
367 will be deleted when the SQL thread does not need it,
368 i.e. when this thread terminates.
369 */
370 if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
371 delete ev;
372 ev= 0;
373 if (err)
374 {
375 /*
376 TODO: Maybe a better error message since the BINLOG statement
377 now contains several events.
378 */
379 my_error(ER_UNKNOWN_ERROR, MYF(0));
380 goto end;
381 }
382 }
383 }
384
385
386 DBUG_PRINT("info",("binlog base64 execution finished successfully"));
387 my_ok(thd);
388
389 end:
390 if (unlikely(is_fragmented))
391 my_free(const_cast<char*>(thd->lex->comment.str));
392 thd->variables.option_bits= thd_options;
393 rgi->slave_close_thread_tables(thd);
394 my_free(buf);
395 DBUG_VOID_RETURN;
396 }
397