1 /* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
2    Copyright (c) 2011, 2013, Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1335  USA */
16 
17 #include "mariadb.h"
18 #include <my_bit.h>
19 #include "rpl_utility.h"
20 #include "log_event.h"
21 
22 #if defined(MYSQL_CLIENT)
23 #error MYSQL_CLIENT must not be defined here
24 #endif
25 
26 #if !defined(MYSQL_SERVER)
27 #error MYSQL_SERVER must be defined here
28 #endif
29 
30 #if defined(HAVE_REPLICATION)
31 #include "rpl_rli.h"
32 #include "sql_select.h"
33 #endif
34 
35 
36 /**
37    Compute the maximum display length of a field.
38 
39    @param sql_type Type of the field
40    @param metadata The metadata from the master for the field.
41    @return Maximum length of the field in bytes.
42 
43    The precise values calculated by field->max_display_length() and
44    calculated by max_display_length_for_field() can differ (by +1 or -1)
45    for integer data types (TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT).
46    This slight difference is not important here, because we call
47    this function only for two *different* integer data types.
48  */
49 static uint32
max_display_length_for_field(const Conv_source & source)50 max_display_length_for_field(const Conv_source &source)
51 {
52   DBUG_PRINT("debug", ("sql_type: %s, metadata: 0x%x",
53                        source.type_handler()->name().ptr(), source.metadata()));
54   return source.type_handler()->max_display_length_for_field(source);
55 }
56 
57 
58 /*
59   Compare the pack lengths of a source field (on the master) and a
60   target field (on the slave).
61 
62   @param sh             Source type handler
63   @param source_length  Source length
64   @param th             Target type hander
65   @param target_length  Target length
66 
67   @retval CONV_TYPE_SUBSET_TO_SUPERSET The length of the source field is
68                                        smaller than the target field.
69   @retval CONV_TYPE_PRECISE            The length of the source and
70                                        the target fields are equal.
71   @retval CONV_TYPE_SUPERSET_TO_SUBSET The length of the source field is
72                                        greater than the target field.
73  */
74 static enum_conv_type
compare_lengths(const Type_handler * sh,uint32 source_length,const Type_handler * th,uint32 target_length)75 compare_lengths(const Type_handler *sh, uint32 source_length,
76                 const Type_handler *th, uint32 target_length)
77 {
78   DBUG_ENTER("compare_lengths");
79   DBUG_PRINT("debug", ("source_length: %lu, source_type: %s,"
80                        " target_length: %lu, target_type: %s",
81                        (unsigned long) source_length, sh->name().ptr(),
82                        (unsigned long) target_length, th->name().ptr()));
83   enum_conv_type result=
84     source_length < target_length ? CONV_TYPE_SUBSET_TO_SUPERSET :
85     source_length > target_length ? CONV_TYPE_SUPERSET_TO_SUBSET :
86                                     CONV_TYPE_PRECISE;
87   DBUG_PRINT("result", ("%d", result));
88   DBUG_RETURN(result);
89 }
90 
91 
92 /**
93   Calculate display length for MySQL56 temporal data types from their metadata.
94   It contains fractional precision in the low 16-bit word.
95 */
96 static uint32
max_display_length_for_temporal2_field(uint32 int_display_length,unsigned int metadata)97 max_display_length_for_temporal2_field(uint32 int_display_length,
98                                        unsigned int metadata)
99 {
100   metadata&= 0x00ff;
101   return int_display_length + metadata + (metadata ? 1 : 0);
102 }
103 
104 
105 uint32
max_display_length_for_field(const Conv_source & src) const106 Type_handler_newdecimal::max_display_length_for_field(const Conv_source &src)
107                                                       const
108 {
109   return src.metadata() >> 8;
110 }
111 
112 
113 uint32
max_display_length_for_field(const Conv_source & src) const114 Type_handler_typelib::max_display_length_for_field(const Conv_source &src)
115                                                    const
116 {
117   /*
118     Field_enum::rpl_conv_type_from() does not use  compare_lengths().
119     So we should not come here.
120   */
121   DBUG_ASSERT(0);
122   return src.metadata() & 0x00ff;
123 }
124 
125 
126 uint32
max_display_length_for_field(const Conv_source & src) const127 Type_handler_string::max_display_length_for_field(const Conv_source &src)
128                                                   const
129 {
130   /*
131     ENUM and SET are transferred using as STRING,
132     with the exact type code in metadata.
133     Make sure that we previously detected ENUM/SET and
134     translated them into a proper type handler.
135     See table_def::field_type_handler() for details.
136   */
137   DBUG_ASSERT((src.metadata() >> 8) != MYSQL_TYPE_SET);
138   DBUG_ASSERT((src.metadata() >> 8) != MYSQL_TYPE_ENUM);
139   /* This is taken from Field_string::unpack. */
140   return (((src.metadata() >> 4) & 0x300) ^ 0x300) + (src.metadata() & 0x00ff);
141 }
142 
143 
144 uint32
max_display_length_for_field(const Conv_source & src) const145 Type_handler_time2::max_display_length_for_field(const Conv_source &src)
146                                                  const
147 {
148   return max_display_length_for_temporal2_field(MIN_TIME_WIDTH,
149                                                 src.metadata());
150 }
151 
152 
153 uint32
max_display_length_for_field(const Conv_source & src) const154 Type_handler_timestamp2::max_display_length_for_field(const Conv_source &src)
155                                                       const
156 {
157   return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH,
158                                                 src.metadata());
159 }
160 
161 
162 uint32
max_display_length_for_field(const Conv_source & src) const163 Type_handler_datetime2::max_display_length_for_field(const Conv_source &src)
164                                                      const
165 {
166   return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH,
167                                                 src.metadata());
168 }
169 
170 
171 uint32
max_display_length_for_field(const Conv_source & src) const172 Type_handler_bit::max_display_length_for_field(const Conv_source &src)
173                                                const
174 {
175   /*
176     Decode the size of the bit field from the master.
177   */
178   DBUG_ASSERT((src.metadata() & 0xff) <= 7);
179   return 8 * (src.metadata() >> 8U) + (src.metadata() & 0x00ff);
180 }
181 
182 
183 uint32
max_display_length_for_field(const Conv_source & src) const184 Type_handler_var_string::max_display_length_for_field(const Conv_source &src)
185                                                       const
186 {
187   return src.metadata();
188 }
189 
190 
191 uint32
max_display_length_for_field(const Conv_source & src) const192 Type_handler_varchar::max_display_length_for_field(const Conv_source &src)
193                                                    const
194 {
195   return src.metadata();
196 }
197 
198 
199 uint32
200 Type_handler_varchar_compressed::
max_display_length_for_field(const Conv_source & src) const201   max_display_length_for_field(const Conv_source &src) const
202 {
203   DBUG_ASSERT(src.metadata() > 0);
204   return src.metadata() - 1;
205 }
206 
207 
208 /*
209   The actual length for these types does not really matter since
210   they are used to calc_pack_length, which ignores the given
211   length for these types.
212 
213   Since we want this to be accurate for other uses, we return the
214   maximum size in bytes of these BLOBs.
215 */
216 uint32
max_display_length_for_field(const Conv_source & src) const217 Type_handler_tiny_blob::max_display_length_for_field(const Conv_source &src)
218                                                      const
219 {
220   return (uint32) my_set_bits(1 * 8);
221 }
222 
223 
224 uint32
max_display_length_for_field(const Conv_source & src) const225 Type_handler_medium_blob::max_display_length_for_field(const Conv_source &src)
226                                                        const
227 {
228   return (uint32) my_set_bits(3 * 8);
229 }
230 
231 
232 uint32
max_display_length_for_field(const Conv_source & src) const233 Type_handler_blob::max_display_length_for_field(const Conv_source &src)
234                                                 const
235 {
236   /*
237     For the blob type, Field::real_type() lies and say that all
238     blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
239     at the length instead to decide what the max display size is.
240    */
241   return (uint32) my_set_bits(src.metadata() * 8);
242 }
243 
244 
245 uint32
max_display_length_for_field(const Conv_source & src) const246 Type_handler_blob_compressed::max_display_length_for_field(const Conv_source &src)
247                                                     const
248 {
249   return (uint32) my_set_bits(src.metadata() * 8);
250 }
251 
252 
253 uint32
max_display_length_for_field(const Conv_source & src) const254 Type_handler_long_blob::max_display_length_for_field(const Conv_source &src)
255                                                      const
256 {
257   return (uint32) my_set_bits(4 * 8);
258 }
259 
260 
261 uint32
max_display_length_for_field(const Conv_source & src) const262 Type_handler_olddecimal::max_display_length_for_field(const Conv_source &src)
263                                                       const
264 {
265   return ~(uint32) 0;
266 }
267 
268 
show_binlog_type(const Conv_source & src,const Field &,String * str) const269 void Type_handler::show_binlog_type(const Conv_source &src, const Field &,
270                                     String *str) const
271 {
272   str->set_ascii(name().ptr(), name().length());
273 }
274 
275 
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const276 void Type_handler_var_string::show_binlog_type(const Conv_source &src,
277                                                const Field &dst,
278                                                String *str) const
279 {
280   CHARSET_INFO *cs= str->charset();
281   const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
282     ? "char(%u octets)" : "binary(%u)";
283   size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
284                                     str->alloced_length(),
285                                     fmt, src.metadata());
286   str->length(length);
287 }
288 
289 
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const290 void Type_handler_varchar::show_binlog_type(const Conv_source &src,
291                                             const Field &dst,
292                                             String *str) const
293 {
294   CHARSET_INFO *cs= str->charset();
295   const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
296     ? "varchar(%u octets)" : "varbinary(%u)";
297   size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
298                                     str->alloced_length(),
299                                     fmt, src.metadata());
300   str->length(length);
301 }
302 
303 
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const304 void Type_handler_varchar_compressed::show_binlog_type(const Conv_source &src,
305                                                        const Field &dst,
306                                                        String *str) const
307 {
308   CHARSET_INFO *cs= str->charset();
309   const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
310     ? "varchar(%u octets) compressed" : "varbinary(%u) compressed";
311   size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
312                                     str->alloced_length(),
313                                     fmt, src.metadata());
314   str->length(length);
315 }
316 
show_binlog_type(const Conv_source & src,const Field &,String * str) const317 void Type_handler_bit::show_binlog_type(const Conv_source &src, const Field &,
318                                         String *str) const
319 {
320   CHARSET_INFO *cs= str->charset();
321   int bit_length= 8 * (src.metadata() >> 8) + (src.metadata() & 0xFF);
322   size_t length=
323     cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
324                        "bit(%d)", bit_length);
325   str->length(length);
326 }
327 
328 
show_binlog_type(const Conv_source & src,const Field &,String * str) const329 void Type_handler_olddecimal::show_binlog_type(const Conv_source &src,
330                                                const Field &,
331                                                String *str) const
332 {
333   CHARSET_INFO *cs= str->charset();
334   size_t length=
335     cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
336                        "decimal(%d,?)/*old*/", src.metadata());
337   str->length(length);
338 
339 }
340 
341 
show_binlog_type(const Conv_source & src,const Field &,String * str) const342 void Type_handler_newdecimal::show_binlog_type(const Conv_source &src,
343                                                const Field &,
344                                                String *str) const
345 {
346   CHARSET_INFO *cs= str->charset();
347   size_t length=
348     cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
349                        "decimal(%d,%d)",
350                        src.metadata() >> 8, src.metadata() & 0xff);
351   str->length(length);
352 }
353 
354 
show_binlog_type(const Conv_source & src,const Field &,String * str) const355 void Type_handler_blob_compressed::show_binlog_type(const Conv_source &src,
356                                                     const Field &,
357                                                     String *str) const
358 {
359   /*
360     Field::real_type() lies regarding the actual type of a BLOB, so
361     it is necessary to check the pack length to figure out what kind
362     of blob it really is.
363    */
364   switch (src.metadata()) {
365     case 1:
366       str->set_ascii(STRING_WITH_LEN("tinyblob compressed"));
367       break;
368     case 2:
369       str->set_ascii(STRING_WITH_LEN("blob compressed"));
370       break;
371     case 3:
372       str->set_ascii(STRING_WITH_LEN("mediumblob compressed"));
373       break;
374     default:
375       DBUG_ASSERT(0);
376       // Fall through
377     case 4:
378       str->set_ascii(STRING_WITH_LEN("longblob compressed"));
379   }
380 }
381 
382 
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const383 void Type_handler_string::show_binlog_type(const Conv_source &src,
384                                            const Field &dst,
385                                            String *str) const
386 {
387   /*
388     This is taken from Field_string::unpack.
389   */
390   CHARSET_INFO *cs= str->charset();
391   uint bytes= (((src.metadata() >> 4) & 0x300) ^ 0x300) +
392               (src.metadata() & 0x00ff);
393   const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
394     ? "char(%u octets)" : "binary(%u)";
395   size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
396                                     str->alloced_length(),
397                                     fmt, bytes);
398   str->length(length);
399 }
400 
401 
402 enum_conv_type
rpl_conv_type_from_same_data_type(uint16 metadata,const Relay_log_info * rli,const Conv_param & param) const403 Field::rpl_conv_type_from_same_data_type(uint16 metadata,
404                                          const Relay_log_info *rli,
405                                          const Conv_param &param) const
406 {
407   if (metadata == 0) // Metadata can only be zero if no metadata was provided
408   {
409     /*
410       If there is no metadata, we either have an old event where no
411       metadata were supplied, or a type that does not require any
412       metadata. In either case, conversion can be done but no
413       conversion table is necessary.
414      */
415     DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
416     return CONV_TYPE_PRECISE;
417   }
418 
419   DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
420   int order= 0;
421   if (!compatible_field_size(metadata, rli, param.table_def_flags(), &order))
422     return CONV_TYPE_IMPOSSIBLE;
423   return order == 0 ? CONV_TYPE_PRECISE :
424          order < 0  ? CONV_TYPE_SUBSET_TO_SUPERSET :
425                       CONV_TYPE_SUPERSET_TO_SUBSET;
426 }
427 
428 
429 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const430 Field_new_decimal::rpl_conv_type_from(const Conv_source &source,
431                                       const Relay_log_info *rli,
432                                       const Conv_param &param) const
433 {
434   if (binlog_type() == source.real_field_type())
435     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
436   if (source.type_handler() == &type_handler_olddecimal ||
437       source.type_handler() == &type_handler_newdecimal ||
438       source.type_handler() == &type_handler_float ||
439       source.type_handler() == &type_handler_double)
440   {
441     /*
442       Then the other type is either FLOAT, DOUBLE, or old style
443       DECIMAL, so we require lossy conversion.
444     */
445     return CONV_TYPE_SUPERSET_TO_SUBSET;
446   }
447   return CONV_TYPE_IMPOSSIBLE;
448 }
449 
450 
451 /*
452   This covers FLOAT, DOUBLE and old DECIMAL
453 */
454 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const455 Field_real::rpl_conv_type_from(const Conv_source &source,
456                                const Relay_log_info *rli,
457                                const Conv_param &param) const
458 {
459   if (binlog_type() == source.real_field_type())
460     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
461   if (source.type_handler() == &type_handler_olddecimal ||
462       source.type_handler() == &type_handler_newdecimal)
463     return CONV_TYPE_SUPERSET_TO_SUBSET;  // Always require lossy conversions
464   if (source.type_handler() == &type_handler_float ||
465       source.type_handler() == &type_handler_double)
466   {
467     enum_conv_type order= compare_lengths(source.type_handler(),
468                                           max_display_length_for_field(source),
469                                           type_handler(), max_display_length());
470     DBUG_ASSERT(order != CONV_TYPE_PRECISE);
471     return order;
472   }
473   return CONV_TYPE_IMPOSSIBLE;
474 }
475 
476 
477 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const478 Field_int::rpl_conv_type_from(const Conv_source &source,
479                               const Relay_log_info *rli,
480                               const Conv_param &param) const
481 {
482   if (binlog_type() == source.real_field_type())
483     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
484   /*
485     The length comparison check will do the correct job of comparing
486     the field lengths (in bytes) of two integer types.
487   */
488   if (source.type_handler() == &type_handler_stiny  ||
489       source.type_handler() == &type_handler_sshort ||
490       source.type_handler() == &type_handler_sint24 ||
491       source.type_handler() == &type_handler_slong  ||
492       source.type_handler() == &type_handler_slonglong)
493   {
494     /*
495       max_display_length_for_field() is not fully precise for the integer
496       data types. So its result cannot be compared to the result of
497       max_dispay_length() when the table field and the binlog field
498       are of the same type.
499       This code should eventually be rewritten not to use
500       compare_lengths(), to detect subtype/supetype relations
501       just using the type codes.
502     */
503     DBUG_ASSERT(source.real_field_type() != real_type());
504     enum_conv_type order= compare_lengths(source.type_handler(),
505                                           max_display_length_for_field(source),
506                                           type_handler(), max_display_length());
507     DBUG_ASSERT(order != CONV_TYPE_PRECISE);
508     return order;
509   }
510   return CONV_TYPE_IMPOSSIBLE;
511 }
512 
513 
514 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const515 Field_enum::rpl_conv_type_from(const Conv_source &source,
516                                const Relay_log_info *rli,
517                                const Conv_param &param) const
518 {
519   /*
520     For some reasons Field_enum and Field_set store MYSQL_TYPE_STRING
521     as a type code in the binary log and encode the real type in metadata.
522     So we need to test real_type() here instread of binlog_type().
523   */
524   return real_type() == source.real_field_type() ?
525          rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
526          CONV_TYPE_IMPOSSIBLE;
527 }
528 
529 
530 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const531 Field_longstr::rpl_conv_type_from(const Conv_source &source,
532                                   const Relay_log_info *rli,
533                                   const Conv_param &param) const
534 {
535   /**
536     @todo
537       Implement Field_varstring_compressed::real_type() and
538       Field_blob_compressed::real_type() properly. All occurencies
539       of Field::real_type() have to be inspected and adjusted if needed.
540 
541       Until it is not ready we have to compare source_type against
542       binlog_type() when replicating from or to compressed data types.
543 
544       @sa Comment for Field::binlog_type()
545   */
546   bool same_type;
547   if (source.real_field_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
548       source.real_field_type() == MYSQL_TYPE_BLOB_COMPRESSED ||
549       binlog_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
550       binlog_type() == MYSQL_TYPE_BLOB_COMPRESSED)
551     same_type= binlog_type() == source.real_field_type();
552   else if (Type_handler_json_common::is_json_type_handler(type_handler()))
553     same_type= type_handler()->type_handler_base() == source.type_handler();
554   else
555     same_type= type_handler() == source.type_handler();
556 
557   if (same_type)
558     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
559 
560   if (source.type_handler() == &type_handler_tiny_blob ||
561       source.type_handler() == &type_handler_medium_blob ||
562       source.type_handler() == &type_handler_long_blob ||
563       source.type_handler() == &type_handler_blob ||
564       source.type_handler() == &type_handler_blob_compressed ||
565       source.type_handler() == &type_handler_string ||
566       source.type_handler() == &type_handler_var_string ||
567       source.type_handler() == &type_handler_varchar ||
568       source.type_handler() == &type_handler_varchar_compressed)
569   {
570     enum_conv_type order= compare_lengths(source.type_handler(),
571                                           max_display_length_for_field(source),
572                                           type_handler(), max_display_length());
573     /*
574       Here we know that the types are different, so if the order
575       gives that they do not require any conversion, we still need
576       to have non-lossy conversion enabled to allow conversion
577       between different (string) types of the same length.
578 
579       Also, if all conversions are disabled, it is not allowed to convert
580       between these types. Since the TEXT vs. BINARY is distinguished by
581       the charset, and the charset is not replicated, we cannot
582       currently distinguish between , e.g., TEXT and BLOB.
583      */
584     if (order == CONV_TYPE_PRECISE)
585       order= CONV_TYPE_SUBSET_TO_SUPERSET;
586     return order;
587   }
588   return CONV_TYPE_IMPOSSIBLE;
589 }
590 
591 
592 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const593 Field_newdate::rpl_conv_type_from(const Conv_source &source,
594                                   const Relay_log_info *rli,
595                                   const Conv_param &param) const
596 {
597   if (real_type() == source.real_field_type())
598     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
599   if (source.type_handler() == &type_handler_datetime2)
600     return CONV_TYPE_SUPERSET_TO_SUBSET;
601   return CONV_TYPE_IMPOSSIBLE;
602 }
603 
604 
605 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const606 Field_time::rpl_conv_type_from(const Conv_source &source,
607                                const Relay_log_info *rli,
608                                const Conv_param &param) const
609 {
610   if (binlog_type() == source.real_field_type())
611     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
612   // 'MySQL56 TIME(N)' -> 'MariaDB-5.3 TIME(N)' is non-lossy
613   if (decimals() == source.metadata() &&
614        source.type_handler() == &type_handler_time2)
615     return CONV_TYPE_VARIANT; // TODO: conversion from FSP1>FSP2
616   return CONV_TYPE_IMPOSSIBLE;
617 }
618 
619 
620 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const621 Field_timef::rpl_conv_type_from(const Conv_source &source,
622                                 const Relay_log_info *rli,
623                                 const Conv_param &param) const
624 {
625   if (binlog_type() == source.real_field_type())
626     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
627   /*
628     See comment in Field_datetimef::rpl_conv_type_from()
629     'MariaDB-5.3 TIME(0)' to 'MySQL56 TIME(0)' is non-lossy
630   */
631   if (source.metadata() == 0 && source.type_handler() == &type_handler_time)
632     return CONV_TYPE_VARIANT;
633   return CONV_TYPE_IMPOSSIBLE;
634 }
635 
636 
637 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const638 Field_timestamp::rpl_conv_type_from(const Conv_source &source,
639                                     const Relay_log_info *rli,
640                                     const Conv_param &param) const
641 {
642   if (binlog_type() == source.real_field_type())
643     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
644   // 'MySQL56 TIMESTAMP(N)' -> MariaDB-5.3 TIMESTAMP(N)' is non-lossy
645   if (source.metadata() == decimals() &&
646       source.type_handler() == &type_handler_timestamp2)
647     return CONV_TYPE_VARIANT; // TODO: conversion from FSP1>FSP2
648   return CONV_TYPE_IMPOSSIBLE;
649 }
650 
651 
652 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const653 Field_timestampf::rpl_conv_type_from(const Conv_source &source,
654                                      const Relay_log_info *rli,
655                                      const Conv_param &param) const
656 {
657   if (binlog_type() == source.real_field_type())
658     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
659   /*
660     See comment in Field_datetimef::rpl_conv_type_from()
661     'MariaDB-5.3 TIMESTAMP(0)' to 'MySQL56 TIMESTAMP(0)' is non-lossy
662   */
663   if (source.metadata() == 0 &&
664       source.type_handler() == &type_handler_timestamp)
665     return CONV_TYPE_VARIANT;
666   return CONV_TYPE_IMPOSSIBLE;
667 }
668 
669 
670 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const671 Field_datetime::rpl_conv_type_from(const Conv_source &source,
672                                    const Relay_log_info *rli,
673                                    const Conv_param &param) const
674 {
675   if (binlog_type() == source.real_field_type())
676     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
677   // 'MySQL56 DATETIME(N)' -> MariaDB-5.3 DATETIME(N) is non-lossy
678   if (source.metadata() == decimals() &&
679       source.type_handler() == &type_handler_datetime2)
680     return CONV_TYPE_VARIANT; // TODO: conversion from FSP1>FSP2
681   if (source.type_handler() == &type_handler_newdate)
682     return CONV_TYPE_SUBSET_TO_SUPERSET;
683   return CONV_TYPE_IMPOSSIBLE;
684 }
685 
686 
687 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const688 Field_datetimef::rpl_conv_type_from(const Conv_source &source,
689                                     const Relay_log_info *rli,
690                                     const Conv_param &param) const
691 {
692   if (binlog_type() == source.real_field_type())
693     return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
694   /*
695     'MariaDB-5.3 DATETIME(N)' does not provide information about fractional
696     precision in metadata. So we assume the precision on the master is equal
697     to the precision on the slave.
698     TODO: See MDEV-17394 what happend in case precisions are in case different
699     'MariaDB-5.3 DATETIME(0)' to 'MySQL56 DATETIME(0)' is non-lossy
700   */
701   if (source.metadata() == 0 &&
702       source.type_handler() == &type_handler_datetime)
703     return CONV_TYPE_VARIANT;
704   if (source.type_handler() == &type_handler_newdate)
705     return CONV_TYPE_SUBSET_TO_SUPERSET;
706   return CONV_TYPE_IMPOSSIBLE;
707 }
708 
709 
710 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const711 Field_date::rpl_conv_type_from(const Conv_source &source,
712                                const Relay_log_info *rli,
713                                const Conv_param &param) const
714 {
715   // old DATE
716   return binlog_type() == source.real_field_type() ?
717          rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
718          CONV_TYPE_IMPOSSIBLE;
719 }
720 
721 
722 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const723 Field_bit::rpl_conv_type_from(const Conv_source &source,
724                               const Relay_log_info *rli,
725                               const Conv_param &param) const
726 {
727   return binlog_type() == source.real_field_type() ?
728          rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
729          CONV_TYPE_IMPOSSIBLE;
730 }
731 
732 
733 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const734 Field_year::rpl_conv_type_from(const Conv_source &source,
735                                const Relay_log_info *rli,
736                                const Conv_param &param) const
737 {
738   return binlog_type() == source.real_field_type() ?
739          rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
740          CONV_TYPE_IMPOSSIBLE;
741 }
742 
743 
744 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const745 Field_null::rpl_conv_type_from(const Conv_source &source,
746                                const Relay_log_info *rli,
747                                const Conv_param &param) const
748 {
749   DBUG_ASSERT(0);
750   return CONV_TYPE_IMPOSSIBLE;
751 }
752 
753 
754 /**********************************************************************/
755 
756 
757 #if defined(HAVE_REPLICATION)
758 
759 /**
760  */
show_sql_type(const Conv_source & src,const Field & dst,String * str)761 static void show_sql_type(const Conv_source &src, const Field &dst,
762                           String *str)
763 {
764   DBUG_ENTER("show_sql_type");
765   DBUG_ASSERT(src.type_handler() != NULL);
766   DBUG_PRINT("enter", ("type: %s, metadata: 0x%x",
767                        src.type_handler()->name().ptr(), src.metadata()));
768   src.type_handler()->show_binlog_type(src, dst, str);
769   DBUG_VOID_RETURN;
770 }
771 
772 
773 /**
774    Check the order variable and print errors if the order is not
775    acceptable according to the current settings.
776 
777    @param order  The computed order of the conversion needed.
778    @param rli    The relay log info data structure: for error reporting.
779  */
is_conversion_ok(enum_conv_type type,const Relay_log_info * rli,ulonglong type_conversion_options)780 static bool is_conversion_ok(enum_conv_type type, const Relay_log_info *rli,
781                              ulonglong type_conversion_options)
782 {
783   DBUG_ENTER("is_conversion_ok");
784   bool allow_non_lossy, allow_lossy;
785 
786   allow_non_lossy= type_conversion_options &
787                     (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
788   allow_lossy= type_conversion_options &
789                (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
790 
791   DBUG_PRINT("enter", ("order: %d, flags:%s%s", (int) type,
792                        allow_non_lossy ? " ALL_NON_LOSSY" : "",
793                        allow_lossy ? " ALL_LOSSY" : ""));
794   switch (type) {
795   case CONV_TYPE_PRECISE:
796   case CONV_TYPE_VARIANT:
797     DBUG_RETURN(true);
798   case CONV_TYPE_SUBSET_TO_SUPERSET:
799     /* !!! Add error message saying that non-lossy conversions need to be allowed. */
800     DBUG_RETURN(allow_non_lossy);
801   case CONV_TYPE_SUPERSET_TO_SUBSET:
802     /* !!! Add error message saying that lossy conversions need to be allowed. */
803     DBUG_RETURN(allow_lossy);
804   case CONV_TYPE_IMPOSSIBLE:
805     DBUG_RETURN(false);
806   }
807 
808   DBUG_RETURN(false);
809 }
810 
811 
812 /**
813    Can a type potentially be converted to another type?
814 
815    This function check if the types are convertible and what
816    conversion is required.
817 
818    If conversion is not possible, and error is printed.
819 
820    If conversion is possible:
821 
822    - *order will be set to -1 if source type is smaller than target
823      type and a non-lossy conversion can be required. This includes
824      the case where the field types are different but types could
825      actually be converted in either direction.
826 
827    - *order will be set to 0 if no conversion is required.
828 
829    - *order will be set to 1 if the source type is strictly larger
830       than the target type and that conversion is potentially lossy.
831 
832    @param[in] field    Target field
833    @param[in] type     Source field type
834    @param[in] metadata Source field metadata
835    @param[in] rli      Relay log info (for error reporting)
836    @param[in] mflags   Flags from the table map event
837    @param[out] order   Order between source field and target field
838 
839    @return @c true if conversion is possible according to the current
840    settings, @c false if conversion is not possible according to the
841    current setting.
842  */
843 static enum_conv_type
can_convert_field_to(Field * field,const Conv_source & source,const Relay_log_info * rli,const Conv_param & param)844 can_convert_field_to(Field *field, const Conv_source &source,
845                      const Relay_log_info *rli,
846                      const Conv_param &param)
847 {
848   DBUG_ENTER("can_convert_field_to");
849 #ifndef DBUG_OFF
850   char field_type_buf[MAX_FIELD_WIDTH];
851   String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
852   field->sql_type(field_type);
853   DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
854                        field_type.c_ptr_safe(), field->real_type(),
855                        source.real_field_type(), source.metadata()));
856 #endif
857   DBUG_RETURN(field->rpl_conv_type_from(source, rli, param));
858 }
859 
860 
field_type_handler(uint col) const861 const Type_handler *table_def::field_type_handler(uint col) const
862 {
863   enum_field_types typecode= binlog_type(col);
864   uint16 metadata= field_metadata(col);
865   DBUG_ASSERT(typecode != MYSQL_TYPE_ENUM);
866   DBUG_ASSERT(typecode != MYSQL_TYPE_SET);
867 
868   if (typecode == MYSQL_TYPE_BLOB)
869   {
870     switch (metadata & 0xff) {
871     case 1: return &type_handler_tiny_blob;
872     case 2: return &type_handler_blob;
873     case 3: return &type_handler_medium_blob;
874     case 4: return &type_handler_long_blob;
875     default: return NULL;
876     }
877   }
878   if (typecode == MYSQL_TYPE_STRING)
879   {
880     uchar typecode2= metadata >> 8;
881     if (typecode2 == MYSQL_TYPE_SET)
882       return &type_handler_set;
883     if (typecode2 == MYSQL_TYPE_ENUM)
884       return &type_handler_enum;
885     return &type_handler_string;
886   }
887   /*
888     This type has not been used since before row-based replication,
889     so we can safely assume that it really is MYSQL_TYPE_NEWDATE.
890   */
891   if (typecode == MYSQL_TYPE_DATE)
892     return &type_handler_newdate;
893   return Type_handler::get_handler_by_real_type(typecode);
894 }
895 
896 
897 /**
898   Is the definition compatible with a table?
899 
900   This function will compare the master table with an existing table
901   on the slave and see if they are compatible with respect to the
902   current settings of @c SLAVE_TYPE_CONVERSIONS.
903 
904   If the tables are compatible and conversions are required, @c
905   *tmp_table_var will be set to a virtual temporary table with field
906   pointers for the fields that require conversions.  This allow simple
907   checking of whether a conversion are to be applied or not.
908 
909   If tables are compatible, but no conversions are necessary, @c
910   *tmp_table_var will be set to NULL.
911 
912   @param rli_arg[in]
913   Relay log info, for error reporting.
914 
915   @param table[in]
916   Table to compare with
917 
918   @param tmp_table_var[out]
919   Virtual temporary table for performing conversions, if necessary.
920 
921   @retval true Master table is compatible with slave table.
922   @retval false Master table is not compatible with slave table.
923 */
924 bool
compatible_with(THD * thd,rpl_group_info * rgi,TABLE * table,TABLE ** conv_table_var) const925 table_def::compatible_with(THD *thd, rpl_group_info *rgi,
926                            TABLE *table, TABLE **conv_table_var)
927   const
928 {
929   /*
930     We only check the initial columns for the tables.
931   */
932   uint const cols_to_check= MY_MIN(table->s->fields, size());
933   Relay_log_info *rli= rgi->rli;
934   TABLE *tmp_table= NULL;
935 
936   for (uint col= 0 ; col < cols_to_check ; ++col)
937   {
938     Field *const field= table->field[col];
939     const Type_handler *h= field_type_handler(col);
940     if (!h)
941     {
942       sql_print_error("In RBR mode, Slave received unknown field type field %d "
943                       " for column Name: %s.%s.%s.",
944                       binlog_type(col),
945                       field->table->s->db.str,
946                       field->table->s->table_name.str,
947                       field->field_name.str);
948       return false;
949     }
950 
951     if (!h)
952       return false; // An unknown data type found in the binary log
953     Conv_source source(h, field_metadata(col), field->charset());
954     enum_conv_type convtype= can_convert_field_to(field, source, rli,
955                                                   Conv_param(m_flags));
956     if (is_conversion_ok(convtype, rli, slave_type_conversions_options))
957     {
958       DBUG_PRINT("debug", ("Checking column %d -"
959                            " field '%s' can be converted - order: %d",
960                            col, field->field_name.str, convtype));
961       /*
962         If conversion type is not CONV_TYPE_RECISE, a conversion is required,
963         so we need to set up the conversion table.
964        */
965       if (convtype != CONV_TYPE_PRECISE && tmp_table == NULL)
966       {
967         /*
968           This will create the full table with all fields. This is
969           necessary to ge the correct field lengths for the record.
970         */
971         tmp_table= create_conversion_table(thd, rgi, table);
972         if (tmp_table == NULL)
973             return false;
974         /*
975           Clear all fields up to, but not including, this column.
976         */
977         for (unsigned int i= 0; i < col; ++i)
978           tmp_table->field[i]= NULL;
979       }
980 
981       if (convtype == CONV_TYPE_PRECISE && tmp_table != NULL)
982         tmp_table->field[col]= NULL;
983     }
984     else
985     {
986       DBUG_PRINT("debug", ("Checking column %d -"
987                            " field '%s' can not be converted",
988                            col, field->field_name.str));
989       DBUG_ASSERT(col < size() && col < table->s->fields);
990       DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
991       DBUG_ASSERT(table->in_use);
992       const char *db_name= table->s->db.str;
993       const char *tbl_name= table->s->table_name.str;
994       StringBuffer<MAX_FIELD_WIDTH> source_type(&my_charset_latin1);
995       StringBuffer<MAX_FIELD_WIDTH> target_type(&my_charset_latin1);
996       THD *thd= table->in_use;
997 
998       show_sql_type(source, *field, &source_type);
999       field->sql_rpl_type(&target_type);
1000       DBUG_ASSERT(source_type.length() > 0);
1001       DBUG_ASSERT(target_type.length() > 0);
1002       rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, rgi->gtid_info(),
1003                   ER_THD(thd, ER_SLAVE_CONVERSION_FAILED),
1004                   col, db_name, tbl_name,
1005                   source_type.c_ptr_safe(), target_type.c_ptr_safe());
1006       return false;
1007     }
1008   }
1009 
1010 #ifndef DBUG_OFF
1011   if (tmp_table)
1012   {
1013     for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
1014       if (tmp_table->field[col])
1015       {
1016         char source_buf[MAX_FIELD_WIDTH];
1017         char target_buf[MAX_FIELD_WIDTH];
1018         String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
1019         String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
1020         tmp_table->field[col]->sql_type(source_type);
1021         table->field[col]->sql_type(target_type);
1022         DBUG_PRINT("debug", ("Field %s - conversion required."
1023                              " Source type: '%s', Target type: '%s'",
1024                              tmp_table->field[col]->field_name.str,
1025                              source_type.c_ptr_safe(), target_type.c_ptr_safe()));
1026       }
1027   }
1028 #endif
1029 
1030   *conv_table_var= tmp_table;
1031   return true;
1032 }
1033 
1034 
1035 /**
1036   A wrapper to Virtual_tmp_table, to get access to its constructor,
1037   which is protected for safety purposes (against illegal use on stack).
1038 */
1039 class Virtual_conversion_table: public Virtual_tmp_table
1040 {
1041 public:
Virtual_conversion_table(THD * thd)1042   Virtual_conversion_table(THD *thd) :Virtual_tmp_table(thd) { }
1043   /**
1044     Add a new field into the virtual table.
1045     @param handler      - The type handler of the field.
1046     @param metadata     - The RBR binary log metadata for this field.
1047     @param target_field - The field from the target table, to get extra
1048                           attributes from (e.g. typelib in case of ENUM).
1049   */
add(const Type_handler * handler,uint16 metadata,const Field * target_field)1050   bool add(const Type_handler *handler,
1051            uint16 metadata, const Field *target_field)
1052   {
1053     Field *tmp= handler->make_conversion_table_field(in_use->mem_root,
1054                                                      this, metadata,
1055                                                      target_field);
1056     if (!tmp)
1057       return true;
1058     Virtual_tmp_table::add(tmp);
1059     DBUG_PRINT("debug", ("sql_type: %s, target_field: '%s', max_length: %d, decimals: %d,"
1060                          " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
1061                          handler->name().ptr(), target_field->field_name.str,
1062                          tmp->field_length, tmp->decimals(), TRUE,
1063                          tmp->flags, tmp->pack_length()));
1064     return false;
1065   }
1066 };
1067 
1068 
1069 /**
1070   Create a conversion table.
1071 
1072   If the function is unable to create the conversion table, an error
1073   will be printed and NULL will be returned.
1074 
1075   @return Pointer to conversion table, or NULL if unable to create
1076   conversion table.
1077  */
1078 
create_conversion_table(THD * thd,rpl_group_info * rgi,TABLE * target_table) const1079 TABLE *table_def::create_conversion_table(THD *thd, rpl_group_info *rgi,
1080                                           TABLE *target_table) const
1081 {
1082   DBUG_ENTER("table_def::create_conversion_table");
1083 
1084   Virtual_conversion_table *conv_table;
1085   Relay_log_info *rli= rgi->rli;
1086   /*
1087     At slave, columns may differ. So we should create
1088     MY_MIN(columns@master, columns@slave) columns in the
1089     conversion table.
1090   */
1091   uint const cols_to_create= MY_MIN(target_table->s->fields, size());
1092   if (!(conv_table= new(thd) Virtual_conversion_table(thd)) ||
1093       conv_table->init(cols_to_create))
1094     goto err;
1095   for (uint col= 0 ; col < cols_to_create; ++col)
1096   {
1097     const Type_handler *ha= field_type_handler(col);
1098     DBUG_ASSERT(ha); // Checked at compatible_with() time
1099     if (conv_table->add(ha, field_metadata(col), target_table->field[col]))
1100     {
1101       DBUG_PRINT("debug", ("binlog_type: %d, metadata: %04X, target_field: '%s'"
1102                            " make_conversion_table_field() failed",
1103                            binlog_type(col), field_metadata(col),
1104                            target_table->field[col]->field_name.str));
1105       goto err;
1106     }
1107   }
1108 
1109   if (conv_table->open())
1110     goto err; // Could not allocate record buffer?
1111 
1112   DBUG_RETURN(conv_table);
1113 
1114 err:
1115   if (conv_table)
1116     delete conv_table;
1117   rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, rgi->gtid_info(),
1118               ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
1119               target_table->s->db.str,
1120               target_table->s->table_name.str);
1121   DBUG_RETURN(NULL);
1122 }
1123 
1124 
1125 
Deferred_log_events(Relay_log_info * rli)1126 Deferred_log_events::Deferred_log_events(Relay_log_info *rli) : last_added(NULL)
1127 {
1128   my_init_dynamic_array(PSI_INSTRUMENT_ME, &array, sizeof(Log_event *), 32, 16, MYF(0));
1129 }
1130 
~Deferred_log_events()1131 Deferred_log_events::~Deferred_log_events()
1132 {
1133   delete_dynamic(&array);
1134 }
1135 
add(Log_event * ev)1136 int Deferred_log_events::add(Log_event *ev)
1137 {
1138   last_added= ev;
1139   insert_dynamic(&array, (uchar*) &ev);
1140   return 0;
1141 }
1142 
is_empty()1143 bool Deferred_log_events::is_empty()
1144 {
1145   return array.elements == 0;
1146 }
1147 
execute(rpl_group_info * rgi)1148 bool Deferred_log_events::execute(rpl_group_info *rgi)
1149 {
1150   bool res= false;
1151   DBUG_ENTER("Deferred_log_events::execute");
1152   DBUG_ASSERT(rgi->deferred_events_collecting);
1153 
1154   rgi->deferred_events_collecting= false;
1155   for (uint i=  0; !res && i < array.elements; i++)
1156   {
1157     Log_event *ev= (* (Log_event **)
1158                     dynamic_array_ptr(&array, i));
1159     res= ev->apply_event(rgi);
1160   }
1161   rgi->deferred_events_collecting= true;
1162   DBUG_RETURN(res);
1163 }
1164 
rewind()1165 void Deferred_log_events::rewind()
1166 {
1167   /*
1168     Reset preceding Query log event events which execution was
1169     deferred because of slave side filtering.
1170   */
1171   if (!is_empty())
1172   {
1173     for (uint i=  0; i < array.elements; i++)
1174     {
1175       Log_event *ev= *(Log_event **) dynamic_array_ptr(&array, i);
1176       delete ev;
1177     }
1178     last_added= NULL;
1179     if (array.elements > array.max_element)
1180       freeze_size(&array);
1181     reset_dynamic(&array);
1182   }
1183   last_added= NULL;
1184 }
1185 
1186 #endif // defined(HAVE_REPLICATION)
1187 
1188