1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "rpl_utility.h"
24 
25 #ifndef MYSQL_CLIENT
26 
#include "binlog_event.h"                // checksum_crc32
28 #include "template_utils.h"              // delete_container_pointers
29 #include "field.h"                       // Field
30 #include "log.h"                         // sql_print_error
31 #include "log_event.h"                   // Log_event
32 #include "rpl_rli.h"                     // Relay_log_info
33 #include "sql_class.h"                   // THD
34 #include "sql_tmp_table.h"               // create_virtual_tmp_table
35 #include "rpl_slave.h"
36 #include <algorithm>
37 
38 using std::min;
39 using std::max;
40 using binary_log::checksum_crc32;
41 
/**
   Three-way comparison of two size_t values.

   @param a Left-hand operand.
   @param b Right-hand operand.

   @retval -1 @c a is less than @c b.
   @retval  0 @c a and @c b are equal.
   @retval  1 @c a is greater than @c b.
 */
static int compare(size_t a, size_t b)
{
  if (a == b)
    return 0;
  return (a < b) ? -1 : 1;
}
54 
55 
56 /*
57   Compare the pack lengths of a source field (on the master) and a
58   target field (on the slave).
59 
60   @param field    Target field.
61   @param type     Source field type.
62   @param metadata Source field metadata.
63 
64   @retval -1 The length of the source field is smaller than the target field.
65   @retval  0 The length of the source and target fields are the same.
66   @retval  1 The length of the source field is greater than the target field.
67  */
compare_lengths(Field * field,enum_field_types source_type,uint16 metadata)68 int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
69 {
70   DBUG_ENTER("compare_lengths");
71   size_t const source_length=
72     max_display_length_for_field(source_type, metadata);
73   size_t const target_length= field->max_display_length();
74   DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
75                        " target_length: %lu, target_type: %u",
76                        (unsigned long) source_length, source_type,
77                        (unsigned long) target_length, field->real_type()));
78   int result= compare(source_length, target_length);
79   DBUG_PRINT("result", ("%d", result));
80   DBUG_RETURN(result);
81 }
82 #endif //MYSQL_CLIENT
83 
84 /*********************************************************************
85  *                   table_def member definitions                    *
86  *********************************************************************/
87 
88 /*
89   This function returns the field size in raw bytes based on the type
90   and the encoded field data from the master's raw data.
91 */
calc_field_size(uint col,uchar * master_data) const92 uint32 table_def::calc_field_size(uint col, uchar *master_data) const
93 {
94   uint32 length= ::calc_field_size(type(col), master_data,
95                                    m_field_metadata[col]);
96   return length;
97 }
98 
99 /**
100  */
101 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
show_sql_type(enum_field_types type,uint16 metadata,String * str,const CHARSET_INFO * field_cs)102 static void show_sql_type(enum_field_types type, uint16 metadata, String *str,
103                           const CHARSET_INFO *field_cs)
104 {
105   DBUG_ENTER("show_sql_type");
106   DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
107 
108   switch (type)
109   {
110   case MYSQL_TYPE_TINY:
111     str->set_ascii(STRING_WITH_LEN("tinyint"));
112     break;
113 
114   case MYSQL_TYPE_SHORT:
115     str->set_ascii(STRING_WITH_LEN("smallint"));
116     break;
117 
118   case MYSQL_TYPE_LONG:
119     str->set_ascii(STRING_WITH_LEN("int"));
120     break;
121 
122   case MYSQL_TYPE_FLOAT:
123     str->set_ascii(STRING_WITH_LEN("float"));
124     break;
125 
126   case MYSQL_TYPE_DOUBLE:
127     str->set_ascii(STRING_WITH_LEN("double"));
128     break;
129 
130   case MYSQL_TYPE_NULL:
131     str->set_ascii(STRING_WITH_LEN("null"));
132     break;
133 
134   case MYSQL_TYPE_TIMESTAMP:
135   case MYSQL_TYPE_TIMESTAMP2:
136     str->set_ascii(STRING_WITH_LEN("timestamp"));
137     break;
138 
139   case MYSQL_TYPE_LONGLONG:
140     str->set_ascii(STRING_WITH_LEN("bigint"));
141     break;
142 
143   case MYSQL_TYPE_INT24:
144     str->set_ascii(STRING_WITH_LEN("mediumint"));
145     break;
146 
147   case MYSQL_TYPE_NEWDATE:
148   case MYSQL_TYPE_DATE:
149     str->set_ascii(STRING_WITH_LEN("date"));
150     break;
151 
152   case MYSQL_TYPE_TIME:
153   case MYSQL_TYPE_TIME2:
154     str->set_ascii(STRING_WITH_LEN("time"));
155     break;
156 
157   case MYSQL_TYPE_DATETIME:
158   case MYSQL_TYPE_DATETIME2:
159     str->set_ascii(STRING_WITH_LEN("datetime"));
160     break;
161 
162   case MYSQL_TYPE_YEAR:
163     str->set_ascii(STRING_WITH_LEN("year"));
164     break;
165 
166   case MYSQL_TYPE_VAR_STRING:
167   case MYSQL_TYPE_VARCHAR:
168     {
169       const CHARSET_INFO *cs= str->charset();
170       size_t length=
171         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
172                            "varchar(%u(bytes))", metadata);
173       str->length(length);
174     }
175     break;
176 
177   case MYSQL_TYPE_BIT:
178     {
179       const CHARSET_INFO *cs= str->charset();
180       int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF);
181       size_t length=
182         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
183                            "bit(%d)", bit_length);
184       str->length(length);
185     }
186     break;
187 
188   case MYSQL_TYPE_DECIMAL:
189     {
190       const CHARSET_INFO *cs= str->charset();
191       size_t length=
192         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
193                            "decimal(%d,?)", metadata);
194       str->length(length);
195     }
196     break;
197 
198   case MYSQL_TYPE_NEWDECIMAL:
199     {
200       const CHARSET_INFO *cs= str->charset();
201       size_t length=
202         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
203                            "decimal(%d,%d)", metadata >> 8, metadata & 0xff);
204       str->length(length);
205     }
206     break;
207 
208   case MYSQL_TYPE_ENUM:
209     str->set_ascii(STRING_WITH_LEN("enum"));
210     break;
211 
212   case MYSQL_TYPE_SET:
213     str->set_ascii(STRING_WITH_LEN("set"));
214     break;
215 
216   case MYSQL_TYPE_BLOB:
217     /*
218       Field::real_type() lies regarding the actual type of a BLOB, so
219       it is necessary to check the pack length to figure out what kind
220       of blob it really is.
221      */
222     switch (metadata)
223     {
224     case 1:
225       str->set_ascii(STRING_WITH_LEN("tinyblob"));
226       break;
227 
228     case 2:
229       str->set_ascii(STRING_WITH_LEN("blob"));
230       break;
231 
232     case 3:
233       str->set_ascii(STRING_WITH_LEN("mediumblob"));
234       break;
235 
236     case 4:
237       str->set_ascii(STRING_WITH_LEN("longblob"));
238       break;
239 
240     default:
241       assert(0);
242       break;
243     }
244     break;
245 
246   case MYSQL_TYPE_STRING:
247     {
248       /*
249         This is taken from Field_string::unpack.
250       */
251       const CHARSET_INFO *cs= str->charset();
252       uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
253       size_t length=
254         cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
255                            "char(%d(bytes))", bytes);
256       str->length(length);
257     }
258     break;
259 
260   case MYSQL_TYPE_GEOMETRY:
261     str->set_ascii(STRING_WITH_LEN("geometry"));
262     break;
263 
264   case MYSQL_TYPE_JSON:
265     str->set_ascii(STRING_WITH_LEN("json"));
266     break;
267 
268   default:
269     str->set_ascii(STRING_WITH_LEN("<unknown type>"));
270   }
271   DBUG_VOID_RETURN;
272 }
273 
274 
275 /**
276    Check the order variable and print errors if the order is not
277    acceptable according to the current settings.
278 
279    @param order  The computed order of the conversion needed.
280    @param rli    The relay log info data structure: for error reporting.
281  */
is_conversion_ok(int order,Relay_log_info * rli)282 bool is_conversion_ok(int order, Relay_log_info *rli)
283 {
284   DBUG_ENTER("is_conversion_ok");
285   bool allow_non_lossy, allow_lossy;
286 
287   allow_non_lossy = slave_type_conversions_options &
288                     (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
289   allow_lossy= slave_type_conversions_options &
290                (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
291 
292   DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
293                        allow_non_lossy ? " ALL_NON_LOSSY" : "",
294                        allow_lossy ? " ALL_LOSSY" : ""));
295   if (order < 0 && !allow_non_lossy)
296   {
297     /* !!! Add error message saying that non-lossy conversions need to be allowed. */
298     DBUG_RETURN(false);
299   }
300 
301   if (order > 0 && !allow_lossy)
302   {
303     /* !!! Add error message saying that lossy conversions need to be allowed. */
304     DBUG_RETURN(false);
305   }
306 
307   DBUG_RETURN(true);
308 }
309 
310 
311 /**
312   Check if the types are criss cross means type1 is MYSQL_TYPE_TIMESTAMP
313   and type2 as MYSQL_TYPE_TIMESTAMP2 or vice versa.
314 */
timestamp_cross_check(enum_field_types type1,enum_field_types type2)315 inline bool timestamp_cross_check(enum_field_types type1,
316                                   enum_field_types type2)
317 {
318   return ((type1 == MYSQL_TYPE_TIMESTAMP &&
319           type2 == MYSQL_TYPE_TIMESTAMP2) ||
320           (type1 == MYSQL_TYPE_TIMESTAMP2 &&
321           type2 == MYSQL_TYPE_TIMESTAMP));
322 }
323 
324 
325 /**
326   Check if the types are criss cross means type1 is MYSQL_TYPE_DATETIME
327   and type2 as MYSQL_TYPE_DATETIME or vice versa.
328 */
datetime_cross_check(enum_field_types type1,enum_field_types type2)329 inline bool datetime_cross_check(enum_field_types type1,
330                                  enum_field_types type2)
331 {
332   return ((type1 == MYSQL_TYPE_DATETIME &&
333           type2 == MYSQL_TYPE_DATETIME2) ||
334           (type1 == MYSQL_TYPE_DATETIME2 &&
335           type2 == MYSQL_TYPE_DATETIME));
336 }
337 
338 
339 /**
340   Check if the types are criss cross means type1 is MYSQL_TYPE_TIME
341   and type2 as MYSQL_TYPE_TIME2 or vice versa.
342 */
time_cross_check(enum_field_types type1,enum_field_types type2)343 inline bool time_cross_check(enum_field_types type1,
344                              enum_field_types type2)
345 {
346   return ((type1 == MYSQL_TYPE_TIME&&
347           type2 == MYSQL_TYPE_TIME2) ||
348           (type1 == MYSQL_TYPE_TIME2 &&
349           type2 == MYSQL_TYPE_TIME));
350 }
351 
352 
353 /**
354    Can a type potentially be converted to another type?
355 
356    This function check if the types are convertible and what
357    conversion is required.
358 
359    If conversion is not possible, and error is printed.
360 
361    If conversion is possible:
362 
363    - *order will be set to -1 if source type is smaller than target
364      type and a non-lossy conversion can be required. This includes
365      the case where the field types are different but types could
366      actually be converted in either direction.
367 
368    - *order will be set to 0 if no conversion is required.
369 
370    - *order will be set to 1 if the source type is strictly larger
371       than the target type and that conversion is potentially lossy.
372 
373    @param[in] field    Target field
374    @param[in] type     Source field type
375    @param[in] metadata Source field metadata
376    @param[in] rli      Relay log info (for error reporting)
377    @param[in] mflags   Flags from the table map event
378    @param[out] order   Order between source field and target field
379 
380    @return @c true if conversion is possible according to the current
381    settings, @c false if conversion is not possible according to the
382    current setting.
383  */
384 static bool
can_convert_field_to(Field * field,enum_field_types source_type,uint16 metadata,Relay_log_info * rli,uint16 mflags,int * order_var)385 can_convert_field_to(Field *field,
386                      enum_field_types source_type, uint16 metadata,
387                      Relay_log_info *rli, uint16 mflags,
388                      int *order_var)
389 {
390   DBUG_ENTER("can_convert_field_to");
391 #ifndef NDEBUG
392   char field_type_buf[MAX_FIELD_WIDTH];
393   String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
394   field->sql_type(field_type);
395   DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
396                        field_type.c_ptr_safe(), field->real_type(), source_type, metadata));
397 #endif
398   /*
399     If the real type is the same, we need to check the metadata to
400     decide if conversions are allowed.
401    */
402   if (field->real_type() == source_type)
403   {
404     if (metadata == 0) // Metadata can only be zero if no metadata was provided
405     {
406       /*
407         If there is no metadata, we either have an old event where no
408         metadata were supplied, or a type that does not require any
409         metadata. In either case, conversion can be done but no
410         conversion table is necessary.
411        */
412       DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
413       *order_var= 0;
414       DBUG_RETURN(true);
415     }
416 
417     DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
418     if (field->compatible_field_size(metadata, rli, mflags, order_var))
419       DBUG_RETURN(is_conversion_ok(*order_var, rli));
420     else
421       DBUG_RETURN(false);
422   }
423   else if (metadata == 0 &&
424            (timestamp_cross_check(field->real_type(), source_type) ||
425            datetime_cross_check(field->real_type(), source_type) ||
426            time_cross_check(field->real_type(), source_type)))
427   {
428     /*
429       In the above condition, we are taking care
430       of case where
431       1) Master having old TIME, TIMESTAMP, DATETIME
432       and slave have new TIME2, TIMESTAMP2, DATETIME2
433       or
434       2) Master having new TIMESTAMP2, DATETIME2, TIME2
435       with fraction part zero and slave have TIME,
436       TIMESTAMP, DATETIME.
437       We need second condition, as when we are
438       upgrading from 5.5 to 5.6 TIME, TIMESTAMP,
439       DATETIME columns are not upgraded to TIME(0),
440       TIMESTAMP(0), DATETIME(0).
441       So to support these conversion we are putting this
442       condition.
443     */
444     /*
445       TS-TODO: conversion from FSP1>FSP2.
446       Can do non-lossy conversion
447       from old TIME, TIMESTAMP, DATETIME
448       to new TIME(0), TIMESTAMP(0), DATETIME(0).
449     */
450     *order_var= -1;
451     DBUG_RETURN(true);
452   }
453   else if (!slave_type_conversions_options)
454     DBUG_RETURN(false);
455 
456   /*
457     Here, from and to will always be different. Since the types are
458     different, we cannot use the compatible_field_size() function, but
459     have to rely on hard-coded max-sizes for fields.
460   */
461 
462   DBUG_PRINT("debug", ("Base types are different, checking conversion"));
463   switch (source_type)                      // Source type (on master)
464   {
465   case MYSQL_TYPE_DECIMAL:
466   case MYSQL_TYPE_NEWDECIMAL:
467   case MYSQL_TYPE_FLOAT:
468   case MYSQL_TYPE_DOUBLE:
469     switch (field->real_type())
470     {
471     case MYSQL_TYPE_NEWDECIMAL:
472       /*
473         Then the other type is either FLOAT, DOUBLE, or old style
474         DECIMAL, so we require lossy conversion.
475       */
476       *order_var= 1;
477       DBUG_RETURN(is_conversion_ok(*order_var, rli));
478 
479     case MYSQL_TYPE_DECIMAL:
480     case MYSQL_TYPE_FLOAT:
481     case MYSQL_TYPE_DOUBLE:
482     {
483       if (source_type == MYSQL_TYPE_NEWDECIMAL ||
484           source_type == MYSQL_TYPE_DECIMAL)
485         *order_var = 1;                         // Always require lossy conversions
486       else
487         *order_var= compare_lengths(field, source_type, metadata);
488       assert(*order_var != 0);
489       DBUG_RETURN(is_conversion_ok(*order_var, rli));
490     }
491 
492     default:
493       DBUG_RETURN(false);
494     }
495     break;
496 
497   /*
498     The length comparison check will do the correct job of comparing
499     the field lengths (in bytes) of two integer types.
500   */
501   case MYSQL_TYPE_TINY:
502   case MYSQL_TYPE_SHORT:
503   case MYSQL_TYPE_INT24:
504   case MYSQL_TYPE_LONG:
505   case MYSQL_TYPE_LONGLONG:
506     switch (field->real_type())
507     {
508     case MYSQL_TYPE_TINY:
509     case MYSQL_TYPE_SHORT:
510     case MYSQL_TYPE_INT24:
511     case MYSQL_TYPE_LONG:
512     case MYSQL_TYPE_LONGLONG:
513       *order_var= compare_lengths(field, source_type, metadata);
514       assert(*order_var != 0);
515       DBUG_RETURN(is_conversion_ok(*order_var, rli));
516 
517     default:
518       DBUG_RETURN(false);
519     }
520     break;
521 
522   /*
523     Since source and target type is different, and it is not possible
524     to convert bit types to anything else, this will return false.
525    */
526   case MYSQL_TYPE_BIT:
527     DBUG_RETURN(false);
528 
529   /*
530     If all conversions are disabled, it is not allowed to convert
531     between these types. Since the TEXT vs. BINARY is distinguished by
532     the charset, and the charset is not replicated, we cannot
533     currently distinguish between , e.g., TEXT and BLOB.
534    */
535   case MYSQL_TYPE_TINY_BLOB:
536   case MYSQL_TYPE_MEDIUM_BLOB:
537   case MYSQL_TYPE_LONG_BLOB:
538   case MYSQL_TYPE_BLOB:
539   case MYSQL_TYPE_STRING:
540   case MYSQL_TYPE_VAR_STRING:
541   case MYSQL_TYPE_VARCHAR:
542     switch (field->real_type())
543     {
544     case MYSQL_TYPE_TINY_BLOB:
545     case MYSQL_TYPE_MEDIUM_BLOB:
546     case MYSQL_TYPE_LONG_BLOB:
547     case MYSQL_TYPE_BLOB:
548     case MYSQL_TYPE_STRING:
549     case MYSQL_TYPE_VAR_STRING:
550     case MYSQL_TYPE_VARCHAR:
551       *order_var= compare_lengths(field, source_type, metadata);
552       /*
553         Here we know that the types are different, so if the order
554         gives that they do not require any conversion, we still need
555         to have non-lossy conversion enabled to allow conversion
556         between different (string) types of the same length.
557        */
558       if (*order_var == 0)
559         *order_var= -1;
560       DBUG_RETURN(is_conversion_ok(*order_var, rli));
561 
562     default:
563       DBUG_RETURN(false);
564     }
565     break;
566 
567   case MYSQL_TYPE_GEOMETRY:
568   case MYSQL_TYPE_JSON:
569   case MYSQL_TYPE_TIMESTAMP:
570   case MYSQL_TYPE_DATE:
571   case MYSQL_TYPE_TIME:
572   case MYSQL_TYPE_DATETIME:
573   case MYSQL_TYPE_YEAR:
574   case MYSQL_TYPE_NEWDATE:
575   case MYSQL_TYPE_NULL:
576   case MYSQL_TYPE_ENUM:
577   case MYSQL_TYPE_SET:
578   case MYSQL_TYPE_TIMESTAMP2:
579   case MYSQL_TYPE_DATETIME2:
580   case MYSQL_TYPE_TIME2:
581     DBUG_RETURN(false);
582   }
583   DBUG_RETURN(false);                                 // To keep GCC happy
584 }
585 
586 
587 /**
588   Is the definition compatible with a table?
589 
590   This function will compare the master table with an existing table
591   on the slave and see if they are compatible with respect to the
592   current settings of @c SLAVE_TYPE_CONVERSIONS.
593 
594   If the tables are compatible and conversions are required, @c
595   *tmp_table_var will be set to a virtual temporary table with field
596   pointers for the fields that require conversions.  This allow simple
597   checking of whether a conversion are to be applied or not.
598 
599   If tables are compatible, but no conversions are necessary, @c
600   *tmp_table_var will be set to NULL.
601 
602   @param rli_arg[in]
603   Relay log info, for error reporting.
604 
605   @param table[in]
606   Table to compare with
607 
608   @param tmp_table_var[out]
609   Virtual temporary table for performing conversions, if necessary.
610 
611   @retval true Master table is compatible with slave table.
612   @retval false Master table is not compatible with slave table.
613 */
614 bool
compatible_with(THD * thd,Relay_log_info * rli,TABLE * table,TABLE ** conv_table_var) const615 table_def::compatible_with(THD *thd, Relay_log_info *rli,
616                            TABLE *table, TABLE **conv_table_var)
617   const
618 {
619   /*
620     We only check the initial columns for the tables.
621   */
622   uint const cols_to_check= min<ulong>(table->s->fields, size());
623   TABLE *tmp_table= NULL;
624 
625   for (uint col= 0 ; col < cols_to_check ; ++col)
626   {
627     Field *const field= table->field[col];
628     int order;
629     if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order))
630     {
631       DBUG_PRINT("debug", ("Checking column %d -"
632                            " field '%s' can be converted - order: %d",
633                            col, field->field_name, order));
634       assert(order >= -1 && order <= 1);
635 
636       /*
637         If order is not 0, a conversion is required, so we need to set
638         up the conversion table.
639        */
640       if (order != 0 && tmp_table == NULL)
641       {
642         /*
643           This will create the full table with all fields. This is
644           necessary to ge the correct field lengths for the record.
645         */
646         tmp_table= create_conversion_table(thd, rli, table);
647         if (tmp_table == NULL)
648             return false;
649         /*
650           Clear all fields up to, but not including, this column.
651         */
652         for (unsigned int i= 0; i < col; ++i)
653           tmp_table->field[i]= NULL;
654       }
655 
656       if (order == 0 && tmp_table != NULL)
657         tmp_table->field[col]= NULL;
658     }
659     else
660     {
661       DBUG_PRINT("debug", ("Checking column %d -"
662                            " field '%s' can not be converted",
663                            col, field->field_name));
664       assert(col < size() && col < table->s->fields);
665       assert(table->s->db.str && table->s->table_name.str);
666       const char *db_name= table->s->db.str;
667       const char *tbl_name= table->s->table_name.str;
668       char source_buf[MAX_FIELD_WIDTH];
669       char target_buf[MAX_FIELD_WIDTH];
670       String field_sql_type;
671       enum loglevel report_level= INFORMATION_LEVEL;
672       String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
673       String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
674       show_sql_type(type(col), field_metadata(col), &source_type, field->charset());
675       if (!ignored_error_code(ER_SLAVE_CONVERSION_FAILED))
676       {
677         report_level= ERROR_LEVEL;
678         thd->is_slave_error= 1;
679       }
680       /* In case of ignored errors report warnings only if log_warnings > 1. */
681       else if (log_warnings > 1)
682         report_level= WARNING_LEVEL;
683 
684       if (field->has_charset() &&
685           (field->type() == MYSQL_TYPE_VARCHAR ||
686            field->type() == MYSQL_TYPE_STRING))
687       {
688         field_sql_type.append((field->type() == MYSQL_TYPE_VARCHAR) ?
689                               "varchar" : "char");
690         const CHARSET_INFO *cs= field->charset();
691         size_t length= cs->cset->snprintf(cs, (char*) target_type.ptr(),
692                                           target_type.alloced_length(),
693                                           "%s(%u(bytes) %s)",
694                                           field_sql_type.c_ptr_safe(),
695                                           field->field_length,
696                                           field->charset()->csname);
697         target_type.length(length);
698       }
699       else
700         field->sql_type(target_type);
701 
702       if (report_level != INFORMATION_LEVEL)
703         rli->report(report_level, ER_SLAVE_CONVERSION_FAILED,
704                     ER(ER_SLAVE_CONVERSION_FAILED),
705                     col, db_name, tbl_name,
706                     source_type.c_ptr_safe(), target_type.c_ptr_safe());
707       return false;
708     }
709   }
710 
711 #ifndef NDEBUG
712   if (tmp_table)
713   {
714     for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
715       if (tmp_table->field[col])
716       {
717         char source_buf[MAX_FIELD_WIDTH];
718         char target_buf[MAX_FIELD_WIDTH];
719         String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
720         String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
721         tmp_table->field[col]->sql_type(source_type);
722         table->field[col]->sql_type(target_type);
723         DBUG_PRINT("debug", ("Field %s - conversion required."
724                              " Source type: '%s', Target type: '%s'",
725                              tmp_table->field[col]->field_name,
726                              source_type.c_ptr_safe(), target_type.c_ptr_safe()));
727       }
728   }
729 #endif
730 
731   *conv_table_var= tmp_table;
732   return true;
733 }
734 
735 /**
736   Create a conversion table.
737 
738   If the function is unable to create the conversion table, an error
739   will be printed and NULL will be returned.
740 
741   @return Pointer to conversion table, or NULL if unable to create
742   conversion table.
743  */
744 
create_conversion_table(THD * thd,Relay_log_info * rli,TABLE * target_table) const745 TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table) const
746 {
747   DBUG_ENTER("table_def::create_conversion_table");
748 
749   List<Create_field> field_list;
750   TABLE *conv_table= NULL;
751   /*
752     At slave, columns may differ. So we should create
753     min(columns@master, columns@slave) columns in the
754     conversion table.
755   */
756   uint const cols_to_create= min<ulong>(target_table->s->fields, size());
757 
758   // Default value : treat all values signed
759   bool unsigned_flag= FALSE;
760 
761   // Check if slave_type_conversions contains ALL_UNSIGNED
762   unsigned_flag= slave_type_conversions_options &
763                   (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_UNSIGNED);
764 
765   // Check if slave_type_conversions contains ALL_SIGNED
766   unsigned_flag= unsigned_flag && !(slave_type_conversions_options &
767                  (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_SIGNED));
768 
769   for (uint col= 0 ; col < cols_to_create; ++col)
770   {
771     Create_field *field_def=
772       (Create_field*) alloc_root(thd->mem_root, sizeof(Create_field));
773     if (field_list.push_back(field_def))
774       DBUG_RETURN(NULL);
775 
776     uint decimals= 0;
777     TYPELIB* interval= NULL;
778     uint pack_length= 0;
779     uint32 max_length=
780       max_display_length_for_field(type(col), field_metadata(col));
781 
782     switch(type(col))
783     {
784       int precision;
785     case MYSQL_TYPE_ENUM:
786     case MYSQL_TYPE_SET:
787       interval= static_cast<Field_enum*>(target_table->field[col])->typelib;
788       pack_length= field_metadata(col) & 0x00ff;
789       break;
790 
791     case MYSQL_TYPE_NEWDECIMAL:
792       /*
793         The display length of a DECIMAL type is not the same as the
794         length that should be supplied to make_field, so we correct
795         the length here.
796        */
797       precision= field_metadata(col) >> 8;
798       decimals= field_metadata(col) & 0x00ff;
799       max_length=
800         my_decimal_precision_to_length(precision, decimals, FALSE);
801       break;
802 
803     case MYSQL_TYPE_DECIMAL:
804       sql_print_error("In RBR mode, Slave received incompatible DECIMAL field "
805                       "(old-style decimal field) from Master while creating "
806                       "conversion table. Please consider changing datatype on "
807                       "Master to new style decimal by executing ALTER command for"
808                       " column Name: %s.%s.%s.",
809                       target_table->s->db.str,
810                       target_table->s->table_name.str,
811                       target_table->field[col]->field_name);
812       goto err;
813 
814     case MYSQL_TYPE_TINY_BLOB:
815     case MYSQL_TYPE_MEDIUM_BLOB:
816     case MYSQL_TYPE_LONG_BLOB:
817     case MYSQL_TYPE_BLOB:
818     case MYSQL_TYPE_GEOMETRY:
819     case MYSQL_TYPE_JSON:
820       pack_length= field_metadata(col) & 0x00ff;
821       break;
822 
823     default:
824       break;
825     }
826 
827     DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
828                          " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
829                          binlog_type(col), target_table->field[col]->field_name,
830                          max_length, decimals, TRUE, unsigned_flag, pack_length));
831     field_def->init_for_tmp_table(type(col),
832                                   max_length,
833                                   decimals,
834                                   TRUE,          // maybe_null
835                                   unsigned_flag, // unsigned_flag
836                                   pack_length);
837     field_def->charset= target_table->field[col]->charset();
838     field_def->interval= interval;
839   }
840 
841   conv_table= create_virtual_tmp_table(thd, field_list);
842 
843 err:
844   if (conv_table == NULL)
845   {
846     enum loglevel report_level= INFORMATION_LEVEL;
847     if (!ignored_error_code(ER_SLAVE_CANT_CREATE_CONVERSION))
848     {
849       report_level= ERROR_LEVEL;
850       thd->is_slave_error= 1;
851     }
852     /* In case of ignored errors report warnings only if log_warnings > 1. */
853     else if (log_warnings > 1)
854       report_level= WARNING_LEVEL;
855 
856     if (report_level != INFORMATION_LEVEL)
857       rli->report(report_level, ER_SLAVE_CANT_CREATE_CONVERSION,
858                   ER(ER_SLAVE_CANT_CREATE_CONVERSION),
859                   target_table->s->db.str,
860                   target_table->s->table_name.str);
861   }
862   DBUG_RETURN(conv_table);
863 }
864 
#endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
866 
867 PSI_memory_key key_memory_table_def_memory;
868 
table_def(unsigned char * types,ulong size,uchar * field_metadata,int metadata_size,uchar * null_bitmap,uint16 flags)869 table_def::table_def(unsigned char *types, ulong size,
870                      uchar *field_metadata, int metadata_size,
871                      uchar *null_bitmap, uint16 flags)
872   : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
873     m_field_metadata(0), m_null_bits(0), m_flags(flags),
874     m_memory(NULL)
875 {
876   m_memory= (uchar *)my_multi_malloc(key_memory_table_def_memory,
877                                      MYF(MY_WME),
878                                      &m_type, size,
879                                      &m_field_metadata,
880                                      size * sizeof(uint16),
881                                      &m_null_bits, (size + 7) / 8,
882                                      NULL);
883 
884   memset(m_field_metadata, 0, size * sizeof(uint16));
885 
886   if (m_type)
887     memcpy(m_type, types, size);
888   else
889     m_size= 0;
890   /*
891     Extract the data from the table map into the field metadata array
892     iff there is field metadata. The variable metadata_size will be
893     0 if we are replicating from an older version server since no field
894     metadata was written to the table map. This can also happen if
895     there were no fields in the master that needed extra metadata.
896   */
897   if (m_size && metadata_size)
898   {
899     int index= 0;
900     for (unsigned int i= 0; i < m_size; i++)
901     {
902       switch (binlog_type(i)) {
903       case MYSQL_TYPE_TINY_BLOB:
904       case MYSQL_TYPE_BLOB:
905       case MYSQL_TYPE_MEDIUM_BLOB:
906       case MYSQL_TYPE_LONG_BLOB:
907       case MYSQL_TYPE_DOUBLE:
908       case MYSQL_TYPE_FLOAT:
909       case MYSQL_TYPE_GEOMETRY:
910       case MYSQL_TYPE_JSON:
911       {
912         /*
913           These types store a single byte.
914         */
915         m_field_metadata[i]= field_metadata[index];
916         index++;
917         break;
918       }
919       case MYSQL_TYPE_SET:
920       case MYSQL_TYPE_ENUM:
921       case MYSQL_TYPE_STRING:
922       {
923         uint16 x= field_metadata[index++] << 8U; // real_type
924         x+= field_metadata[index++];            // pack or field length
925         m_field_metadata[i]= x;
926         break;
927       }
928       case MYSQL_TYPE_BIT:
929       {
930         uint16 x= field_metadata[index++];
931         x = x + (field_metadata[index++] << 8U);
932         m_field_metadata[i]= x;
933         break;
934       }
935       case MYSQL_TYPE_VARCHAR:
936       {
937         /*
938           These types store two bytes.
939         */
940         char *ptr= (char *)&field_metadata[index];
941         m_field_metadata[i]= uint2korr(ptr);
942         index= index + 2;
943         break;
944       }
945       case MYSQL_TYPE_NEWDECIMAL:
946       {
947         uint16 x= field_metadata[index++] << 8U; // precision
948         x+= field_metadata[index++];            // decimals
949         m_field_metadata[i]= x;
950         break;
951       }
952       case MYSQL_TYPE_TIME2:
953       case MYSQL_TYPE_DATETIME2:
954       case MYSQL_TYPE_TIMESTAMP2:
955         m_field_metadata[i]= field_metadata[index++];
956         break;
957       default:
958         m_field_metadata[i]= 0;
959         break;
960       }
961     }
962   }
963   if (m_size && null_bitmap)
964     memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
965 }
966 
967 
~table_def()968 table_def::~table_def()
969 {
970   my_free(m_memory);
971 #ifndef NDEBUG
972   m_type= 0;
973   m_size= 0;
974 #endif
975 }
976 
977 #ifndef MYSQL_CLIENT
978 
/*
  Value stored in HASH_ROW_PREAMBLE::search_state while the preamble
  carries no usable search state: Hash_slave_rows::make_entry() sets it
  on new entries and Hash_slave_rows::next() sets it once the saved
  state has been consumed.
*/
#define HASH_ROWS_POS_SEARCH_INVALID -1
980 
981 /**
982   Utility methods for handling row based operations.
983  */
984 
985 static uchar*
hash_slave_rows_get_key(const uchar * record,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))986 hash_slave_rows_get_key(const uchar *record,
987                         size_t *length,
988                         my_bool not_used MY_ATTRIBUTE((unused)))
989 {
990   DBUG_ENTER("get_key");
991 
992   HASH_ROW_ENTRY *entry=(HASH_ROW_ENTRY *) record;
993   HASH_ROW_PREAMBLE *preamble= entry->preamble;
994   *length= preamble->length;
995 
996   DBUG_RETURN((uchar*) &preamble->hash_value);
997 }
998 
999 static void
hash_slave_rows_free_entry(HASH_ROW_ENTRY * entry)1000 hash_slave_rows_free_entry(HASH_ROW_ENTRY *entry)
1001 {
1002   DBUG_ENTER("free_entry");
1003   if (entry)
1004   {
1005     if (entry->preamble)
1006       my_free(entry->preamble);
1007     if (entry->positions)
1008       my_free(entry->positions);
1009     my_free(entry);
1010   }
1011   DBUG_VOID_RETURN;
1012 }
1013 
is_empty(void)1014 bool Hash_slave_rows::is_empty(void)
1015 {
1016   return (m_hash.records == 0);
1017 }
1018 
1019 /**
1020    Hashing commodity structures and functions.
1021  */
1022 
init(void)1023 bool Hash_slave_rows::init(void)
1024 {
1025   if (my_hash_init(&m_hash,
1026                    &my_charset_bin,                /* the charater set information */
1027                    16 /* TODO */,                  /* growth size */
1028                    0,                              /* key offset */
1029                    0,                              /* key length */
1030                    hash_slave_rows_get_key,                        /* get function pointer */
1031                    (my_hash_free_key) hash_slave_rows_free_entry,  /* freefunction pointer */
1032                    MYF(0),                         /* flags */
1033                    key_memory_HASH_ROW_ENTRY))     /* memory instrumentation key */
1034     return true;
1035   return false;
1036 }
1037 
deinit(void)1038 bool Hash_slave_rows::deinit(void)
1039 {
1040   if (my_hash_inited(&m_hash))
1041     my_hash_free(&m_hash);
1042 
1043   return 0;
1044 }
1045 
size()1046 int Hash_slave_rows::size()
1047 {
1048   return m_hash.records;
1049 }
1050 
make_entry()1051 HASH_ROW_ENTRY* Hash_slave_rows::make_entry()
1052 {
1053   return make_entry(NULL, NULL);
1054 }
1055 
make_entry(const uchar * bi_start,const uchar * bi_ends)1056 HASH_ROW_ENTRY* Hash_slave_rows::make_entry(const uchar* bi_start, const uchar* bi_ends)
1057 {
1058   DBUG_ENTER("Hash_slave_rows::make_entry");
1059 
1060   HASH_ROW_ENTRY *entry= (HASH_ROW_ENTRY*) my_malloc(key_memory_HASH_ROW_ENTRY,
1061                                                      sizeof(HASH_ROW_ENTRY), MYF(0));
1062   HASH_ROW_PREAMBLE *preamble= (HASH_ROW_PREAMBLE *) my_malloc(key_memory_HASH_ROW_ENTRY,
1063                                                                sizeof(HASH_ROW_PREAMBLE), MYF(0));
1064   HASH_ROW_POS *pos= (HASH_ROW_POS *) my_malloc(key_memory_HASH_ROW_ENTRY,
1065                                                 sizeof(HASH_ROW_POS), MYF(0));
1066 
1067   if (!entry || !preamble || !pos ||
1068       DBUG_EVALUATE_IF("fake_myalloc_failure",1, 0))
1069     goto err;
1070 
1071   /**
1072      Filling in the preamble.
1073    */
1074   preamble->hash_value= 0;
1075   preamble->length= sizeof(my_hash_value_type);
1076   preamble->search_state= HASH_ROWS_POS_SEARCH_INVALID;
1077   preamble->is_search_state_inited= false;
1078 
1079   /**
1080      Filling in the positions.
1081    */
1082   pos->bi_start= bi_start;
1083   pos->bi_ends=  bi_ends;
1084 
1085   /**
1086     Filling in the entry
1087    */
1088   entry->preamble= preamble;
1089   entry->positions= pos;
1090 
1091   DBUG_RETURN(entry);
1092 
1093 err:
1094   if (entry)
1095     my_free(entry);
1096   if (preamble)
1097     my_free(preamble);
1098   if (pos)
1099     my_free(pos);
1100   DBUG_RETURN(NULL);
1101 }
1102 
1103 bool
put(TABLE * table,MY_BITMAP * cols,HASH_ROW_ENTRY * entry)1104 Hash_slave_rows::put(TABLE *table,
1105                      MY_BITMAP *cols,
1106                      HASH_ROW_ENTRY* entry)
1107 {
1108 
1109   DBUG_ENTER("Hash_slave_rows::put");
1110 
1111   HASH_ROW_PREAMBLE* preamble= entry->preamble;
1112 
1113   /**
1114      Skip blobs and BIT fields from key calculation.
1115      Handle X bits.
1116      Handle nulled fields.
1117      Handled fields not signaled.
1118   */
1119   preamble->hash_value= make_hash_key(table, cols);
1120 
1121   my_hash_insert(&m_hash, (uchar *) entry);
1122   DBUG_PRINT("debug", ("Added record to hash with key=%u", preamble->hash_value));
1123   DBUG_RETURN(false);
1124 }
1125 
1126 HASH_ROW_ENTRY*
get(TABLE * table,MY_BITMAP * cols)1127 Hash_slave_rows::get(TABLE *table, MY_BITMAP *cols)
1128 {
1129   DBUG_ENTER("Hash_slave_rows::get");
1130   HASH_SEARCH_STATE state;
1131   my_hash_value_type key;
1132   HASH_ROW_ENTRY *entry= NULL;
1133 
1134   key= make_hash_key(table, cols);
1135 
1136   DBUG_PRINT("debug", ("Looking for record with key=%u in the hash.", key));
1137 
1138   entry= (HASH_ROW_ENTRY*) my_hash_first(&m_hash,
1139                                          (const uchar*) &key,
1140                                          sizeof(my_hash_value_type),
1141                                          &state);
1142   if (entry)
1143   {
1144     DBUG_PRINT("debug", ("Found record with key=%u in the hash.", key));
1145 
1146     /**
1147        Save the search state in case we need to go through entries for
1148        the given key.
1149     */
1150     entry->preamble->search_state= state;
1151     entry->preamble->is_search_state_inited= true;
1152   }
1153 
1154   DBUG_RETURN(entry);
1155 }
1156 
next(HASH_ROW_ENTRY ** entry)1157 bool Hash_slave_rows::next(HASH_ROW_ENTRY** entry)
1158 {
1159   DBUG_ENTER("Hash_slave_rows::next");
1160   assert(*entry);
1161 
1162   if (*entry == NULL)
1163     DBUG_RETURN(true);
1164 
1165   HASH_ROW_PREAMBLE *preamble= (*entry)->preamble;
1166 
1167   if (!preamble->is_search_state_inited)
1168     DBUG_RETURN(true);
1169 
1170   my_hash_value_type key= preamble->hash_value;
1171   HASH_SEARCH_STATE state= preamble->search_state;
1172 
1173   /*
1174     Invalidate search for current preamble, because it is going to be
1175     used in the search below (and search state is used in a
1176     one-time-only basis).
1177    */
1178   preamble->search_state= HASH_ROWS_POS_SEARCH_INVALID;
1179   preamble->is_search_state_inited= false;
1180 
1181   DBUG_PRINT("debug", ("Looking for record with key=%u in the hash (next).", key));
1182 
1183   /**
1184      Do the actual search in the hash table.
1185    */
1186   *entry= (HASH_ROW_ENTRY*) my_hash_next(&m_hash,
1187                                          (const uchar*) &key,
1188                                          sizeof(my_hash_value_type),
1189                                          &state);
1190   if (*entry)
1191   {
1192     DBUG_PRINT("debug", ("Found record with key=%u in the hash (next).", key));
1193     preamble= (*entry)->preamble;
1194 
1195     /**
1196        Save the search state for next iteration (if any).
1197      */
1198     preamble->search_state= state;
1199     preamble->is_search_state_inited= true;
1200   }
1201 
1202   DBUG_RETURN(false);
1203 }
1204 
1205 bool
del(HASH_ROW_ENTRY * entry)1206 Hash_slave_rows::del(HASH_ROW_ENTRY *entry)
1207 {
1208   DBUG_ENTER("Hash_slave_rows::del");
1209   assert(entry);
1210 
1211   if (my_hash_delete(&m_hash, (uchar *) entry))
1212     DBUG_RETURN(true);
1213   DBUG_RETURN(false);
1214 }
1215 
1216 my_hash_value_type
make_hash_key(TABLE * table,MY_BITMAP * cols)1217 Hash_slave_rows::make_hash_key(TABLE *table, MY_BITMAP *cols)
1218 {
1219   DBUG_ENTER("Hash_slave_rows::make_hash_key");
1220   ha_checksum crc= 0L;
1221 
1222   uchar *record= table->record[0];
1223   uchar saved_x= 0, saved_filler= 0;
1224 
1225   if (table->s->null_bytes > 0)
1226   {
1227     /*
1228       If we have an X bit then we need to take care of it.
1229     */
1230     if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
1231     {
1232       saved_x= record[0];
1233       record[0]|= 1U;
1234     }
1235 
1236     /*
1237       If (last_null_bit_pos == 0 && null_bytes > 1), then:
1238       X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
1239       Ie, the entire byte is used.
1240     */
1241     if (table->s->last_null_bit_pos > 0)
1242     {
1243       saved_filler= record[table->s->null_bytes - 1];
1244       record[table->s->null_bytes - 1]|=
1245         256U - (1U << table->s->last_null_bit_pos);
1246     }
1247   }
1248 
1249   /*
1250     We can only checksum the bytes if all fields have been signaled
1251     in the before image. Otherwise, unpack_row will not have set the
1252     null_flags correctly (because it only unpacks those fields and
1253     their flags that were actually in the before image).
1254 
1255     @c record_compare, as it also skips null_flags if the read_set
1256     was not marked completely.
1257    */
1258   if (bitmap_is_set_all(cols))
1259   {
1260     crc= checksum_crc32(crc, table->null_flags, table->s->null_bytes);
1261     DBUG_PRINT("debug", ("make_hash_entry: hash after null_flags: %u", crc));
1262   }
1263 
1264   for (Field **ptr=table->field ;
1265        *ptr && ((*ptr)->field_index < cols->n_bits);
1266        ptr++)
1267   {
1268     Field *f= (*ptr);
1269 
1270     /*
1271       Field is set in the read_set and is isn't NULL.
1272      */
1273     if (bitmap_is_set(cols, f->field_index) &&
1274         !f->is_virtual_gcol() && // Avoid virtual generated columns on hashes
1275         !f->is_null())
1276     {
1277       /*
1278         BLOB and VARCHAR have pointers in their field, we must convert
1279         to string; GEOMETRY and JSON are implemented on top of BLOB.
1280         BIT may store its data among NULL bits, convert as well.
1281       */
1282       switch (f->type()) {
1283         case MYSQL_TYPE_BLOB:
1284         case MYSQL_TYPE_VARCHAR:
1285         case MYSQL_TYPE_GEOMETRY:
1286         case MYSQL_TYPE_JSON:
1287         case MYSQL_TYPE_BIT:
1288         {
1289           String tmp;
1290           f->val_str(&tmp);
1291           crc= checksum_crc32(crc, (uchar*) tmp.ptr(), tmp.length());
1292           break;
1293         }
1294         default:
1295           crc= checksum_crc32(crc, f->ptr, f->data_length());
1296           break;
1297       }
1298 #ifndef NDEBUG
1299       String tmp;
1300       f->val_str(&tmp);
1301       DBUG_PRINT("debug", ("make_hash_entry: hash after field %s=%s: %u", f->field_name, tmp.c_ptr_safe(), crc));
1302 #endif
1303     }
1304   }
1305 
1306   /*
1307     Restore the saved bytes.
1308 
1309     TODO[record format ndb]: Remove this code once NDB returns the
1310     correct record format.
1311   */
1312   if (table->s->null_bytes > 0)
1313   {
1314     if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
1315       record[0]= saved_x;
1316 
1317     if (table->s->last_null_bit_pos)
1318       record[table->s->null_bytes - 1]= saved_filler;
1319   }
1320 
1321   DBUG_PRINT("debug", ("Created key=%u", crc));
1322   DBUG_RETURN(crc);
1323 }
1324 
1325 
1326 #endif
1327 
1328 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
1329 
/**
  Construct the container; m_array is built with the
  key_memory_table_def_memory instrumentation key.

  @param rli  Not used by this constructor.
*/
Deferred_log_events::Deferred_log_events(Relay_log_info *rli)
  : m_array(key_memory_table_def_memory)
{
}
1334 
/**
  Empty the event array.

  NOTE(review): only clear() is called here; the events are deleted in
  rewind() via delete_container_pointers(). Presumably rewind() always
  runs before destruction -- confirm against the callers.
*/
Deferred_log_events::~Deferred_log_events()
{
  m_array.clear();
}
1339 
/**
  Append an event to the array of deferred events.

  @param ev  Event to defer; its worker member is reset to NULL.

  @return Always 0.

  NOTE(review): the result of push_back() is not inspected -- confirm
  whether it can report a failure for the container type in use.
*/
int Deferred_log_events::add(Log_event *ev)
{
  m_array.push_back(ev);
  ev->worker= NULL; // to mark event busy avoiding deletion
  return 0;
}
1346 
/**
  @return true when no event is currently stored in the array.
*/
bool Deferred_log_events::is_empty()
{
  return m_array.empty();
}
1351 
execute(Relay_log_info * rli)1352 bool Deferred_log_events::execute(Relay_log_info *rli)
1353 {
1354   bool res= false;
1355 
1356   assert(rli->deferred_events_collecting);
1357 
1358   rli->deferred_events_collecting= false;
1359   for (Log_event **it= m_array.begin(); !res && it != m_array.end(); ++it)
1360   {
1361     Log_event *ev= *it;
1362     res= ev->apply_event(rli);
1363   }
1364   rli->deferred_events_collecting= true;
1365   return res;
1366 }
1367 
/**
  Drop all stored events: the array is passed to
  delete_container_pointers() and then asked to shrink_to_fit().
*/
void Deferred_log_events::rewind()
{
  /*
    Reset the preceding Query log events whose execution was
    deferred because of slave side filtering.
  */
  delete_container_pointers(m_array);
  m_array.shrink_to_fit();
}
1377 
1378 #endif
1379