1 /* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
2 Copyright (c) 2011, 2013, Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "mariadb.h"
18 #include <my_bit.h>
19 #include "rpl_utility.h"
20 #include "log_event.h"
21
22 #if defined(MYSQL_CLIENT)
23 #error MYSQL_CLIENT must not be defined here
24 #endif
25
26 #if !defined(MYSQL_SERVER)
27 #error MYSQL_SERVER must be defined here
28 #endif
29
30 #if defined(HAVE_REPLICATION)
31 #include "rpl_rli.h"
32 #include "sql_select.h"
33 #endif
34
35
36 /**
37 Compute the maximum display length of a field.
38
39 @param sql_type Type of the field
40 @param metadata The metadata from the master for the field.
41 @return Maximum length of the field in bytes.
42
43 The precise values calculated by field->max_display_length() and
44 calculated by max_display_length_for_field() can differ (by +1 or -1)
45 for integer data types (TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT).
46 This slight difference is not important here, because we call
47 this function only for two *different* integer data types.
48 */
49 static uint32
max_display_length_for_field(const Conv_source & source)50 max_display_length_for_field(const Conv_source &source)
51 {
52 DBUG_PRINT("debug", ("sql_type: %s, metadata: 0x%x",
53 source.type_handler()->name().ptr(), source.metadata()));
54 return source.type_handler()->max_display_length_for_field(source);
55 }
56
57
58 /*
59 Compare the pack lengths of a source field (on the master) and a
60 target field (on the slave).
61
62 @param sh Source type handler
63 @param source_length Source length
64 @param th Target type hander
65 @param target_length Target length
66
67 @retval CONV_TYPE_SUBSET_TO_SUPERSET The length of the source field is
68 smaller than the target field.
69 @retval CONV_TYPE_PRECISE The length of the source and
70 the target fields are equal.
71 @retval CONV_TYPE_SUPERSET_TO_SUBSET The length of the source field is
72 greater than the target field.
73 */
74 static enum_conv_type
compare_lengths(const Type_handler * sh,uint32 source_length,const Type_handler * th,uint32 target_length)75 compare_lengths(const Type_handler *sh, uint32 source_length,
76 const Type_handler *th, uint32 target_length)
77 {
78 DBUG_ENTER("compare_lengths");
79 DBUG_PRINT("debug", ("source_length: %lu, source_type: %s,"
80 " target_length: %lu, target_type: %s",
81 (unsigned long) source_length, sh->name().ptr(),
82 (unsigned long) target_length, th->name().ptr()));
83 enum_conv_type result=
84 source_length < target_length ? CONV_TYPE_SUBSET_TO_SUPERSET :
85 source_length > target_length ? CONV_TYPE_SUPERSET_TO_SUBSET :
86 CONV_TYPE_PRECISE;
87 DBUG_PRINT("result", ("%d", result));
88 DBUG_RETURN(result);
89 }
90
91
92 /**
93 Calculate display length for MySQL56 temporal data types from their metadata.
94 It contains fractional precision in the low 16-bit word.
95 */
96 static uint32
max_display_length_for_temporal2_field(uint32 int_display_length,unsigned int metadata)97 max_display_length_for_temporal2_field(uint32 int_display_length,
98 unsigned int metadata)
99 {
100 metadata&= 0x00ff;
101 return int_display_length + metadata + (metadata ? 1 : 0);
102 }
103
104
105 uint32
max_display_length_for_field(const Conv_source & src) const106 Type_handler_newdecimal::max_display_length_for_field(const Conv_source &src)
107 const
108 {
109 return src.metadata() >> 8;
110 }
111
112
113 uint32
max_display_length_for_field(const Conv_source & src) const114 Type_handler_typelib::max_display_length_for_field(const Conv_source &src)
115 const
116 {
117 /*
118 Field_enum::rpl_conv_type_from() does not use compare_lengths().
119 So we should not come here.
120 */
121 DBUG_ASSERT(0);
122 return src.metadata() & 0x00ff;
123 }
124
125
126 uint32
max_display_length_for_field(const Conv_source & src) const127 Type_handler_string::max_display_length_for_field(const Conv_source &src)
128 const
129 {
130 /*
131 ENUM and SET are transferred using as STRING,
132 with the exact type code in metadata.
133 Make sure that we previously detected ENUM/SET and
134 translated them into a proper type handler.
135 See table_def::field_type_handler() for details.
136 */
137 DBUG_ASSERT((src.metadata() >> 8) != MYSQL_TYPE_SET);
138 DBUG_ASSERT((src.metadata() >> 8) != MYSQL_TYPE_ENUM);
139 /* This is taken from Field_string::unpack. */
140 return (((src.metadata() >> 4) & 0x300) ^ 0x300) + (src.metadata() & 0x00ff);
141 }
142
143
144 uint32
max_display_length_for_field(const Conv_source & src) const145 Type_handler_time2::max_display_length_for_field(const Conv_source &src)
146 const
147 {
148 return max_display_length_for_temporal2_field(MIN_TIME_WIDTH,
149 src.metadata());
150 }
151
152
153 uint32
max_display_length_for_field(const Conv_source & src) const154 Type_handler_timestamp2::max_display_length_for_field(const Conv_source &src)
155 const
156 {
157 return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH,
158 src.metadata());
159 }
160
161
162 uint32
max_display_length_for_field(const Conv_source & src) const163 Type_handler_datetime2::max_display_length_for_field(const Conv_source &src)
164 const
165 {
166 return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH,
167 src.metadata());
168 }
169
170
171 uint32
max_display_length_for_field(const Conv_source & src) const172 Type_handler_bit::max_display_length_for_field(const Conv_source &src)
173 const
174 {
175 /*
176 Decode the size of the bit field from the master.
177 */
178 DBUG_ASSERT((src.metadata() & 0xff) <= 7);
179 return 8 * (src.metadata() >> 8U) + (src.metadata() & 0x00ff);
180 }
181
182
183 uint32
max_display_length_for_field(const Conv_source & src) const184 Type_handler_var_string::max_display_length_for_field(const Conv_source &src)
185 const
186 {
187 return src.metadata();
188 }
189
190
191 uint32
max_display_length_for_field(const Conv_source & src) const192 Type_handler_varchar::max_display_length_for_field(const Conv_source &src)
193 const
194 {
195 return src.metadata();
196 }
197
198
199 uint32
200 Type_handler_varchar_compressed::
max_display_length_for_field(const Conv_source & src) const201 max_display_length_for_field(const Conv_source &src) const
202 {
203 DBUG_ASSERT(src.metadata() > 0);
204 return src.metadata() - 1;
205 }
206
207
208 /*
209 The actual length for these types does not really matter since
210 they are used to calc_pack_length, which ignores the given
211 length for these types.
212
213 Since we want this to be accurate for other uses, we return the
214 maximum size in bytes of these BLOBs.
215 */
216 uint32
max_display_length_for_field(const Conv_source & src) const217 Type_handler_tiny_blob::max_display_length_for_field(const Conv_source &src)
218 const
219 {
220 return (uint32) my_set_bits(1 * 8);
221 }
222
223
224 uint32
max_display_length_for_field(const Conv_source & src) const225 Type_handler_medium_blob::max_display_length_for_field(const Conv_source &src)
226 const
227 {
228 return (uint32) my_set_bits(3 * 8);
229 }
230
231
232 uint32
max_display_length_for_field(const Conv_source & src) const233 Type_handler_blob::max_display_length_for_field(const Conv_source &src)
234 const
235 {
236 /*
237 For the blob type, Field::real_type() lies and say that all
238 blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
239 at the length instead to decide what the max display size is.
240 */
241 return (uint32) my_set_bits(src.metadata() * 8);
242 }
243
244
245 uint32
max_display_length_for_field(const Conv_source & src) const246 Type_handler_blob_compressed::max_display_length_for_field(const Conv_source &src)
247 const
248 {
249 return (uint32) my_set_bits(src.metadata() * 8);
250 }
251
252
253 uint32
max_display_length_for_field(const Conv_source & src) const254 Type_handler_long_blob::max_display_length_for_field(const Conv_source &src)
255 const
256 {
257 return (uint32) my_set_bits(4 * 8);
258 }
259
260
261 uint32
max_display_length_for_field(const Conv_source & src) const262 Type_handler_olddecimal::max_display_length_for_field(const Conv_source &src)
263 const
264 {
265 return ~(uint32) 0;
266 }
267
268
show_binlog_type(const Conv_source & src,const Field &,String * str) const269 void Type_handler::show_binlog_type(const Conv_source &src, const Field &,
270 String *str) const
271 {
272 str->set_ascii(name().ptr(), name().length());
273 }
274
275
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const276 void Type_handler_var_string::show_binlog_type(const Conv_source &src,
277 const Field &dst,
278 String *str) const
279 {
280 CHARSET_INFO *cs= str->charset();
281 const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
282 ? "char(%u octets)" : "binary(%u)";
283 size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
284 str->alloced_length(),
285 fmt, src.metadata());
286 str->length(length);
287 }
288
289
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const290 void Type_handler_varchar::show_binlog_type(const Conv_source &src,
291 const Field &dst,
292 String *str) const
293 {
294 CHARSET_INFO *cs= str->charset();
295 const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
296 ? "varchar(%u octets)" : "varbinary(%u)";
297 size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
298 str->alloced_length(),
299 fmt, src.metadata());
300 str->length(length);
301 }
302
303
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const304 void Type_handler_varchar_compressed::show_binlog_type(const Conv_source &src,
305 const Field &dst,
306 String *str) const
307 {
308 CHARSET_INFO *cs= str->charset();
309 const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
310 ? "varchar(%u octets) compressed" : "varbinary(%u) compressed";
311 size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
312 str->alloced_length(),
313 fmt, src.metadata());
314 str->length(length);
315 }
316
show_binlog_type(const Conv_source & src,const Field &,String * str) const317 void Type_handler_bit::show_binlog_type(const Conv_source &src, const Field &,
318 String *str) const
319 {
320 CHARSET_INFO *cs= str->charset();
321 int bit_length= 8 * (src.metadata() >> 8) + (src.metadata() & 0xFF);
322 size_t length=
323 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
324 "bit(%d)", bit_length);
325 str->length(length);
326 }
327
328
show_binlog_type(const Conv_source & src,const Field &,String * str) const329 void Type_handler_olddecimal::show_binlog_type(const Conv_source &src,
330 const Field &,
331 String *str) const
332 {
333 CHARSET_INFO *cs= str->charset();
334 size_t length=
335 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
336 "decimal(%d,?)/*old*/", src.metadata());
337 str->length(length);
338
339 }
340
341
show_binlog_type(const Conv_source & src,const Field &,String * str) const342 void Type_handler_newdecimal::show_binlog_type(const Conv_source &src,
343 const Field &,
344 String *str) const
345 {
346 CHARSET_INFO *cs= str->charset();
347 size_t length=
348 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
349 "decimal(%d,%d)",
350 src.metadata() >> 8, src.metadata() & 0xff);
351 str->length(length);
352 }
353
354
show_binlog_type(const Conv_source & src,const Field &,String * str) const355 void Type_handler_blob_compressed::show_binlog_type(const Conv_source &src,
356 const Field &,
357 String *str) const
358 {
359 /*
360 Field::real_type() lies regarding the actual type of a BLOB, so
361 it is necessary to check the pack length to figure out what kind
362 of blob it really is.
363 */
364 switch (src.metadata()) {
365 case 1:
366 str->set_ascii(STRING_WITH_LEN("tinyblob compressed"));
367 break;
368 case 2:
369 str->set_ascii(STRING_WITH_LEN("blob compressed"));
370 break;
371 case 3:
372 str->set_ascii(STRING_WITH_LEN("mediumblob compressed"));
373 break;
374 default:
375 DBUG_ASSERT(0);
376 // Fall through
377 case 4:
378 str->set_ascii(STRING_WITH_LEN("longblob compressed"));
379 }
380 }
381
382
show_binlog_type(const Conv_source & src,const Field & dst,String * str) const383 void Type_handler_string::show_binlog_type(const Conv_source &src,
384 const Field &dst,
385 String *str) const
386 {
387 /*
388 This is taken from Field_string::unpack.
389 */
390 CHARSET_INFO *cs= str->charset();
391 uint bytes= (((src.metadata() >> 4) & 0x300) ^ 0x300) +
392 (src.metadata() & 0x00ff);
393 const char* fmt= dst.cmp_type() != STRING_RESULT || dst.has_charset()
394 ? "char(%u octets)" : "binary(%u)";
395 size_t length= cs->cset->snprintf(cs, (char*) str->ptr(),
396 str->alloced_length(),
397 fmt, bytes);
398 str->length(length);
399 }
400
401
402 enum_conv_type
rpl_conv_type_from_same_data_type(uint16 metadata,const Relay_log_info * rli,const Conv_param & param) const403 Field::rpl_conv_type_from_same_data_type(uint16 metadata,
404 const Relay_log_info *rli,
405 const Conv_param ¶m) const
406 {
407 if (metadata == 0) // Metadata can only be zero if no metadata was provided
408 {
409 /*
410 If there is no metadata, we either have an old event where no
411 metadata were supplied, or a type that does not require any
412 metadata. In either case, conversion can be done but no
413 conversion table is necessary.
414 */
415 DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
416 return CONV_TYPE_PRECISE;
417 }
418
419 DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
420 int order= 0;
421 if (!compatible_field_size(metadata, rli, param.table_def_flags(), &order))
422 return CONV_TYPE_IMPOSSIBLE;
423 return order == 0 ? CONV_TYPE_PRECISE :
424 order < 0 ? CONV_TYPE_SUBSET_TO_SUPERSET :
425 CONV_TYPE_SUPERSET_TO_SUBSET;
426 }
427
428
429 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const430 Field_new_decimal::rpl_conv_type_from(const Conv_source &source,
431 const Relay_log_info *rli,
432 const Conv_param ¶m) const
433 {
434 if (binlog_type() == source.real_field_type())
435 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
436 if (source.type_handler() == &type_handler_olddecimal ||
437 source.type_handler() == &type_handler_newdecimal ||
438 source.type_handler() == &type_handler_float ||
439 source.type_handler() == &type_handler_double)
440 {
441 /*
442 Then the other type is either FLOAT, DOUBLE, or old style
443 DECIMAL, so we require lossy conversion.
444 */
445 return CONV_TYPE_SUPERSET_TO_SUBSET;
446 }
447 return CONV_TYPE_IMPOSSIBLE;
448 }
449
450
451 /*
452 This covers FLOAT, DOUBLE and old DECIMAL
453 */
454 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const455 Field_real::rpl_conv_type_from(const Conv_source &source,
456 const Relay_log_info *rli,
457 const Conv_param ¶m) const
458 {
459 if (binlog_type() == source.real_field_type())
460 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
461 if (source.type_handler() == &type_handler_olddecimal ||
462 source.type_handler() == &type_handler_newdecimal)
463 return CONV_TYPE_SUPERSET_TO_SUBSET; // Always require lossy conversions
464 if (source.type_handler() == &type_handler_float ||
465 source.type_handler() == &type_handler_double)
466 {
467 enum_conv_type order= compare_lengths(source.type_handler(),
468 max_display_length_for_field(source),
469 type_handler(), max_display_length());
470 DBUG_ASSERT(order != CONV_TYPE_PRECISE);
471 return order;
472 }
473 return CONV_TYPE_IMPOSSIBLE;
474 }
475
476
477 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const478 Field_int::rpl_conv_type_from(const Conv_source &source,
479 const Relay_log_info *rli,
480 const Conv_param ¶m) const
481 {
482 if (binlog_type() == source.real_field_type())
483 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
484 /*
485 The length comparison check will do the correct job of comparing
486 the field lengths (in bytes) of two integer types.
487 */
488 if (source.type_handler() == &type_handler_stiny ||
489 source.type_handler() == &type_handler_sshort ||
490 source.type_handler() == &type_handler_sint24 ||
491 source.type_handler() == &type_handler_slong ||
492 source.type_handler() == &type_handler_slonglong)
493 {
494 /*
495 max_display_length_for_field() is not fully precise for the integer
496 data types. So its result cannot be compared to the result of
497 max_dispay_length() when the table field and the binlog field
498 are of the same type.
499 This code should eventually be rewritten not to use
500 compare_lengths(), to detect subtype/supetype relations
501 just using the type codes.
502 */
503 DBUG_ASSERT(source.real_field_type() != real_type());
504 enum_conv_type order= compare_lengths(source.type_handler(),
505 max_display_length_for_field(source),
506 type_handler(), max_display_length());
507 DBUG_ASSERT(order != CONV_TYPE_PRECISE);
508 return order;
509 }
510 return CONV_TYPE_IMPOSSIBLE;
511 }
512
513
514 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const515 Field_enum::rpl_conv_type_from(const Conv_source &source,
516 const Relay_log_info *rli,
517 const Conv_param ¶m) const
518 {
519 /*
520 For some reasons Field_enum and Field_set store MYSQL_TYPE_STRING
521 as a type code in the binary log and encode the real type in metadata.
522 So we need to test real_type() here instread of binlog_type().
523 */
524 return real_type() == source.real_field_type() ?
525 rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
526 CONV_TYPE_IMPOSSIBLE;
527 }
528
529
530 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const531 Field_longstr::rpl_conv_type_from(const Conv_source &source,
532 const Relay_log_info *rli,
533 const Conv_param ¶m) const
534 {
535 /**
536 @todo
537 Implement Field_varstring_compressed::real_type() and
538 Field_blob_compressed::real_type() properly. All occurencies
539 of Field::real_type() have to be inspected and adjusted if needed.
540
541 Until it is not ready we have to compare source_type against
542 binlog_type() when replicating from or to compressed data types.
543
544 @sa Comment for Field::binlog_type()
545 */
546 bool same_type;
547 if (source.real_field_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
548 source.real_field_type() == MYSQL_TYPE_BLOB_COMPRESSED ||
549 binlog_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
550 binlog_type() == MYSQL_TYPE_BLOB_COMPRESSED)
551 same_type= binlog_type() == source.real_field_type();
552 else if (Type_handler_json_common::is_json_type_handler(type_handler()))
553 same_type= type_handler()->type_handler_base() == source.type_handler();
554 else
555 same_type= type_handler() == source.type_handler();
556
557 if (same_type)
558 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
559
560 if (source.type_handler() == &type_handler_tiny_blob ||
561 source.type_handler() == &type_handler_medium_blob ||
562 source.type_handler() == &type_handler_long_blob ||
563 source.type_handler() == &type_handler_blob ||
564 source.type_handler() == &type_handler_blob_compressed ||
565 source.type_handler() == &type_handler_string ||
566 source.type_handler() == &type_handler_var_string ||
567 source.type_handler() == &type_handler_varchar ||
568 source.type_handler() == &type_handler_varchar_compressed)
569 {
570 enum_conv_type order= compare_lengths(source.type_handler(),
571 max_display_length_for_field(source),
572 type_handler(), max_display_length());
573 /*
574 Here we know that the types are different, so if the order
575 gives that they do not require any conversion, we still need
576 to have non-lossy conversion enabled to allow conversion
577 between different (string) types of the same length.
578
579 Also, if all conversions are disabled, it is not allowed to convert
580 between these types. Since the TEXT vs. BINARY is distinguished by
581 the charset, and the charset is not replicated, we cannot
582 currently distinguish between , e.g., TEXT and BLOB.
583 */
584 if (order == CONV_TYPE_PRECISE)
585 order= CONV_TYPE_SUBSET_TO_SUPERSET;
586 return order;
587 }
588 return CONV_TYPE_IMPOSSIBLE;
589 }
590
591
592 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const593 Field_newdate::rpl_conv_type_from(const Conv_source &source,
594 const Relay_log_info *rli,
595 const Conv_param ¶m) const
596 {
597 if (real_type() == source.real_field_type())
598 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
599 if (source.type_handler() == &type_handler_datetime2)
600 return CONV_TYPE_SUPERSET_TO_SUBSET;
601 return CONV_TYPE_IMPOSSIBLE;
602 }
603
604
605 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const606 Field_time::rpl_conv_type_from(const Conv_source &source,
607 const Relay_log_info *rli,
608 const Conv_param ¶m) const
609 {
610 if (binlog_type() == source.real_field_type())
611 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
612 // 'MySQL56 TIME(N)' -> 'MariaDB-5.3 TIME(N)' is non-lossy
613 if (decimals() == source.metadata() &&
614 source.type_handler() == &type_handler_time2)
615 return CONV_TYPE_VARIANT; // TODO: conversion from FSP1>FSP2
616 return CONV_TYPE_IMPOSSIBLE;
617 }
618
619
620 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const621 Field_timef::rpl_conv_type_from(const Conv_source &source,
622 const Relay_log_info *rli,
623 const Conv_param ¶m) const
624 {
625 if (binlog_type() == source.real_field_type())
626 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
627 /*
628 See comment in Field_datetimef::rpl_conv_type_from()
629 'MariaDB-5.3 TIME(0)' to 'MySQL56 TIME(0)' is non-lossy
630 */
631 if (source.metadata() == 0 && source.type_handler() == &type_handler_time)
632 return CONV_TYPE_VARIANT;
633 return CONV_TYPE_IMPOSSIBLE;
634 }
635
636
637 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const638 Field_timestamp::rpl_conv_type_from(const Conv_source &source,
639 const Relay_log_info *rli,
640 const Conv_param ¶m) const
641 {
642 if (binlog_type() == source.real_field_type())
643 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
644 // 'MySQL56 TIMESTAMP(N)' -> MariaDB-5.3 TIMESTAMP(N)' is non-lossy
645 if (source.metadata() == decimals() &&
646 source.type_handler() == &type_handler_timestamp2)
647 return CONV_TYPE_VARIANT; // TODO: conversion from FSP1>FSP2
648 return CONV_TYPE_IMPOSSIBLE;
649 }
650
651
652 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const653 Field_timestampf::rpl_conv_type_from(const Conv_source &source,
654 const Relay_log_info *rli,
655 const Conv_param ¶m) const
656 {
657 if (binlog_type() == source.real_field_type())
658 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
659 /*
660 See comment in Field_datetimef::rpl_conv_type_from()
661 'MariaDB-5.3 TIMESTAMP(0)' to 'MySQL56 TIMESTAMP(0)' is non-lossy
662 */
663 if (source.metadata() == 0 &&
664 source.type_handler() == &type_handler_timestamp)
665 return CONV_TYPE_VARIANT;
666 return CONV_TYPE_IMPOSSIBLE;
667 }
668
669
670 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const671 Field_datetime::rpl_conv_type_from(const Conv_source &source,
672 const Relay_log_info *rli,
673 const Conv_param ¶m) const
674 {
675 if (binlog_type() == source.real_field_type())
676 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
677 // 'MySQL56 DATETIME(N)' -> MariaDB-5.3 DATETIME(N) is non-lossy
678 if (source.metadata() == decimals() &&
679 source.type_handler() == &type_handler_datetime2)
680 return CONV_TYPE_VARIANT; // TODO: conversion from FSP1>FSP2
681 if (source.type_handler() == &type_handler_newdate)
682 return CONV_TYPE_SUBSET_TO_SUPERSET;
683 return CONV_TYPE_IMPOSSIBLE;
684 }
685
686
687 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const688 Field_datetimef::rpl_conv_type_from(const Conv_source &source,
689 const Relay_log_info *rli,
690 const Conv_param ¶m) const
691 {
692 if (binlog_type() == source.real_field_type())
693 return rpl_conv_type_from_same_data_type(source.metadata(), rli, param);
694 /*
695 'MariaDB-5.3 DATETIME(N)' does not provide information about fractional
696 precision in metadata. So we assume the precision on the master is equal
697 to the precision on the slave.
698 TODO: See MDEV-17394 what happend in case precisions are in case different
699 'MariaDB-5.3 DATETIME(0)' to 'MySQL56 DATETIME(0)' is non-lossy
700 */
701 if (source.metadata() == 0 &&
702 source.type_handler() == &type_handler_datetime)
703 return CONV_TYPE_VARIANT;
704 if (source.type_handler() == &type_handler_newdate)
705 return CONV_TYPE_SUBSET_TO_SUPERSET;
706 return CONV_TYPE_IMPOSSIBLE;
707 }
708
709
710 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const711 Field_date::rpl_conv_type_from(const Conv_source &source,
712 const Relay_log_info *rli,
713 const Conv_param ¶m) const
714 {
715 // old DATE
716 return binlog_type() == source.real_field_type() ?
717 rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
718 CONV_TYPE_IMPOSSIBLE;
719 }
720
721
722 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const723 Field_bit::rpl_conv_type_from(const Conv_source &source,
724 const Relay_log_info *rli,
725 const Conv_param ¶m) const
726 {
727 return binlog_type() == source.real_field_type() ?
728 rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
729 CONV_TYPE_IMPOSSIBLE;
730 }
731
732
733 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const734 Field_year::rpl_conv_type_from(const Conv_source &source,
735 const Relay_log_info *rli,
736 const Conv_param ¶m) const
737 {
738 return binlog_type() == source.real_field_type() ?
739 rpl_conv_type_from_same_data_type(source.metadata(), rli, param) :
740 CONV_TYPE_IMPOSSIBLE;
741 }
742
743
744 enum_conv_type
rpl_conv_type_from(const Conv_source & source,const Relay_log_info * rli,const Conv_param & param) const745 Field_null::rpl_conv_type_from(const Conv_source &source,
746 const Relay_log_info *rli,
747 const Conv_param ¶m) const
748 {
749 DBUG_ASSERT(0);
750 return CONV_TYPE_IMPOSSIBLE;
751 }
752
753
754 /**********************************************************************/
755
756
757 #if defined(HAVE_REPLICATION)
758
759 /**
760 */
show_sql_type(const Conv_source & src,const Field & dst,String * str)761 static void show_sql_type(const Conv_source &src, const Field &dst,
762 String *str)
763 {
764 DBUG_ENTER("show_sql_type");
765 DBUG_ASSERT(src.type_handler() != NULL);
766 DBUG_PRINT("enter", ("type: %s, metadata: 0x%x",
767 src.type_handler()->name().ptr(), src.metadata()));
768 src.type_handler()->show_binlog_type(src, dst, str);
769 DBUG_VOID_RETURN;
770 }
771
772
773 /**
774 Check the order variable and print errors if the order is not
775 acceptable according to the current settings.
776
777 @param order The computed order of the conversion needed.
778 @param rli The relay log info data structure: for error reporting.
779 */
is_conversion_ok(enum_conv_type type,const Relay_log_info * rli,ulonglong type_conversion_options)780 static bool is_conversion_ok(enum_conv_type type, const Relay_log_info *rli,
781 ulonglong type_conversion_options)
782 {
783 DBUG_ENTER("is_conversion_ok");
784 bool allow_non_lossy, allow_lossy;
785
786 allow_non_lossy= type_conversion_options &
787 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
788 allow_lossy= type_conversion_options &
789 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
790
791 DBUG_PRINT("enter", ("order: %d, flags:%s%s", (int) type,
792 allow_non_lossy ? " ALL_NON_LOSSY" : "",
793 allow_lossy ? " ALL_LOSSY" : ""));
794 switch (type) {
795 case CONV_TYPE_PRECISE:
796 case CONV_TYPE_VARIANT:
797 DBUG_RETURN(true);
798 case CONV_TYPE_SUBSET_TO_SUPERSET:
799 /* !!! Add error message saying that non-lossy conversions need to be allowed. */
800 DBUG_RETURN(allow_non_lossy);
801 case CONV_TYPE_SUPERSET_TO_SUBSET:
802 /* !!! Add error message saying that lossy conversions need to be allowed. */
803 DBUG_RETURN(allow_lossy);
804 case CONV_TYPE_IMPOSSIBLE:
805 DBUG_RETURN(false);
806 }
807
808 DBUG_RETURN(false);
809 }
810
811
812 /**
813 Can a type potentially be converted to another type?
814
815 This function check if the types are convertible and what
816 conversion is required.
817
818 If conversion is not possible, and error is printed.
819
820 If conversion is possible:
821
822 - *order will be set to -1 if source type is smaller than target
823 type and a non-lossy conversion can be required. This includes
824 the case where the field types are different but types could
825 actually be converted in either direction.
826
827 - *order will be set to 0 if no conversion is required.
828
829 - *order will be set to 1 if the source type is strictly larger
830 than the target type and that conversion is potentially lossy.
831
832 @param[in] field Target field
833 @param[in] type Source field type
834 @param[in] metadata Source field metadata
835 @param[in] rli Relay log info (for error reporting)
836 @param[in] mflags Flags from the table map event
837 @param[out] order Order between source field and target field
838
839 @return @c true if conversion is possible according to the current
840 settings, @c false if conversion is not possible according to the
841 current setting.
842 */
843 static enum_conv_type
can_convert_field_to(Field * field,const Conv_source & source,const Relay_log_info * rli,const Conv_param & param)844 can_convert_field_to(Field *field, const Conv_source &source,
845 const Relay_log_info *rli,
846 const Conv_param ¶m)
847 {
848 DBUG_ENTER("can_convert_field_to");
849 #ifndef DBUG_OFF
850 char field_type_buf[MAX_FIELD_WIDTH];
851 String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
852 field->sql_type(field_type);
853 DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
854 field_type.c_ptr_safe(), field->real_type(),
855 source.real_field_type(), source.metadata()));
856 #endif
857 DBUG_RETURN(field->rpl_conv_type_from(source, rli, param));
858 }
859
860
field_type_handler(uint col) const861 const Type_handler *table_def::field_type_handler(uint col) const
862 {
863 enum_field_types typecode= binlog_type(col);
864 uint16 metadata= field_metadata(col);
865 DBUG_ASSERT(typecode != MYSQL_TYPE_ENUM);
866 DBUG_ASSERT(typecode != MYSQL_TYPE_SET);
867
868 if (typecode == MYSQL_TYPE_BLOB)
869 {
870 switch (metadata & 0xff) {
871 case 1: return &type_handler_tiny_blob;
872 case 2: return &type_handler_blob;
873 case 3: return &type_handler_medium_blob;
874 case 4: return &type_handler_long_blob;
875 default: return NULL;
876 }
877 }
878 if (typecode == MYSQL_TYPE_STRING)
879 {
880 uchar typecode2= metadata >> 8;
881 if (typecode2 == MYSQL_TYPE_SET)
882 return &type_handler_set;
883 if (typecode2 == MYSQL_TYPE_ENUM)
884 return &type_handler_enum;
885 return &type_handler_string;
886 }
887 /*
888 This type has not been used since before row-based replication,
889 so we can safely assume that it really is MYSQL_TYPE_NEWDATE.
890 */
891 if (typecode == MYSQL_TYPE_DATE)
892 return &type_handler_newdate;
893 return Type_handler::get_handler_by_real_type(typecode);
894 }
895
896
897 /**
898 Is the definition compatible with a table?
899
900 This function will compare the master table with an existing table
901 on the slave and see if they are compatible with respect to the
902 current settings of @c SLAVE_TYPE_CONVERSIONS.
903
904 If the tables are compatible and conversions are required, @c
905 *tmp_table_var will be set to a virtual temporary table with field
906 pointers for the fields that require conversions. This allow simple
907 checking of whether a conversion are to be applied or not.
908
909 If tables are compatible, but no conversions are necessary, @c
910 *tmp_table_var will be set to NULL.
911
912 @param rli_arg[in]
913 Relay log info, for error reporting.
914
915 @param table[in]
916 Table to compare with
917
918 @param tmp_table_var[out]
919 Virtual temporary table for performing conversions, if necessary.
920
921 @retval true Master table is compatible with slave table.
922 @retval false Master table is not compatible with slave table.
923 */
924 bool
compatible_with(THD * thd,rpl_group_info * rgi,TABLE * table,TABLE ** conv_table_var) const925 table_def::compatible_with(THD *thd, rpl_group_info *rgi,
926 TABLE *table, TABLE **conv_table_var)
927 const
928 {
929 /*
930 We only check the initial columns for the tables.
931 */
932 uint const cols_to_check= MY_MIN(table->s->fields, size());
933 Relay_log_info *rli= rgi->rli;
934 TABLE *tmp_table= NULL;
935
936 for (uint col= 0 ; col < cols_to_check ; ++col)
937 {
938 Field *const field= table->field[col];
939 const Type_handler *h= field_type_handler(col);
940 if (!h)
941 {
942 sql_print_error("In RBR mode, Slave received unknown field type field %d "
943 " for column Name: %s.%s.%s.",
944 binlog_type(col),
945 field->table->s->db.str,
946 field->table->s->table_name.str,
947 field->field_name.str);
948 return false;
949 }
950
951 if (!h)
952 return false; // An unknown data type found in the binary log
953 Conv_source source(h, field_metadata(col), field->charset());
954 enum_conv_type convtype= can_convert_field_to(field, source, rli,
955 Conv_param(m_flags));
956 if (is_conversion_ok(convtype, rli, slave_type_conversions_options))
957 {
958 DBUG_PRINT("debug", ("Checking column %d -"
959 " field '%s' can be converted - order: %d",
960 col, field->field_name.str, convtype));
961 /*
962 If conversion type is not CONV_TYPE_RECISE, a conversion is required,
963 so we need to set up the conversion table.
964 */
965 if (convtype != CONV_TYPE_PRECISE && tmp_table == NULL)
966 {
967 /*
968 This will create the full table with all fields. This is
969 necessary to ge the correct field lengths for the record.
970 */
971 tmp_table= create_conversion_table(thd, rgi, table);
972 if (tmp_table == NULL)
973 return false;
974 /*
975 Clear all fields up to, but not including, this column.
976 */
977 for (unsigned int i= 0; i < col; ++i)
978 tmp_table->field[i]= NULL;
979 }
980
981 if (convtype == CONV_TYPE_PRECISE && tmp_table != NULL)
982 tmp_table->field[col]= NULL;
983 }
984 else
985 {
986 DBUG_PRINT("debug", ("Checking column %d -"
987 " field '%s' can not be converted",
988 col, field->field_name.str));
989 DBUG_ASSERT(col < size() && col < table->s->fields);
990 DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
991 DBUG_ASSERT(table->in_use);
992 const char *db_name= table->s->db.str;
993 const char *tbl_name= table->s->table_name.str;
994 StringBuffer<MAX_FIELD_WIDTH> source_type(&my_charset_latin1);
995 StringBuffer<MAX_FIELD_WIDTH> target_type(&my_charset_latin1);
996 THD *thd= table->in_use;
997
998 show_sql_type(source, *field, &source_type);
999 field->sql_rpl_type(&target_type);
1000 DBUG_ASSERT(source_type.length() > 0);
1001 DBUG_ASSERT(target_type.length() > 0);
1002 rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, rgi->gtid_info(),
1003 ER_THD(thd, ER_SLAVE_CONVERSION_FAILED),
1004 col, db_name, tbl_name,
1005 source_type.c_ptr_safe(), target_type.c_ptr_safe());
1006 return false;
1007 }
1008 }
1009
1010 #ifndef DBUG_OFF
1011 if (tmp_table)
1012 {
1013 for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
1014 if (tmp_table->field[col])
1015 {
1016 char source_buf[MAX_FIELD_WIDTH];
1017 char target_buf[MAX_FIELD_WIDTH];
1018 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
1019 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
1020 tmp_table->field[col]->sql_type(source_type);
1021 table->field[col]->sql_type(target_type);
1022 DBUG_PRINT("debug", ("Field %s - conversion required."
1023 " Source type: '%s', Target type: '%s'",
1024 tmp_table->field[col]->field_name.str,
1025 source_type.c_ptr_safe(), target_type.c_ptr_safe()));
1026 }
1027 }
1028 #endif
1029
1030 *conv_table_var= tmp_table;
1031 return true;
1032 }
1033
1034
1035 /**
1036 A wrapper to Virtual_tmp_table, to get access to its constructor,
1037 which is protected for safety purposes (against illegal use on stack).
1038 */
1039 class Virtual_conversion_table: public Virtual_tmp_table
1040 {
1041 public:
Virtual_conversion_table(THD * thd)1042 Virtual_conversion_table(THD *thd) :Virtual_tmp_table(thd) { }
1043 /**
1044 Add a new field into the virtual table.
1045 @param handler - The type handler of the field.
1046 @param metadata - The RBR binary log metadata for this field.
1047 @param target_field - The field from the target table, to get extra
1048 attributes from (e.g. typelib in case of ENUM).
1049 */
add(const Type_handler * handler,uint16 metadata,const Field * target_field)1050 bool add(const Type_handler *handler,
1051 uint16 metadata, const Field *target_field)
1052 {
1053 Field *tmp= handler->make_conversion_table_field(in_use->mem_root,
1054 this, metadata,
1055 target_field);
1056 if (!tmp)
1057 return true;
1058 Virtual_tmp_table::add(tmp);
1059 DBUG_PRINT("debug", ("sql_type: %s, target_field: '%s', max_length: %d, decimals: %d,"
1060 " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
1061 handler->name().ptr(), target_field->field_name.str,
1062 tmp->field_length, tmp->decimals(), TRUE,
1063 tmp->flags, tmp->pack_length()));
1064 return false;
1065 }
1066 };
1067
1068
1069 /**
1070 Create a conversion table.
1071
1072 If the function is unable to create the conversion table, an error
1073 will be printed and NULL will be returned.
1074
1075 @return Pointer to conversion table, or NULL if unable to create
1076 conversion table.
1077 */
1078
create_conversion_table(THD * thd,rpl_group_info * rgi,TABLE * target_table) const1079 TABLE *table_def::create_conversion_table(THD *thd, rpl_group_info *rgi,
1080 TABLE *target_table) const
1081 {
1082 DBUG_ENTER("table_def::create_conversion_table");
1083
1084 Virtual_conversion_table *conv_table;
1085 Relay_log_info *rli= rgi->rli;
1086 /*
1087 At slave, columns may differ. So we should create
1088 MY_MIN(columns@master, columns@slave) columns in the
1089 conversion table.
1090 */
1091 uint const cols_to_create= MY_MIN(target_table->s->fields, size());
1092 if (!(conv_table= new(thd) Virtual_conversion_table(thd)) ||
1093 conv_table->init(cols_to_create))
1094 goto err;
1095 for (uint col= 0 ; col < cols_to_create; ++col)
1096 {
1097 const Type_handler *ha= field_type_handler(col);
1098 DBUG_ASSERT(ha); // Checked at compatible_with() time
1099 if (conv_table->add(ha, field_metadata(col), target_table->field[col]))
1100 {
1101 DBUG_PRINT("debug", ("binlog_type: %d, metadata: %04X, target_field: '%s'"
1102 " make_conversion_table_field() failed",
1103 binlog_type(col), field_metadata(col),
1104 target_table->field[col]->field_name.str));
1105 goto err;
1106 }
1107 }
1108
1109 if (conv_table->open())
1110 goto err; // Could not allocate record buffer?
1111
1112 DBUG_RETURN(conv_table);
1113
1114 err:
1115 if (conv_table)
1116 delete conv_table;
1117 rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, rgi->gtid_info(),
1118 ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
1119 target_table->s->db.str,
1120 target_table->s->table_name.str);
1121 DBUG_RETURN(NULL);
1122 }
1123
1124
1125
Deferred_log_events(Relay_log_info * rli)1126 Deferred_log_events::Deferred_log_events(Relay_log_info *rli) : last_added(NULL)
1127 {
1128 my_init_dynamic_array(PSI_INSTRUMENT_ME, &array, sizeof(Log_event *), 32, 16, MYF(0));
1129 }
1130
~Deferred_log_events()1131 Deferred_log_events::~Deferred_log_events()
1132 {
1133 delete_dynamic(&array);
1134 }
1135
add(Log_event * ev)1136 int Deferred_log_events::add(Log_event *ev)
1137 {
1138 last_added= ev;
1139 insert_dynamic(&array, (uchar*) &ev);
1140 return 0;
1141 }
1142
is_empty()1143 bool Deferred_log_events::is_empty()
1144 {
1145 return array.elements == 0;
1146 }
1147
execute(rpl_group_info * rgi)1148 bool Deferred_log_events::execute(rpl_group_info *rgi)
1149 {
1150 bool res= false;
1151 DBUG_ENTER("Deferred_log_events::execute");
1152 DBUG_ASSERT(rgi->deferred_events_collecting);
1153
1154 rgi->deferred_events_collecting= false;
1155 for (uint i= 0; !res && i < array.elements; i++)
1156 {
1157 Log_event *ev= (* (Log_event **)
1158 dynamic_array_ptr(&array, i));
1159 res= ev->apply_event(rgi);
1160 }
1161 rgi->deferred_events_collecting= true;
1162 DBUG_RETURN(res);
1163 }
1164
rewind()1165 void Deferred_log_events::rewind()
1166 {
1167 /*
1168 Reset preceding Query log event events which execution was
1169 deferred because of slave side filtering.
1170 */
1171 if (!is_empty())
1172 {
1173 for (uint i= 0; i < array.elements; i++)
1174 {
1175 Log_event *ev= *(Log_event **) dynamic_array_ptr(&array, i);
1176 delete ev;
1177 }
1178 last_added= NULL;
1179 if (array.elements > array.max_element)
1180 freeze_size(&array);
1181 reset_dynamic(&array);
1182 }
1183 last_added= NULL;
1184 }
1185
1186 #endif // defined(HAVE_REPLICATION)
1187
1188