1 /* Copyright (c) 2006, 2013, Oracle and/or its affiliates.
2 Copyright (c) 2011, 2013, Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #include "mariadb.h"
18 #include <my_bit.h>
19 #include "rpl_utility.h"
20 #include "log_event.h"
21
22 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
23 #include "rpl_rli.h"
24 #include "sql_select.h"
25
26 /**
27 Calculate display length for MySQL56 temporal data types from their metadata.
28 It contains fractional precision in the low 16-bit word.
29 */
30 static uint32
max_display_length_for_temporal2_field(uint32 int_display_length,unsigned int metadata)31 max_display_length_for_temporal2_field(uint32 int_display_length,
32 unsigned int metadata)
33 {
34 metadata&= 0x00ff;
35 return int_display_length + metadata + (metadata ? 1 : 0);
36 }
37
38
39 /**
40 Compute the maximum display length of a field.
41
42 @param sql_type Type of the field
43 @param metadata The metadata from the master for the field.
44 @return Maximum length of the field in bytes.
45
46 The precise values calculated by field->max_display_length() and
47 calculated by max_display_length_for_field() can differ (by +1 or -1)
48 for integer data types (TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT).
49 This slight difference is not important here, because we call
50 this function only for two *different* integer data types.
51 */
52 static uint32
max_display_length_for_field(enum_field_types sql_type,unsigned int metadata)53 max_display_length_for_field(enum_field_types sql_type, unsigned int metadata)
54 {
55 DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata));
56 DBUG_ASSERT(metadata >> 16 == 0);
57
58 switch (sql_type) {
59 case MYSQL_TYPE_NEWDECIMAL:
60 return metadata >> 8;
61
62 case MYSQL_TYPE_FLOAT:
63 return 12;
64
65 case MYSQL_TYPE_DOUBLE:
66 return 22;
67
68 case MYSQL_TYPE_SET:
69 case MYSQL_TYPE_ENUM:
70 return metadata & 0x00ff;
71
72 case MYSQL_TYPE_STRING:
73 {
74 uchar type= metadata >> 8;
75 if (type == MYSQL_TYPE_SET || type == MYSQL_TYPE_ENUM)
76 return metadata & 0xff;
77 else
78 /* This is taken from Field_string::unpack. */
79 return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
80 }
81
82 case MYSQL_TYPE_YEAR:
83 case MYSQL_TYPE_TINY:
84 return 4;
85
86 case MYSQL_TYPE_SHORT:
87 return 6;
88
89 case MYSQL_TYPE_INT24:
90 return 9;
91
92 case MYSQL_TYPE_LONG:
93 return 11;
94
95 #ifdef HAVE_LONG_LONG
96 case MYSQL_TYPE_LONGLONG:
97 return 20;
98
99 #endif
100 case MYSQL_TYPE_NULL:
101 return 0;
102
103 case MYSQL_TYPE_NEWDATE:
104 return 3;
105
106 case MYSQL_TYPE_DATE:
107 return 3;
108
109 case MYSQL_TYPE_TIME:
110 return MIN_TIME_WIDTH;
111
112 case MYSQL_TYPE_TIME2:
113 return max_display_length_for_temporal2_field(MIN_TIME_WIDTH, metadata);
114
115 case MYSQL_TYPE_TIMESTAMP:
116 return MAX_DATETIME_WIDTH;
117
118 case MYSQL_TYPE_TIMESTAMP2:
119 return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH, metadata);
120
121 case MYSQL_TYPE_DATETIME:
122 return MAX_DATETIME_WIDTH;
123
124 case MYSQL_TYPE_DATETIME2:
125 return max_display_length_for_temporal2_field(MAX_DATETIME_WIDTH, metadata);
126
127 case MYSQL_TYPE_BIT:
128 /*
129 Decode the size of the bit field from the master.
130 */
131 DBUG_ASSERT((metadata & 0xff) <= 7);
132 return 8 * (metadata >> 8U) + (metadata & 0x00ff);
133
134 case MYSQL_TYPE_VAR_STRING:
135 case MYSQL_TYPE_VARCHAR:
136 return metadata;
137 case MYSQL_TYPE_VARCHAR_COMPRESSED:
138 return metadata - 1;
139
140 /*
141 The actual length for these types does not really matter since
142 they are used to calc_pack_length, which ignores the given
143 length for these types.
144
145 Since we want this to be accurate for other uses, we return the
146 maximum size in bytes of these BLOBs.
147 */
148
149 case MYSQL_TYPE_TINY_BLOB:
150 return (uint32)my_set_bits(1 * 8);
151
152 case MYSQL_TYPE_MEDIUM_BLOB:
153 return (uint32)my_set_bits(3 * 8);
154
155 case MYSQL_TYPE_BLOB:
156 case MYSQL_TYPE_BLOB_COMPRESSED:
157 /*
158 For the blob type, Field::real_type() lies and say that all
159 blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
160 at the length instead to decide what the max display size is.
161 */
162 return (uint32)my_set_bits(metadata * 8);
163
164 case MYSQL_TYPE_LONG_BLOB:
165 case MYSQL_TYPE_GEOMETRY:
166 return (uint32)my_set_bits(4 * 8);
167
168 default:
169 return ~(uint32) 0;
170 }
171 }
172
173
174 /*
175 Compare the pack lengths of a source field (on the master) and a
176 target field (on the slave).
177
178 @param field Target field.
179 @param type Source field type.
180 @param metadata Source field metadata.
181
182 @retval -1 The length of the source field is smaller than the target field.
183 @retval 0 The length of the source and target fields are the same.
184 @retval 1 The length of the source field is greater than the target field.
185 */
compare_lengths(Field * field,enum_field_types source_type,uint16 metadata)186 int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
187 {
188 DBUG_ENTER("compare_lengths");
189 size_t const source_length=
190 max_display_length_for_field(source_type, metadata);
191 size_t const target_length= field->max_display_length();
192 DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
193 " target_length: %lu, target_type: %u",
194 (unsigned long) source_length, source_type,
195 (unsigned long) target_length, field->real_type()));
196 int result= source_length < target_length ? -1 : source_length > target_length;
197 DBUG_PRINT("result", ("%d", result));
198 DBUG_RETURN(result);
199 }
200 #endif //MYSQL_CLIENT
201 /*********************************************************************
202 * table_def member definitions *
203 *********************************************************************/
204
205 /*
206 This function returns the field size in raw bytes based on the type
207 and the encoded field data from the master's raw data.
208 */
calc_field_size(uint col,uchar * master_data) const209 uint32 table_def::calc_field_size(uint col, uchar *master_data) const
210 {
211 uint32 length= 0;
212
213 switch (type(col)) {
214 case MYSQL_TYPE_NEWDECIMAL:
215 length= my_decimal_get_binary_size(m_field_metadata[col] >> 8,
216 m_field_metadata[col] & 0xff);
217 break;
218 case MYSQL_TYPE_DECIMAL:
219 case MYSQL_TYPE_FLOAT:
220 case MYSQL_TYPE_DOUBLE:
221 length= m_field_metadata[col];
222 break;
223 /*
224 The cases for SET and ENUM are include for completeness, however
225 both are mapped to type MYSQL_TYPE_STRING and their real types
226 are encoded in the field metadata.
227 */
228 case MYSQL_TYPE_SET:
229 case MYSQL_TYPE_ENUM:
230 case MYSQL_TYPE_STRING:
231 {
232 uchar type= m_field_metadata[col] >> 8U;
233 if ((type == MYSQL_TYPE_SET) || (type == MYSQL_TYPE_ENUM))
234 length= m_field_metadata[col] & 0x00ff;
235 else
236 {
237 /*
238 We are reading the actual size from the master_data record
239 because this field has the actual lengh stored in the first
240 byte.
241 */
242 length= (uint) *master_data + 1;
243 DBUG_ASSERT(length != 0);
244 }
245 break;
246 }
247 case MYSQL_TYPE_YEAR:
248 case MYSQL_TYPE_TINY:
249 length= 1;
250 break;
251 case MYSQL_TYPE_SHORT:
252 length= 2;
253 break;
254 case MYSQL_TYPE_INT24:
255 length= 3;
256 break;
257 case MYSQL_TYPE_LONG:
258 length= 4;
259 break;
260 #ifdef HAVE_LONG_LONG
261 case MYSQL_TYPE_LONGLONG:
262 length= 8;
263 break;
264 #endif
265 case MYSQL_TYPE_NULL:
266 length= 0;
267 break;
268 case MYSQL_TYPE_NEWDATE:
269 length= 3;
270 break;
271 case MYSQL_TYPE_DATE:
272 case MYSQL_TYPE_TIME:
273 length= 3;
274 break;
275 case MYSQL_TYPE_TIME2:
276 length= my_time_binary_length(m_field_metadata[col]);
277 break;
278 case MYSQL_TYPE_TIMESTAMP:
279 length= 4;
280 break;
281 case MYSQL_TYPE_TIMESTAMP2:
282 length= my_timestamp_binary_length(m_field_metadata[col]);
283 break;
284 case MYSQL_TYPE_DATETIME:
285 length= 8;
286 break;
287 case MYSQL_TYPE_DATETIME2:
288 length= my_datetime_binary_length(m_field_metadata[col]);
289 break;
290 case MYSQL_TYPE_BIT:
291 {
292 /*
293 Decode the size of the bit field from the master.
294 from_len is the length in bytes from the master
295 from_bit_len is the number of extra bits stored in the master record
296 If from_bit_len is not 0, add 1 to the length to account for accurate
297 number of bytes needed.
298 */
299 uint from_len= (m_field_metadata[col] >> 8U) & 0x00ff;
300 uint from_bit_len= m_field_metadata[col] & 0x00ff;
301 DBUG_ASSERT(from_bit_len <= 7);
302 length= from_len + ((from_bit_len > 0) ? 1 : 0);
303 break;
304 }
305 case MYSQL_TYPE_VARCHAR:
306 case MYSQL_TYPE_VARCHAR_COMPRESSED:
307 {
308 length= m_field_metadata[col] > 255 ? 2 : 1; // c&p of Field_varstring::data_length()
309 length+= length == 1 ? (uint32) *master_data : uint2korr(master_data);
310 break;
311 }
312 case MYSQL_TYPE_TINY_BLOB:
313 case MYSQL_TYPE_MEDIUM_BLOB:
314 case MYSQL_TYPE_LONG_BLOB:
315 case MYSQL_TYPE_BLOB:
316 case MYSQL_TYPE_BLOB_COMPRESSED:
317 case MYSQL_TYPE_GEOMETRY:
318 {
319 /*
320 Compute the length of the data. We cannot use get_length() here
321 since it is dependent on the specific table (and also checks the
322 packlength using the internal 'table' pointer) and replication
323 is using a fixed format for storing data in the binlog.
324 */
325 switch (m_field_metadata[col]) {
326 case 1:
327 length= *master_data;
328 break;
329 case 2:
330 length= uint2korr(master_data);
331 break;
332 case 3:
333 length= uint3korr(master_data);
334 break;
335 case 4:
336 length= uint4korr(master_data);
337 break;
338 default:
339 DBUG_ASSERT(0); // Should not come here
340 break;
341 }
342
343 length+= m_field_metadata[col];
344 break;
345 }
346 default:
347 length= ~(uint32) 0;
348 }
349 return length;
350 }
351
352 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
353 /**
354 */
show_sql_type(enum_field_types type,uint16 metadata,String * str,bool char_with_octets)355 void show_sql_type(enum_field_types type, uint16 metadata, String *str,
356 bool char_with_octets)
357 {
358 DBUG_ENTER("show_sql_type");
359 DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
360
361 switch (type)
362 {
363 case MYSQL_TYPE_TINY:
364 str->set_ascii(STRING_WITH_LEN("tinyint"));
365 break;
366
367 case MYSQL_TYPE_SHORT:
368 str->set_ascii(STRING_WITH_LEN("smallint"));
369 break;
370
371 case MYSQL_TYPE_LONG:
372 str->set_ascii(STRING_WITH_LEN("int"));
373 break;
374
375 case MYSQL_TYPE_FLOAT:
376 str->set_ascii(STRING_WITH_LEN("float"));
377 break;
378
379 case MYSQL_TYPE_DOUBLE:
380 str->set_ascii(STRING_WITH_LEN("double"));
381 break;
382
383 case MYSQL_TYPE_NULL:
384 str->set_ascii(STRING_WITH_LEN("null"));
385 break;
386
387 case MYSQL_TYPE_TIMESTAMP:
388 case MYSQL_TYPE_TIMESTAMP2:
389 str->set_ascii(STRING_WITH_LEN("timestamp"));
390 break;
391
392 case MYSQL_TYPE_LONGLONG:
393 str->set_ascii(STRING_WITH_LEN("bigint"));
394 break;
395
396 case MYSQL_TYPE_INT24:
397 str->set_ascii(STRING_WITH_LEN("mediumint"));
398 break;
399
400 case MYSQL_TYPE_NEWDATE:
401 case MYSQL_TYPE_DATE:
402 str->set_ascii(STRING_WITH_LEN("date"));
403 break;
404
405 case MYSQL_TYPE_TIME:
406 case MYSQL_TYPE_TIME2:
407 str->set_ascii(STRING_WITH_LEN("time"));
408 break;
409
410 case MYSQL_TYPE_DATETIME:
411 case MYSQL_TYPE_DATETIME2:
412 str->set_ascii(STRING_WITH_LEN("datetime"));
413 break;
414
415 case MYSQL_TYPE_YEAR:
416 str->set_ascii(STRING_WITH_LEN("year"));
417 break;
418
419 case MYSQL_TYPE_VAR_STRING:
420 case MYSQL_TYPE_VARCHAR:
421 case MYSQL_TYPE_VARCHAR_COMPRESSED:
422 {
423 CHARSET_INFO *cs= str->charset();
424 size_t length=0;
425 if (char_with_octets)
426 length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
427 "varchar(%u octets)", metadata);
428 else
429 length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
430 "varbinary(%u)", metadata);
431 str->length(length);
432 }
433 break;
434
435 case MYSQL_TYPE_BIT:
436 {
437 CHARSET_INFO *cs= str->charset();
438 int bit_length= 8 * (metadata >> 8) + (metadata & 0xFF);
439 size_t length=
440 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
441 "bit(%d)", bit_length);
442 str->length(length);
443 }
444 break;
445
446 case MYSQL_TYPE_DECIMAL:
447 {
448 CHARSET_INFO *cs= str->charset();
449 size_t length=
450 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
451 "decimal(%d,?)/*old*/", metadata);
452 str->length(length);
453 }
454 break;
455
456 case MYSQL_TYPE_NEWDECIMAL:
457 {
458 CHARSET_INFO *cs= str->charset();
459 size_t length=
460 cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
461 "decimal(%d,%d)", metadata >> 8, metadata & 0xff);
462 str->length(length);
463 }
464 break;
465
466 case MYSQL_TYPE_ENUM:
467 str->set_ascii(STRING_WITH_LEN("enum"));
468 break;
469
470 case MYSQL_TYPE_SET:
471 str->set_ascii(STRING_WITH_LEN("set"));
472 break;
473
474 case MYSQL_TYPE_BLOB:
475 case MYSQL_TYPE_BLOB_COMPRESSED:
476 /*
477 Field::real_type() lies regarding the actual type of a BLOB, so
478 it is necessary to check the pack length to figure out what kind
479 of blob it really is.
480 */
481 switch (metadata)
482 {
483 case 1:
484 str->set_ascii(STRING_WITH_LEN("tinyblob"));
485 break;
486
487 case 2:
488 str->set_ascii(STRING_WITH_LEN("blob"));
489 break;
490
491 case 3:
492 str->set_ascii(STRING_WITH_LEN("mediumblob"));
493 break;
494
495 case 4:
496 str->set_ascii(STRING_WITH_LEN("longblob"));
497 break;
498
499 default:
500 DBUG_ASSERT(0);
501 break;
502 }
503
504 if (type == MYSQL_TYPE_BLOB_COMPRESSED)
505 str->append(STRING_WITH_LEN(" compressed"));
506 break;
507
508 case MYSQL_TYPE_STRING:
509 {
510 /*
511 This is taken from Field_string::unpack.
512 */
513 CHARSET_INFO *cs= str->charset();
514 uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
515 size_t length=0;
516 if (char_with_octets)
517 length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
518 "char(%u octets)", bytes);
519 else
520 length= cs->cset->snprintf(cs, (char*) str->ptr(), str->alloced_length(),
521 "binary(%u)", bytes);
522 str->length(length);
523 }
524 break;
525
526 case MYSQL_TYPE_GEOMETRY:
527 str->set_ascii(STRING_WITH_LEN("geometry"));
528 break;
529
530 default:
531 str->set_ascii(STRING_WITH_LEN("<unknown type>"));
532 }
533 DBUG_VOID_RETURN;
534 }
535
536
537 /**
538 Check the order variable and print errors if the order is not
539 acceptable according to the current settings.
540
541 @param order The computed order of the conversion needed.
542 @param rli The relay log info data structure: for error reporting.
543 */
is_conversion_ok(int order,Relay_log_info * rli)544 bool is_conversion_ok(int order, Relay_log_info *rli)
545 {
546 DBUG_ENTER("is_conversion_ok");
547 bool allow_non_lossy, allow_lossy;
548
549 allow_non_lossy = slave_type_conversions_options &
550 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
551 allow_lossy= slave_type_conversions_options &
552 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
553
554 DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
555 allow_non_lossy ? " ALL_NON_LOSSY" : "",
556 allow_lossy ? " ALL_LOSSY" : ""));
557 if (order < 0 && !allow_non_lossy)
558 {
559 /* !!! Add error message saying that non-lossy conversions need to be allowed. */
560 DBUG_RETURN(false);
561 }
562
563 if (order > 0 && !allow_lossy)
564 {
565 /* !!! Add error message saying that lossy conversions need to be allowed. */
566 DBUG_RETURN(false);
567 }
568
569 DBUG_RETURN(true);
570 }
571
572
573 /**
574 Can a type potentially be converted to another type?
575
576 This function check if the types are convertible and what
577 conversion is required.
578
579 If conversion is not possible, and error is printed.
580
581 If conversion is possible:
582
583 - *order will be set to -1 if source type is smaller than target
584 type and a non-lossy conversion can be required. This includes
585 the case where the field types are different but types could
586 actually be converted in either direction.
587
588 - *order will be set to 0 if no conversion is required.
589
590 - *order will be set to 1 if the source type is strictly larger
591 than the target type and that conversion is potentially lossy.
592
593 @param[in] field Target field
594 @param[in] type Source field type
595 @param[in] metadata Source field metadata
596 @param[in] rli Relay log info (for error reporting)
597 @param[in] mflags Flags from the table map event
598 @param[out] order Order between source field and target field
599
600 @return @c true if conversion is possible according to the current
601 settings, @c false if conversion is not possible according to the
602 current setting.
603 */
604 static bool
can_convert_field_to(Field * field,enum_field_types source_type,uint16 metadata,Relay_log_info * rli,uint16 mflags,int * order_var)605 can_convert_field_to(Field *field,
606 enum_field_types source_type, uint16 metadata,
607 Relay_log_info *rli, uint16 mflags,
608 int *order_var)
609 {
610 DBUG_ENTER("can_convert_field_to");
611 bool same_type;
612 #ifndef DBUG_OFF
613 char field_type_buf[MAX_FIELD_WIDTH];
614 String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
615 field->sql_type(field_type);
616 DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, source_metadata: 0x%x",
617 field_type.c_ptr_safe(), field->real_type(), source_type, metadata));
618 #endif
619 /**
620 @todo
621 Implement Field_varstring_cmopressed::real_type() and
622 Field_blob_compressed::real_type() properly. All occurencies
623 of Field::real_type() have to be inspected and adjusted if needed.
624
625 Until it is not ready we have to compare source_type against
626 binlog_type() when replicating from or to compressed data types.
627
628 @sa Comment for Field::binlog_type()
629 */
630 if (source_type == MYSQL_TYPE_VARCHAR_COMPRESSED ||
631 source_type == MYSQL_TYPE_BLOB_COMPRESSED ||
632 field->binlog_type() == MYSQL_TYPE_VARCHAR_COMPRESSED ||
633 field->binlog_type() == MYSQL_TYPE_BLOB_COMPRESSED)
634 same_type= field->binlog_type() == source_type;
635 else
636 same_type= field->real_type() == source_type;
637
638 /*
639 If the real type is the same, we need to check the metadata to
640 decide if conversions are allowed.
641 */
642 if (same_type)
643 {
644 if (metadata == 0) // Metadata can only be zero if no metadata was provided
645 {
646 /*
647 If there is no metadata, we either have an old event where no
648 metadata were supplied, or a type that does not require any
649 metadata. In either case, conversion can be done but no
650 conversion table is necessary.
651 */
652 DBUG_PRINT("debug", ("Base types are identical, but there is no metadata"));
653 *order_var= 0;
654 DBUG_RETURN(true);
655 }
656
657 DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
658 if (field->compatible_field_size(metadata, rli, mflags, order_var))
659 DBUG_RETURN(is_conversion_ok(*order_var, rli));
660 else
661 DBUG_RETURN(false);
662 }
663 else if (
664 /*
665 Conversion from MariaDB TIMESTAMP(0), TIME(0), DATETIME(0)
666 to the corresponding MySQL56 types is non-lossy.
667 */
668 (metadata == 0 &&
669 ((field->real_type() == MYSQL_TYPE_TIMESTAMP2 &&
670 source_type == MYSQL_TYPE_TIMESTAMP) ||
671 (field->real_type() == MYSQL_TYPE_TIME2 &&
672 source_type == MYSQL_TYPE_TIME) ||
673 (field->real_type() == MYSQL_TYPE_DATETIME2 &&
674 source_type == MYSQL_TYPE_DATETIME))) ||
675 /*
676 Conversion from MySQL56 TIMESTAMP(N), TIME(N), DATETIME(N)
677 to the corresponding MariaDB or MySQL55 types is non-lossy.
678 */
679 (metadata == field->decimals() &&
680 ((field->real_type() == MYSQL_TYPE_TIMESTAMP &&
681 source_type == MYSQL_TYPE_TIMESTAMP2) ||
682 (field->real_type() == MYSQL_TYPE_TIME &&
683 source_type == MYSQL_TYPE_TIME2) ||
684 (field->real_type() == MYSQL_TYPE_DATETIME &&
685 source_type == MYSQL_TYPE_DATETIME2))))
686 {
687 /*
688 TS-TODO: conversion from FSP1>FSP2.
689 */
690 *order_var= -1;
691 DBUG_RETURN(true);
692 }
693 else if (!slave_type_conversions_options)
694 DBUG_RETURN(false);
695
696 /*
697 Here, from and to will always be different. Since the types are
698 different, we cannot use the compatible_field_size() function, but
699 have to rely on hard-coded max-sizes for fields.
700 */
701
702 DBUG_PRINT("debug", ("Base types are different, checking conversion"));
703 switch (source_type) // Source type (on master)
704 {
705 case MYSQL_TYPE_DECIMAL:
706 case MYSQL_TYPE_NEWDECIMAL:
707 case MYSQL_TYPE_FLOAT:
708 case MYSQL_TYPE_DOUBLE:
709 switch (field->real_type())
710 {
711 case MYSQL_TYPE_NEWDECIMAL:
712 /*
713 Then the other type is either FLOAT, DOUBLE, or old style
714 DECIMAL, so we require lossy conversion.
715 */
716 *order_var= 1;
717 DBUG_RETURN(is_conversion_ok(*order_var, rli));
718
719 case MYSQL_TYPE_DECIMAL:
720 case MYSQL_TYPE_FLOAT:
721 case MYSQL_TYPE_DOUBLE:
722 {
723 if (source_type == MYSQL_TYPE_NEWDECIMAL ||
724 source_type == MYSQL_TYPE_DECIMAL)
725 *order_var = 1; // Always require lossy conversions
726 else
727 *order_var= compare_lengths(field, source_type, metadata);
728 DBUG_ASSERT(*order_var != 0);
729 DBUG_RETURN(is_conversion_ok(*order_var, rli));
730 }
731
732 default:
733 DBUG_RETURN(false);
734 }
735 break;
736
737 /*
738 The length comparison check will do the correct job of comparing
739 the field lengths (in bytes) of two integer types.
740 */
741 case MYSQL_TYPE_TINY:
742 case MYSQL_TYPE_SHORT:
743 case MYSQL_TYPE_INT24:
744 case MYSQL_TYPE_LONG:
745 case MYSQL_TYPE_LONGLONG:
746 switch (field->real_type())
747 {
748 case MYSQL_TYPE_TINY:
749 case MYSQL_TYPE_SHORT:
750 case MYSQL_TYPE_INT24:
751 case MYSQL_TYPE_LONG:
752 case MYSQL_TYPE_LONGLONG:
753 /*
754 max_display_length_for_field() is not fully precise for the integer
755 data types. So its result cannot be compared to the result of
756 field->max_dispay_length() when the table field and the binlog field
757 are of the same type.
758 This code should eventually be rewritten not to use
759 compare_lengths(), to detect subtype/supetype relations
760 just using the type codes.
761 */
762 DBUG_ASSERT(source_type != field->real_type());
763 *order_var= compare_lengths(field, source_type, metadata);
764 DBUG_ASSERT(*order_var != 0);
765 DBUG_RETURN(is_conversion_ok(*order_var, rli));
766
767 default:
768 DBUG_RETURN(false);
769 }
770 break;
771
772 /*
773 Since source and target type is different, and it is not possible
774 to convert bit types to anything else, this will return false.
775 */
776 case MYSQL_TYPE_BIT:
777 DBUG_RETURN(false);
778
779 /*
780 If all conversions are disabled, it is not allowed to convert
781 between these types. Since the TEXT vs. BINARY is distinguished by
782 the charset, and the charset is not replicated, we cannot
783 currently distinguish between , e.g., TEXT and BLOB.
784 */
785 case MYSQL_TYPE_TINY_BLOB:
786 case MYSQL_TYPE_MEDIUM_BLOB:
787 case MYSQL_TYPE_LONG_BLOB:
788 case MYSQL_TYPE_BLOB:
789 case MYSQL_TYPE_BLOB_COMPRESSED:
790 case MYSQL_TYPE_STRING:
791 case MYSQL_TYPE_VAR_STRING:
792 case MYSQL_TYPE_VARCHAR:
793 case MYSQL_TYPE_VARCHAR_COMPRESSED:
794 switch (field->real_type())
795 {
796 case MYSQL_TYPE_TINY_BLOB:
797 case MYSQL_TYPE_MEDIUM_BLOB:
798 case MYSQL_TYPE_LONG_BLOB:
799 case MYSQL_TYPE_BLOB:
800 case MYSQL_TYPE_BLOB_COMPRESSED:
801 case MYSQL_TYPE_STRING:
802 case MYSQL_TYPE_VAR_STRING:
803 case MYSQL_TYPE_VARCHAR:
804 case MYSQL_TYPE_VARCHAR_COMPRESSED:
805 *order_var= compare_lengths(field, source_type, metadata);
806 /*
807 Here we know that the types are different, so if the order
808 gives that they do not require any conversion, we still need
809 to have non-lossy conversion enabled to allow conversion
810 between different (string) types of the same length.
811 */
812 if (*order_var == 0)
813 *order_var= -1;
814 DBUG_RETURN(is_conversion_ok(*order_var, rli));
815
816 default:
817 DBUG_RETURN(false);
818 }
819 break;
820
821 case MYSQL_TYPE_GEOMETRY:
822 case MYSQL_TYPE_TIMESTAMP:
823 case MYSQL_TYPE_DATE:
824 case MYSQL_TYPE_TIME:
825 case MYSQL_TYPE_DATETIME:
826 case MYSQL_TYPE_YEAR:
827 case MYSQL_TYPE_NULL:
828 case MYSQL_TYPE_ENUM:
829 case MYSQL_TYPE_SET:
830 case MYSQL_TYPE_TIMESTAMP2:
831 case MYSQL_TYPE_TIME2:
832 DBUG_RETURN(false);
833 case MYSQL_TYPE_NEWDATE:
834 {
835 if (field->real_type() == MYSQL_TYPE_DATETIME2 ||
836 field->real_type() == MYSQL_TYPE_DATETIME)
837 {
838 *order_var= -1;
839 DBUG_RETURN(is_conversion_ok(*order_var, rli));
840 }
841 else
842 {
843 DBUG_RETURN(false);
844 }
845 }
846 break;
847
848 //case MYSQL_TYPE_DATETIME: TODO: fix MDEV-17394 and uncomment.
849 //
850 //The "old" type does not specify the fraction part size which is required
851 //for correct conversion.
852 case MYSQL_TYPE_DATETIME2:
853 {
854 if (field->real_type() == MYSQL_TYPE_NEWDATE)
855 {
856 *order_var= 1;
857 DBUG_RETURN(is_conversion_ok(*order_var, rli));
858 }
859 else
860 {
861 DBUG_RETURN(false);
862 }
863 }
864 break;
865 }
866 DBUG_RETURN(false); // To keep GCC happy
867 }
868
869
870 /**
871 Is the definition compatible with a table?
872
873 This function will compare the master table with an existing table
874 on the slave and see if they are compatible with respect to the
875 current settings of @c SLAVE_TYPE_CONVERSIONS.
876
877 If the tables are compatible and conversions are required, @c
878 *tmp_table_var will be set to a virtual temporary table with field
879 pointers for the fields that require conversions. This allow simple
880 checking of whether a conversion are to be applied or not.
881
882 If tables are compatible, but no conversions are necessary, @c
883 *tmp_table_var will be set to NULL.
884
885 @param rli_arg[in]
886 Relay log info, for error reporting.
887
888 @param table[in]
889 Table to compare with
890
891 @param tmp_table_var[out]
892 Virtual temporary table for performing conversions, if necessary.
893
894 @retval true Master table is compatible with slave table.
895 @retval false Master table is not compatible with slave table.
896 */
897 bool
compatible_with(THD * thd,rpl_group_info * rgi,TABLE * table,TABLE ** conv_table_var) const898 table_def::compatible_with(THD *thd, rpl_group_info *rgi,
899 TABLE *table, TABLE **conv_table_var)
900 const
901 {
902 /*
903 We only check the initial columns for the tables.
904 */
905 uint const cols_to_check= MY_MIN(table->s->fields, size());
906 Relay_log_info *rli= rgi->rli;
907 TABLE *tmp_table= NULL;
908
909 for (uint col= 0 ; col < cols_to_check ; ++col)
910 {
911 Field *const field= table->field[col];
912 int order;
913 if (can_convert_field_to(field, type(col), field_metadata(col), rli, m_flags, &order))
914 {
915 DBUG_PRINT("debug", ("Checking column %d -"
916 " field '%s' can be converted - order: %d",
917 col, field->field_name.str, order));
918 DBUG_ASSERT(order >= -1 && order <= 1);
919
920 /*
921 If order is not 0, a conversion is required, so we need to set
922 up the conversion table.
923 */
924 if (order != 0 && tmp_table == NULL)
925 {
926 /*
927 This will create the full table with all fields. This is
928 necessary to ge the correct field lengths for the record.
929 */
930 tmp_table= create_conversion_table(thd, rgi, table);
931 if (tmp_table == NULL)
932 return false;
933 /*
934 Clear all fields up to, but not including, this column.
935 */
936 for (unsigned int i= 0; i < col; ++i)
937 tmp_table->field[i]= NULL;
938 }
939
940 if (order == 0 && tmp_table != NULL)
941 tmp_table->field[col]= NULL;
942 }
943 else
944 {
945 DBUG_PRINT("debug", ("Checking column %d -"
946 " field '%s' can not be converted",
947 col, field->field_name.str));
948 DBUG_ASSERT(col < size() && col < table->s->fields);
949 DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
950 DBUG_ASSERT(table->in_use);
951 const char *db_name= table->s->db.str;
952 const char *tbl_name= table->s->table_name.str;
953 char source_buf[MAX_FIELD_WIDTH];
954 char target_buf[MAX_FIELD_WIDTH];
955 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
956 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
957 THD *thd= table->in_use;
958 bool char_with_octets= field->cmp_type() == STRING_RESULT ?
959 field->has_charset() : true;
960
961 show_sql_type(type(col), field_metadata(col), &source_type,
962 char_with_octets);
963 field->sql_rpl_type(&target_type);
964
965 rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED, rgi->gtid_info(),
966 ER_THD(thd, ER_SLAVE_CONVERSION_FAILED),
967 col, db_name, tbl_name,
968 source_type.c_ptr_safe(), target_type.c_ptr_safe());
969 return false;
970 }
971 }
972
973 #ifndef DBUG_OFF
974 if (tmp_table)
975 {
976 for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
977 if (tmp_table->field[col])
978 {
979 char source_buf[MAX_FIELD_WIDTH];
980 char target_buf[MAX_FIELD_WIDTH];
981 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
982 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
983 tmp_table->field[col]->sql_type(source_type);
984 table->field[col]->sql_type(target_type);
985 DBUG_PRINT("debug", ("Field %s - conversion required."
986 " Source type: '%s', Target type: '%s'",
987 tmp_table->field[col]->field_name.str,
988 source_type.c_ptr_safe(), target_type.c_ptr_safe()));
989 }
990 }
991 #endif
992
993 *conv_table_var= tmp_table;
994 return true;
995 }
996
997
998 /**
999 A wrapper to Virtual_tmp_table, to get access to its constructor,
1000 which is protected for safety purposes (against illegal use on stack).
1001 */
1002 class Virtual_conversion_table: public Virtual_tmp_table
1003 {
1004 public:
Virtual_conversion_table(THD * thd)1005 Virtual_conversion_table(THD *thd) :Virtual_tmp_table(thd) { }
1006 /**
1007 Add a new field into the virtual table.
1008 @param sql_type - The real_type of the field.
1009 @param metadata - The RBR binary log metadata for this field.
1010 @param target_field - The field from the target table, to get extra
1011 attributes from (e.g. typelib in case of ENUM).
1012 */
add(enum_field_types sql_type,uint16 metadata,const Field * target_field)1013 bool add(enum_field_types sql_type,
1014 uint16 metadata, const Field *target_field)
1015 {
1016 const Type_handler *handler= Type_handler::get_handler_by_real_type(sql_type);
1017 if (!handler)
1018 {
1019 sql_print_error("In RBR mode, Slave received unknown field type field %d "
1020 " for column Name: %s.%s.%s.",
1021 (int) sql_type,
1022 target_field->table->s->db.str,
1023 target_field->table->s->table_name.str,
1024 target_field->field_name.str);
1025 return true;
1026 }
1027 Field *tmp= handler->make_conversion_table_field(this, metadata,
1028 target_field);
1029 if (!tmp)
1030 return true;
1031 Virtual_tmp_table::add(tmp);
1032 DBUG_PRINT("debug", ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
1033 " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
1034 sql_type, target_field->field_name.str,
1035 tmp->field_length, tmp->decimals(), TRUE,
1036 tmp->flags, tmp->pack_length()));
1037 return false;
1038 }
1039 };
1040
1041
1042 /**
1043 Create a conversion table.
1044
1045 If the function is unable to create the conversion table, an error
1046 will be printed and NULL will be returned.
1047
1048 @return Pointer to conversion table, or NULL if unable to create
1049 conversion table.
1050 */
1051
create_conversion_table(THD * thd,rpl_group_info * rgi,TABLE * target_table) const1052 TABLE *table_def::create_conversion_table(THD *thd, rpl_group_info *rgi,
1053 TABLE *target_table) const
1054 {
1055 DBUG_ENTER("table_def::create_conversion_table");
1056
1057 Virtual_conversion_table *conv_table;
1058 Relay_log_info *rli= rgi->rli;
1059 /*
1060 At slave, columns may differ. So we should create
1061 MY_MIN(columns@master, columns@slave) columns in the
1062 conversion table.
1063 */
1064 uint const cols_to_create= MY_MIN(target_table->s->fields, size());
1065 if (!(conv_table= new(thd) Virtual_conversion_table(thd)) ||
1066 conv_table->init(cols_to_create))
1067 goto err;
1068 for (uint col= 0 ; col < cols_to_create; ++col)
1069 {
1070 if (conv_table->add(type(col), field_metadata(col),
1071 target_table->field[col]))
1072 {
1073 DBUG_PRINT("debug", ("binlog_type: %d, metadata: %04X, target_field: '%s'"
1074 " make_conversion_table_field() failed",
1075 binlog_type(col), field_metadata(col),
1076 target_table->field[col]->field_name.str));
1077 goto err;
1078 }
1079 }
1080
1081 if (conv_table->open())
1082 goto err; // Could not allocate record buffer?
1083
1084 DBUG_RETURN(conv_table);
1085
1086 err:
1087 if (conv_table)
1088 delete conv_table;
1089 rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION, rgi->gtid_info(),
1090 ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
1091 target_table->s->db.str,
1092 target_table->s->table_name.str);
1093 DBUG_RETURN(NULL);
1094 }
1095 #endif /* MYSQL_CLIENT */
1096
table_def(unsigned char * types,ulong size,uchar * field_metadata,int metadata_size,uchar * null_bitmap,uint16 flags)1097 table_def::table_def(unsigned char *types, ulong size,
1098 uchar *field_metadata, int metadata_size,
1099 uchar *null_bitmap, uint16 flags)
1100 : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
1101 m_field_metadata(0), m_null_bits(0), m_flags(flags),
1102 m_memory(NULL)
1103 {
1104 m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
1105 &m_type, size,
1106 &m_field_metadata,
1107 size * sizeof(uint16),
1108 &m_null_bits, (size + 7) / 8,
1109 NULL);
1110
1111 bzero(m_field_metadata, size * sizeof(uint16));
1112
1113 if (m_type)
1114 memcpy(m_type, types, size);
1115 else
1116 m_size= 0;
1117 /*
1118 Extract the data from the table map into the field metadata array
1119 iff there is field metadata. The variable metadata_size will be
1120 0 if we are replicating from an older version server since no field
1121 metadata was written to the table map. This can also happen if
1122 there were no fields in the master that needed extra metadata.
1123 */
1124 if (m_size && metadata_size)
1125 {
1126 int index= 0;
1127 for (unsigned int i= 0; i < m_size; i++)
1128 {
1129 switch (binlog_type(i)) {
1130 case MYSQL_TYPE_TINY_BLOB:
1131 case MYSQL_TYPE_BLOB:
1132 case MYSQL_TYPE_BLOB_COMPRESSED:
1133 case MYSQL_TYPE_MEDIUM_BLOB:
1134 case MYSQL_TYPE_LONG_BLOB:
1135 case MYSQL_TYPE_DOUBLE:
1136 case MYSQL_TYPE_FLOAT:
1137 case MYSQL_TYPE_GEOMETRY:
1138 {
1139 /*
1140 These types store a single byte.
1141 */
1142 m_field_metadata[i]= field_metadata[index];
1143 index++;
1144 break;
1145 }
1146 case MYSQL_TYPE_SET:
1147 case MYSQL_TYPE_ENUM:
1148 case MYSQL_TYPE_STRING:
1149 {
1150 uint16 x= field_metadata[index++] << 8U; // real_type
1151 x+= field_metadata[index++]; // pack or field length
1152 m_field_metadata[i]= x;
1153 break;
1154 }
1155 case MYSQL_TYPE_BIT:
1156 {
1157 uint16 x= field_metadata[index++];
1158 x = x + (field_metadata[index++] << 8U);
1159 m_field_metadata[i]= x;
1160 break;
1161 }
1162 case MYSQL_TYPE_VARCHAR:
1163 case MYSQL_TYPE_VARCHAR_COMPRESSED:
1164 {
1165 /*
1166 These types store two bytes.
1167 */
1168 char *ptr= (char *)&field_metadata[index];
1169 m_field_metadata[i]= uint2korr(ptr);
1170 index= index + 2;
1171 break;
1172 }
1173 case MYSQL_TYPE_NEWDECIMAL:
1174 {
1175 uint16 x= field_metadata[index++] << 8U; // precision
1176 x+= field_metadata[index++]; // decimals
1177 m_field_metadata[i]= x;
1178 break;
1179 }
1180 case MYSQL_TYPE_TIME2:
1181 case MYSQL_TYPE_DATETIME2:
1182 case MYSQL_TYPE_TIMESTAMP2:
1183 m_field_metadata[i]= field_metadata[index++];
1184 break;
1185 default:
1186 m_field_metadata[i]= 0;
1187 break;
1188 }
1189 }
1190 }
1191 if (m_size && null_bitmap)
1192 memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
1193 }
1194
1195
~table_def()1196 table_def::~table_def()
1197 {
1198 my_free(m_memory);
1199 #ifndef DBUG_OFF
1200 m_type= 0;
1201 m_size= 0;
1202 #endif
1203 }
1204
1205
1206 /**
1207 @param even_buf point to the buffer containing serialized event
1208 @param event_len length of the event accounting possible checksum alg
1209
1210 @return TRUE if test fails
1211 FALSE as success
1212 */
event_checksum_test(uchar * event_buf,ulong event_len,enum enum_binlog_checksum_alg alg)1213 bool event_checksum_test(uchar *event_buf, ulong event_len, enum enum_binlog_checksum_alg alg)
1214 {
1215 bool res= FALSE;
1216 uint16 flags= 0; // to store in FD's buffer flags orig value
1217
1218 if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF)
1219 {
1220 ha_checksum incoming;
1221 ha_checksum computed;
1222
1223 if (event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT)
1224 {
1225 #ifdef DBUG_ASSERT_EXISTS
1226 int8 fd_alg= event_buf[event_len - BINLOG_CHECKSUM_LEN -
1227 BINLOG_CHECKSUM_ALG_DESC_LEN];
1228 #endif
1229 /*
1230 FD event is checksummed and therefore verified w/o the binlog-in-use flag
1231 */
1232 flags= uint2korr(event_buf + FLAGS_OFFSET);
1233 if (flags & LOG_EVENT_BINLOG_IN_USE_F)
1234 event_buf[FLAGS_OFFSET] &= ~LOG_EVENT_BINLOG_IN_USE_F;
1235 /*
1236 The only algorithm currently is CRC32. Zero indicates
1237 the binlog file is checksum-free *except* the FD-event.
1238 */
1239 DBUG_ASSERT(fd_alg == BINLOG_CHECKSUM_ALG_CRC32 || fd_alg == 0);
1240 DBUG_ASSERT(alg == BINLOG_CHECKSUM_ALG_CRC32);
1241 /*
1242 Complile time guard to watch over the max number of alg
1243 */
1244 compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80);
1245 }
1246 incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN);
1247 /* checksum the event content without the checksum part itself */
1248 computed= my_checksum(0, event_buf, event_len - BINLOG_CHECKSUM_LEN);
1249 if (flags != 0)
1250 {
1251 /* restoring the orig value of flags of FD */
1252 DBUG_ASSERT(event_buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
1253 event_buf[FLAGS_OFFSET]= (uchar) flags;
1254 }
1255 res= DBUG_EVALUATE_IF("simulate_checksum_test_failure", TRUE, computed != incoming);
1256 }
1257 return res;
1258 }
1259
1260 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
1261
Deferred_log_events(Relay_log_info * rli)1262 Deferred_log_events::Deferred_log_events(Relay_log_info *rli) : last_added(NULL)
1263 {
1264 my_init_dynamic_array(&array, sizeof(Log_event *), 32, 16, MYF(0));
1265 }
1266
~Deferred_log_events()1267 Deferred_log_events::~Deferred_log_events()
1268 {
1269 delete_dynamic(&array);
1270 }
1271
add(Log_event * ev)1272 int Deferred_log_events::add(Log_event *ev)
1273 {
1274 last_added= ev;
1275 insert_dynamic(&array, (uchar*) &ev);
1276 return 0;
1277 }
1278
is_empty()1279 bool Deferred_log_events::is_empty()
1280 {
1281 return array.elements == 0;
1282 }
1283
execute(rpl_group_info * rgi)1284 bool Deferred_log_events::execute(rpl_group_info *rgi)
1285 {
1286 bool res= false;
1287 DBUG_ENTER("Deferred_log_events::execute");
1288 DBUG_ASSERT(rgi->deferred_events_collecting);
1289
1290 rgi->deferred_events_collecting= false;
1291 for (uint i= 0; !res && i < array.elements; i++)
1292 {
1293 Log_event *ev= (* (Log_event **)
1294 dynamic_array_ptr(&array, i));
1295 res= ev->apply_event(rgi);
1296 }
1297 rgi->deferred_events_collecting= true;
1298 DBUG_RETURN(res);
1299 }
1300
rewind()1301 void Deferred_log_events::rewind()
1302 {
1303 /*
1304 Reset preceding Query log event events which execution was
1305 deferred because of slave side filtering.
1306 */
1307 if (!is_empty())
1308 {
1309 for (uint i= 0; i < array.elements; i++)
1310 {
1311 Log_event *ev= *(Log_event **) dynamic_array_ptr(&array, i);
1312 delete ev;
1313 }
1314 last_added= NULL;
1315 if (array.elements > array.max_element)
1316 freeze_size(&array);
1317 reset_dynamic(&array);
1318 }
1319 last_added= NULL;
1320 }
1321
1322 #endif
1323