1 /*
2    Copyright (c) 2005, 2013, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1335  USA */
16 
17 #include "mariadb.h"
18 #include "sql_priv.h"
19 #include "sql_binlog.h"
20 #include "sql_parse.h"
21 #include "sql_acl.h"
22 #include "rpl_rli.h"
23 #include "slave.h"
24 #include "log_event.h"
25 
26 
27 /**
28   Check if the event type is allowed in a BINLOG statement.
29 
30   @retval 0 if the event type is ok.
31   @retval 1 if the event type is not ok.
32 */
check_event_type(int type,Relay_log_info * rli)33 static int check_event_type(int type, Relay_log_info *rli)
34 {
35   Format_description_log_event *fd_event=
36     rli->relay_log.description_event_for_exec;
37 
38   /*
39     Convert event type id of certain old versions (see comment in
40     Format_description_log_event::Format_description_log_event(char*,...)).
41   */
42   if (fd_event && fd_event->event_type_permutation)
43   {
44     IF_DBUG({
45         int new_type= fd_event->event_type_permutation[type];
46         DBUG_PRINT("info",
47                    ("converting event type %d to %d (%s)",
48                     type, new_type,
49                     Log_event::get_type_str((Log_event_type)new_type)));
50       },
51       (void)0);
52     type= fd_event->event_type_permutation[type];
53   }
54 
55   switch (type)
56   {
57   case START_EVENT_V3:
58   case FORMAT_DESCRIPTION_EVENT:
59     /*
60       We need a preliminary FD event in order to parse the FD event,
61       if we don't already have one.
62     */
63     if (!fd_event)
64       if (!(rli->relay_log.description_event_for_exec=
65             new Format_description_log_event(4)))
66       {
67         my_error(ER_OUTOFMEMORY, MYF(0), 1);
68         return 1;
69       }
70 
71     /* It is always allowed to execute FD events. */
72     return 0;
73 
74   case TABLE_MAP_EVENT:
75   case WRITE_ROWS_EVENT_V1:
76   case UPDATE_ROWS_EVENT_V1:
77   case DELETE_ROWS_EVENT_V1:
78   case WRITE_ROWS_EVENT:
79   case UPDATE_ROWS_EVENT:
80   case DELETE_ROWS_EVENT:
81   case PRE_GA_WRITE_ROWS_EVENT:
82   case PRE_GA_UPDATE_ROWS_EVENT:
83   case PRE_GA_DELETE_ROWS_EVENT:
84     /*
85       Row events are only allowed if a Format_description_event has
86       already been seen.
87     */
88     if (fd_event)
89       return 0;
90     else
91     {
92       my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT,
93                MYF(0), Log_event::get_type_str((Log_event_type)type));
94       return 1;
95     }
96     break;
97 
98   default:
99     /*
100       It is not meaningful to execute other events than row-events and
101       FD events. It would even be dangerous to execute Stop_log_event
102       and Rotate_log_event since they call Relay_log_info::flush(), which
103       is not allowed to call by other threads than the slave SQL
104       thread when the slave SQL thread is running.
105     */
106     my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT,
107              MYF(0), Log_event::get_type_str((Log_event_type)type));
108     return 1;
109   }
110 }
111 
112 /**
113   Copy fragments into the standard placeholder thd->lex->comment.str.
114 
115   Compute the size of the (still) encoded total,
116   allocate and then copy fragments one after another.
117   The size can exceed max(max_allowed_packet) which is not a
118   problem as no String instance is created off this char array.
119 
120   @param thd  THD handle
121   @return
122      0        at success,
123     -1        otherwise.
124 */
binlog_defragment(THD * thd)125 int binlog_defragment(THD *thd)
126 {
127   user_var_entry *entry[2];
128   LEX_CSTRING name[2]= { thd->lex->comment, thd->lex->ident };
129 
130   /* compute the total size */
131   thd->lex->comment.str= NULL;
132   thd->lex->comment.length= 0;
133   for (uint k= 0; k < 2; k++)
134   {
135     entry[k]=
136       (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
137                                        name[k].length);
138     if (!entry[k] || entry[k]->type != STRING_RESULT)
139     {
140       my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
141       return -1;
142     }
143     thd->lex->comment.length += entry[k]->length;
144   }
145 
146   thd->lex->comment.str=                            // to be freed by the caller
147     (char *) my_malloc(PSI_INSTRUMENT_ME, thd->lex->comment.length, MYF(MY_WME));
148   if (!thd->lex->comment.str)
149   {
150     my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
151     return -1;
152   }
153 
154   /* fragments are merged into allocated buf while the user var:s get reset */
155   size_t gathered_length= 0;
156   for (uint k=0; k < 2; k++)
157   {
158     memcpy(const_cast<char*>(thd->lex->comment.str) + gathered_length, entry[k]->value,
159            entry[k]->length);
160     gathered_length += entry[k]->length;
161   }
162   for (uint k=0; k < 2; k++)
163     update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0);
164 
165   DBUG_ASSERT(gathered_length == thd->lex->comment.length);
166 
167   return 0;
168 }
169 
170 
171 /**
172   Execute a BINLOG statement.
173 
174   To execute the BINLOG command properly the server needs to know
175   which format the BINLOG command's event is in.  Therefore, the first
176   BINLOG statement seen must be a base64 encoding of the
177   Format_description_log_event, as outputted by mysqlbinlog.  This
178   Format_description_log_event is cached in
179   rli->description_event_for_exec.
180 
181   @param thd Pointer to THD object for the client thread executing the
182   statement.
183 */
184 
mysql_client_binlog_statement(THD * thd)185 void mysql_client_binlog_statement(THD* thd)
186 {
187   DBUG_ENTER("mysql_client_binlog_statement");
188   DBUG_PRINT("info",("binlog base64: '%*s'",
189                      (int) (thd->lex->comment.length < 2048 ?
190                             thd->lex->comment.length : 2048),
191                      thd->lex->comment.str));
192 
193   if (check_global_access(thd, PRIV_STMT_BINLOG))
194     DBUG_VOID_RETURN;
195 
196   /*
197     option_bits will be changed when applying the event. But we don't expect
198     it be changed permanently after BINLOG statement, so backup it first.
199     It will be restored at the end of this function.
200   */
201   ulonglong thd_options= thd->variables.option_bits;
202 
203   /*
204     Allocation
205   */
206 
207   int err;
208   Relay_log_info *rli;
209   rpl_group_info *rgi;
210   char *buf= NULL;
211   size_t coded_len= 0, decoded_len= 0;
212 
213   rli= thd->rli_fake;
214   if (!rli && (rli= thd->rli_fake= new Relay_log_info(FALSE, "BINLOG_BASE64_EVENT")))
215     rli->sql_driver_thd= thd;
216   if (!(rgi= thd->rgi_fake))
217     rgi= thd->rgi_fake= new rpl_group_info(rli);
218   rgi->thd= thd;
219 
220   const char *error= 0;
221   Log_event *ev = 0;
222   my_bool is_fragmented= FALSE;
223 
224   /*
225     Out of memory check
226   */
227   if (!(rli))
228   {
229     my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);  /* needed 1 bytes */
230     goto end;
231   }
232 
233   DBUG_ASSERT(rli->belongs_to_client());
234 
235   if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
236     if (binlog_defragment(thd))
237       goto end;
238 
239   if (!(coded_len= thd->lex->comment.length))
240   {
241     my_error(ER_SYNTAX_ERROR, MYF(0));
242     goto end;
243   }
244 
245   decoded_len= my_base64_needed_decoded_length((int)coded_len);
246   if (!(buf= (char *) my_malloc(key_memory_binlog_statement_buffer,
247                                 decoded_len, MYF(MY_WME))))
248   {
249     my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
250     goto end;
251   }
252 
253   for (char const *strptr= thd->lex->comment.str ;
254        strptr < thd->lex->comment.str + thd->lex->comment.length ; )
255   {
256     char const *endptr= 0;
257     int bytes_decoded= my_base64_decode(strptr, coded_len, buf, &endptr,
258                                      MY_BASE64_DECODE_ALLOW_MULTIPLE_CHUNKS);
259 
260 #ifndef HAVE_valgrind
261       /*
262         This debug printout should not be used for valgrind builds
263         since it will read from unassigned memory.
264       */
265     DBUG_PRINT("info",
266                ("bytes_decoded: %d  strptr: %p  endptr: %p ('%c':%d)",
267                 bytes_decoded, strptr, endptr, *endptr,
268                 *endptr));
269 #endif
270 
271     if (bytes_decoded < 0)
272     {
273       my_error(ER_BASE64_DECODE_ERROR, MYF(0));
274       goto end;
275     }
276     else if (bytes_decoded == 0)
277       break; // If no bytes where read, the string contained only whitespace
278 
279     DBUG_ASSERT(bytes_decoded > 0);
280     DBUG_ASSERT(endptr > strptr);
281     coded_len-= endptr - strptr;
282     strptr= endptr;
283 
284     /*
285       Now we have one or more events stored in the buffer. The size of
286       the buffer is computed based on how much base64-encoded data
287       there were, so there should be ample space for the data (maybe
288       even too much, since a statement can consist of a considerable
289       number of events).
290 
291       TODO: Switch to use a stream-based base64 encoder/decoder in
292       order to be able to read exactly what is necessary.
293     */
294 
295     DBUG_PRINT("info",("binlog base64 decoded_len: %lu  bytes_decoded: %d",
296                        (ulong) decoded_len, bytes_decoded));
297 
298     /*
299       Now we start to read events of the buffer, until there are no
300       more.
301     */
302     for (char *bufptr= buf ; bytes_decoded > 0 ; )
303     {
304       /*
305         Checking that the first event in the buffer is not truncated.
306       */
307       ulong event_len;
308       if (bytes_decoded < EVENT_LEN_OFFSET + 4 ||
309           (event_len= uint4korr(bufptr + EVENT_LEN_OFFSET)) >
310            (uint) bytes_decoded)
311       {
312         my_error(ER_SYNTAX_ERROR, MYF(0));
313         goto end;
314       }
315       DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
316                           event_len, bytes_decoded));
317 
318       if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli))
319         goto end;
320 
321       ev= Log_event::read_log_event(bufptr, event_len, &error,
322                                     rli->relay_log.description_event_for_exec,
323                                     0);
324 
325       DBUG_PRINT("info",("binlog base64 err=%s", error));
326       if (!ev)
327       {
328         /*
329           This could actually be an out-of-memory, but it is more likely
330           caused by a bad statement
331         */
332         my_error(ER_SYNTAX_ERROR, MYF(0));
333         goto end;
334       }
335 
336       bytes_decoded -= event_len;
337       bufptr += event_len;
338 
339       DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
340       ev->thd= thd;
341       /*
342         We go directly to the application phase, since we don't need
343         to check if the event shall be skipped or not.
344 
345         Neither do we have to update the log positions, since that is
346         not used at all: the rli_fake instance is used only for error
347         reporting.
348       */
349 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
350       ulonglong save_skip_replication=
351                         thd->variables.option_bits & OPTION_SKIP_REPLICATION;
352       thd->variables.option_bits=
353         (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
354         (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ?
355          OPTION_SKIP_REPLICATION : 0);
356 
357       err= ev->apply_event(rgi);
358 
359       thd->variables.option_bits=
360         (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) |
361         save_skip_replication;
362 #else
363       err= 0;
364 #endif
365       /*
366         Format_description_log_event should not be deleted because it
367         will be used to read info about the relay log's format; it
368         will be deleted when the SQL thread does not need it,
369         i.e. when this thread terminates.
370       */
371       if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
372         delete ev;
373       ev= 0;
374       if (err)
375       {
376         /*
377           TODO: Maybe a better error message since the BINLOG statement
378           now contains several events.
379         */
380         my_error(ER_UNKNOWN_ERROR, MYF(0));
381         goto end;
382       }
383     }
384   }
385 
386 
387   DBUG_PRINT("info",("binlog base64 execution finished successfully"));
388   my_ok(thd);
389 
390 end:
391   if (unlikely(is_fragmented))
392     my_free(const_cast<char*>(thd->lex->comment.str));
393   thd->variables.option_bits= thd_options;
394   rgi->slave_close_thread_tables(thd);
395   my_free(buf);
396   DBUG_VOID_RETURN;
397 }
398