1 /* Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_utility.h"
24
25 #include <string.h>
26 #include <iterator>
27 #include <new>
28 #include <utility>
29
30 #include "lex_string.h"
31 #include "libbinlogevents/export/binary_log_funcs.h"
32 #include "my_byteorder.h"
33 #include "my_dbug.h"
34 #include "my_loglevel.h"
35 #include "my_sys.h"
36 #include "mysql/components/services/log_builtins.h"
37 #include "mysql/service_mysql_alloc.h"
38 #include "sql/thr_malloc.h"
39
40 struct TYPELIB;
41
42 #ifdef MYSQL_SERVER
43
44 #include <algorithm>
45
46 #include "libbinlogevents/include/binlog_event.h" // checksum_crv32
47 #include "m_ctype.h"
48 #include "m_string.h"
49 #include "my_base.h"
50 #include "my_bitmap.h"
51 #include "mysql/components/services/log_builtins.h"
52 #include "mysql/psi/psi_memory.h"
53 #include "mysqld_error.h"
54 #include "sql/create_field.h"
55 #include "sql/dd/dd.h" // get_dictionary
56 #include "sql/dd/dictionary.h" // is_dd_table_access_allowed
57 #include "sql/derror.h" // ER_THD
58 #include "sql/field.h" // Field
59 #include "sql/log.h"
60 #include "sql/log_event.h" // Log_event
61 #include "sql/my_decimal.h"
62 #include "sql/mysqld.h" // slave_type_conversions_options
63 #include "sql/psi_memory_key.h"
64 #include "sql/rpl_rli.h" // Relay_log_info
65 #include "sql/rpl_slave.h"
66 #include "sql/sql_class.h" // THD
67 #include "sql/sql_const.h"
68 #include "sql/sql_lex.h" // LEX
69 #include "sql/sql_list.h"
70 #include "sql/sql_plugin_ref.h"
71 #include "sql/sql_tmp_table.h" // create_tmp_table_from_fields
72 #include "sql_show.h" // show_sql_type
73 #include "sql_string.h"
74 #include "template_utils.h" // delete_container_pointers
75 #include "typelib.h"
76
77 using binary_log::checksum_crc32;
78 using std::max;
79 using std::min;
80 using std::unique_ptr;
81
82 #endif // MYSQL_SERVER
83
84 /*********************************************************************
85 * table_def member definitions *
86 *********************************************************************/
87
88 /*
89 This function returns the field size in raw bytes based on the type
90 and the encoded field data from the master's raw data.
91 */
calc_field_size(uint col,const uchar * master_data) const92 uint32 table_def::calc_field_size(uint col, const uchar *master_data) const {
93 uint32 length =
94 ::calc_field_size(type(col), master_data, m_field_metadata[col]);
95 return length;
96 }
97
98 #if defined(MYSQL_SERVER)
99 /**
100 Function to compare two size_t integers for their relative
101 order. Used below.
102 */
compare(size_t a,size_t b)103 static int compare(size_t a, size_t b) {
104 if (a < b) return -1;
105 if (b < a) return 1;
106 return 0;
107 }
108
109 /*
110 Compare the pack lengths of a source field (on the master) and a
111 target field (on the slave).
112
113 @param field Target field.
114 @param type Source field type.
115 @param metadata Source field metadata.
116
117 @retval -1 The length of the source field is smaller than the target field.
118 @retval 0 The length of the source and target fields are the same.
119 @retval 1 The length of the source field is greater than the target field.
120 */
compare_lengths(Field * field,enum_field_types source_type,uint16 metadata)121 static int compare_lengths(Field *field, enum_field_types source_type,
122 uint16 metadata) {
123 DBUG_TRACE;
124 size_t const source_length =
125 max_display_length_for_field(source_type, metadata);
126 size_t const target_length = field->max_display_length();
127 DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
128 " target_length: %lu, target_type: %u",
129 (unsigned long)source_length, source_type,
130 (unsigned long)target_length, field->real_type()));
131 int result = compare(source_length, target_length);
132 DBUG_PRINT("result", ("%d", result));
133 return result;
134 }
135
136 /**
137 Check the order variable and print errors if the order is not
138 acceptable according to the current settings.
139
140 @param order The computed order of the conversion needed.
141 */
is_conversion_ok(int order)142 static bool is_conversion_ok(int order) {
143 DBUG_TRACE;
144 bool allow_non_lossy, allow_lossy;
145
146 allow_non_lossy = slave_type_conversions_options &
147 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
148 allow_lossy = slave_type_conversions_options &
149 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
150
151 DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
152 allow_non_lossy ? " ALL_NON_LOSSY" : "",
153 allow_lossy ? " ALL_LOSSY" : ""));
154 if (order < 0 && !allow_non_lossy) {
155 /* !!! Add error message saying that non-lossy conversions need to be
156 * allowed. */
157 return false;
158 }
159
160 if (order > 0 && !allow_lossy) {
161 /* !!! Add error message saying that lossy conversions need to be allowed.
162 */
163 return false;
164 }
165
166 return true;
167 }
168
169 /**
170 Check if the types are criss cross means type1 is MYSQL_TYPE_TIMESTAMP
171 and type2 as MYSQL_TYPE_TIMESTAMP2 or vice versa.
172 */
timestamp_cross_check(enum_field_types type1,enum_field_types type2)173 inline bool timestamp_cross_check(enum_field_types type1,
174 enum_field_types type2) {
175 return ((type1 == MYSQL_TYPE_TIMESTAMP && type2 == MYSQL_TYPE_TIMESTAMP2) ||
176 (type1 == MYSQL_TYPE_TIMESTAMP2 && type2 == MYSQL_TYPE_TIMESTAMP));
177 }
178
179 /**
180 Check if the types are criss cross means type1 is MYSQL_TYPE_DATETIME
181 and type2 as MYSQL_TYPE_DATETIME or vice versa.
182 */
datetime_cross_check(enum_field_types type1,enum_field_types type2)183 inline bool datetime_cross_check(enum_field_types type1,
184 enum_field_types type2) {
185 return ((type1 == MYSQL_TYPE_DATETIME && type2 == MYSQL_TYPE_DATETIME2) ||
186 (type1 == MYSQL_TYPE_DATETIME2 && type2 == MYSQL_TYPE_DATETIME));
187 }
188
189 /**
190 Check if the types are criss cross means type1 is MYSQL_TYPE_TIME
191 and type2 as MYSQL_TYPE_TIME2 or vice versa.
192 */
time_cross_check(enum_field_types type1,enum_field_types type2)193 inline bool time_cross_check(enum_field_types type1, enum_field_types type2) {
194 return ((type1 == MYSQL_TYPE_TIME && type2 == MYSQL_TYPE_TIME2) ||
195 (type1 == MYSQL_TYPE_TIME2 && type2 == MYSQL_TYPE_TIME));
196 }
197
198 /**
199 Can a type potentially be converted to another type?
200
201 This function check if the types are convertible and what
202 conversion is required.
203
204 If conversion is not possible, and error is printed.
205
206 If conversion is possible:
207
208 - *order will be set to -1 if source type is smaller than target
209 type and a non-lossy conversion can be required. This includes
210 the case where the field types are different but types could
211 actually be converted in either direction.
212
213 - *order will be set to 0 if no conversion is required.
214
215 - *order will be set to 1 if the source type is strictly larger
216 than the target type and that conversion is potentially lossy.
217
218 @param[in] field Target field
219 @param[in] source_type Source field type
220 @param[in] metadata Source field metadata
221 @param[in] is_array Whether the source field is a typed array
222 @param[in] rli Relay log info (for error reporting)
223 @param[in] mflags Flags from the table map event
224 @param[out] order_var Order between source field and target field
225
226 @return @c true if conversion is possible according to the current
227 settings, @c false if conversion is not possible according to the
228 current setting.
229 */
can_convert_field_to(Field * field,enum_field_types source_type,uint metadata,bool is_array,Relay_log_info * rli,uint16 mflags,int * order_var)230 static bool can_convert_field_to(Field *field, enum_field_types source_type,
231 uint metadata, bool is_array,
232 Relay_log_info *rli, uint16 mflags,
233 int *order_var) {
234 DBUG_TRACE;
235 #ifndef DBUG_OFF
236 char field_type_buf[MAX_FIELD_WIDTH];
237 String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
238 field->sql_type(field_type);
239 DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, "
240 "source_metadata: 0x%x",
241 field_type.c_ptr_safe(), field->real_type(), source_type,
242 metadata));
243 #endif
244 // Can't convert from scalar to array and vice versa
245 if (is_array != field->is_array()) return false;
246
247 /*
248 If the real type is the same, we need to check the metadata to
249 decide if conversions are allowed.
250 */
251 if (field->real_type() == source_type) {
252 if (metadata == 0) // Metadata can only be zero if no metadata was provided
253 {
254 /*
255 If there is no metadata, we either have an old event where no
256 metadata were supplied, or a type that does not require any
257 metadata. In either case, conversion can be done but no
258 conversion table is necessary.
259 */
260 DBUG_PRINT("debug",
261 ("Base types are identical, but there is no metadata"));
262 *order_var = 0;
263 return true;
264 }
265
266 DBUG_PRINT("debug",
267 ("Base types are identical, doing field size comparison"));
268 if (field->compatible_field_size(metadata, rli, mflags, order_var))
269 return is_conversion_ok(*order_var);
270 else
271 return false;
272 } else if (is_array) {
273 // Can't covert between typed array of different types
274 return false;
275 } else if (metadata == 0 &&
276 (timestamp_cross_check(field->real_type(), source_type) ||
277 datetime_cross_check(field->real_type(), source_type) ||
278 time_cross_check(field->real_type(), source_type))) {
279 /*
280 In the above condition, we are taking care
281 of case where
282 1) Master having old TIME, TIMESTAMP, DATETIME
283 and slave have new TIME2, TIMESTAMP2, DATETIME2
284 or
285 2) Master having new TIMESTAMP2, DATETIME2, TIME2
286 with fraction part zero and slave have TIME,
287 TIMESTAMP, DATETIME.
288 We need second condition, as when we are
289 upgrading from 5.5 to 5.6 TIME, TIMESTAMP,
290 DATETIME columns are not upgraded to TIME(0),
291 TIMESTAMP(0), DATETIME(0).
292 So to support these conversion we are putting this
293 condition.
294 */
295 /*
296 TS-TODO: conversion from FSP1>FSP2.
297 Can do non-lossy conversion
298 from old TIME, TIMESTAMP, DATETIME
299 to new TIME(0), TIMESTAMP(0), DATETIME(0).
300 */
301 *order_var = -1;
302 return true;
303 } else if (!slave_type_conversions_options)
304 return false;
305
306 /*
307 Here, from and to will always be different. Since the types are
308 different, we cannot use the compatible_field_size() function, but
309 have to rely on hard-coded max-sizes for fields.
310 */
311
312 DBUG_PRINT("debug", ("Base types are different, checking conversion"));
313 switch (source_type) // Source type (on master)
314 {
315 case MYSQL_TYPE_DECIMAL:
316 case MYSQL_TYPE_NEWDECIMAL:
317 case MYSQL_TYPE_FLOAT:
318 case MYSQL_TYPE_DOUBLE:
319 switch (field->real_type()) {
320 case MYSQL_TYPE_NEWDECIMAL:
321 /*
322 Then the other type is either FLOAT, DOUBLE, or old style
323 DECIMAL, so we require lossy conversion.
324 */
325 *order_var = 1;
326 return is_conversion_ok(*order_var);
327
328 case MYSQL_TYPE_DECIMAL:
329 case MYSQL_TYPE_FLOAT:
330 case MYSQL_TYPE_DOUBLE: {
331 if (source_type == MYSQL_TYPE_NEWDECIMAL ||
332 source_type == MYSQL_TYPE_DECIMAL)
333 *order_var = 1; // Always require lossy conversions
334 else
335 *order_var = compare_lengths(field, source_type, metadata);
336 DBUG_ASSERT(*order_var != 0);
337 return is_conversion_ok(*order_var);
338 }
339
340 default:
341 return false;
342 }
343 break;
344
345 /*
346 The length comparison check will do the correct job of comparing
347 the field lengths (in bytes) of two integer types.
348 */
349 case MYSQL_TYPE_TINY:
350 case MYSQL_TYPE_SHORT:
351 case MYSQL_TYPE_INT24:
352 case MYSQL_TYPE_LONG:
353 case MYSQL_TYPE_LONGLONG:
354 switch (field->real_type()) {
355 case MYSQL_TYPE_TINY:
356 case MYSQL_TYPE_SHORT:
357 case MYSQL_TYPE_INT24:
358 case MYSQL_TYPE_LONG:
359 case MYSQL_TYPE_LONGLONG:
360 *order_var = compare_lengths(field, source_type, metadata);
361 DBUG_ASSERT(*order_var != 0);
362 return is_conversion_ok(*order_var);
363
364 default:
365 return false;
366 }
367 break;
368
369 /*
370 Since source and target type is different, and it is not possible
371 to convert bit types to anything else, this will return false.
372 */
373 case MYSQL_TYPE_BIT:
374 return false;
375
376 /*
377 If all conversions are disabled, it is not allowed to convert
378 between these types. Since the TEXT vs. BINARY is distinguished by
379 the charset, and the charset is not replicated, we cannot
380 currently distinguish between , e.g., TEXT and BLOB.
381 */
382 case MYSQL_TYPE_TINY_BLOB:
383 case MYSQL_TYPE_MEDIUM_BLOB:
384 case MYSQL_TYPE_LONG_BLOB:
385 case MYSQL_TYPE_BLOB:
386 case MYSQL_TYPE_STRING:
387 case MYSQL_TYPE_VAR_STRING:
388 case MYSQL_TYPE_VARCHAR:
389 switch (field->real_type()) {
390 case MYSQL_TYPE_TINY_BLOB:
391 case MYSQL_TYPE_MEDIUM_BLOB:
392 case MYSQL_TYPE_LONG_BLOB:
393 case MYSQL_TYPE_BLOB:
394 case MYSQL_TYPE_STRING:
395 case MYSQL_TYPE_VAR_STRING:
396 case MYSQL_TYPE_VARCHAR:
397 *order_var = compare_lengths(field, source_type, metadata);
398 /*
399 Here we know that the types are different, so if the order
400 gives that they do not require any conversion, we still need
401 to have non-lossy conversion enabled to allow conversion
402 between different (string) types of the same length.
403 */
404 if (*order_var == 0) *order_var = -1;
405 return is_conversion_ok(*order_var);
406
407 default:
408 return false;
409 }
410 break;
411
412 case MYSQL_TYPE_GEOMETRY:
413 case MYSQL_TYPE_JSON:
414 case MYSQL_TYPE_TIMESTAMP:
415 case MYSQL_TYPE_DATE:
416 case MYSQL_TYPE_TIME:
417 case MYSQL_TYPE_DATETIME:
418 case MYSQL_TYPE_YEAR:
419 case MYSQL_TYPE_NEWDATE:
420 case MYSQL_TYPE_NULL:
421 case MYSQL_TYPE_ENUM:
422 case MYSQL_TYPE_SET:
423 case MYSQL_TYPE_TIMESTAMP2:
424 case MYSQL_TYPE_DATETIME2:
425 case MYSQL_TYPE_TIME2:
426 case MYSQL_TYPE_TYPED_ARRAY:
427 return false;
428 }
429 return false; // To keep GCC happy
430 }
431
432 /**
433 Is the definition compatible with a table when it does not belong to
434 the data dictionary?
435
436 This function first finds out whether the table belongs to the data
437 dictionary. When not, it will compare the master table with an existing
438 table on the slave and see if they are compatible with respect to the
439 current settings of @c SLAVE_TYPE_CONVERSIONS.
440
441 If the tables are compatible and conversions are required, @c
442 *tmp_table_var will be set to a virtual temporary table with field
443 pointers for the fields that require conversions. This allow simple
444 checking of whether a conversion are to be applied or not.
445
446 If tables are compatible, but no conversions are necessary, @c
447 *tmp_table_var will be set to NULL.
448
449 @param [in] rli Relay log info, for error reporting.
450
451 @param [in] table Table to compare with
452
453 @param [out] conv_table_var Virtual temporary table for performing
454 conversions, if necessary.
455
456 @retval true Master table is compatible with slave table.
457 @retval false When the table belongs to the data dictionary or
458 master table is not compatible with slave table.
459 */
compatible_with(THD * thd,Relay_log_info * rli,TABLE * table,TABLE ** conv_table_var) const460 bool table_def::compatible_with(THD *thd, Relay_log_info *rli, TABLE *table,
461 TABLE **conv_table_var) const {
462 /*
463 Prohibit replication into dictionary internal tables. We know this is
464 not DDL (which will be replicated as statements, and rejected by the
465 corresponding check for SQL statements), thus 'false' in the call below.
466 Also sserting that this is not a DD system thread.
467 */
468 DBUG_ASSERT(!thd->is_dd_system_thread());
469 const dd::Dictionary *dictionary = dd::get_dictionary();
470 if (dictionary && !dictionary->is_dd_table_access_allowed(
471 false, false, table->s->db.str, table->s->db.length,
472 table->s->table_name.str)) {
473 DBUG_PRINT("debug", ("Access to dictionary table %s.%s is prohibited",
474 table->s->db.str, table->s->table_name.str));
475 rli->report(
476 ERROR_LEVEL, ER_SERVER_NO_SYSTEM_TABLE_ACCESS,
477 ER_THD(thd, ER_SERVER_NO_SYSTEM_TABLE_ACCESS),
478 ER_THD_NONCONST(thd, dictionary->table_type_error_code(
479 table->s->db.str, table->s->table_name.str)),
480 table->s->db.str, table->s->table_name.str);
481 return false;
482 }
483
484 /*
485 We only check the initial columns for the tables.
486 */
487 Replicated_columns_view fields{table, Replicated_columns_view::INBOUND, thd};
488 uint const cols_to_check = min<ulong>(fields.filtered_size(), size());
489 TABLE *tmp_table = nullptr;
490
491 for (auto it = fields.begin(); it.filtered_pos() < cols_to_check; ++it) {
492 Field *const field = *it;
493 size_t col = it.filtered_pos();
494 int order;
495 if (can_convert_field_to(field, type(col), field_metadata(col),
496 is_array(col), rli, m_flags, &order)) {
497 DBUG_PRINT("debug", ("Checking column %lu -"
498 " field '%s' can be converted - order: %d",
499 static_cast<long unsigned int>(col),
500 field->field_name, order));
501 DBUG_ASSERT(order >= -1 && order <= 1);
502
503 /*
504 If order is not 0, a conversion is required, so we need to set
505 up the conversion table.
506 */
507 if (order != 0 && tmp_table == nullptr) {
508 /*
509 This will create the full table with all fields. This is
510 necessary to ge the correct field lengths for the record.
511 */
512 tmp_table = create_conversion_table(thd, rli, table);
513 if (tmp_table == nullptr) return false;
514 /*
515 Clear all fields up to, but not including, this column.
516 */
517 for (unsigned int i = 0; i < col; ++i) tmp_table->field[i] = nullptr;
518 }
519
520 if (order == 0 && tmp_table != nullptr) tmp_table->field[col] = nullptr;
521 } else {
522 DBUG_PRINT("debug",
523 ("Checking column %lu -"
524 " field '%s' can not be converted",
525 static_cast<long unsigned int>(col), field->field_name));
526 DBUG_ASSERT(col < size() && col < table->s->fields);
527 DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
528 const char *db_name = table->s->db.str;
529 const char *tbl_name = table->s->table_name.str;
530 char source_buf[MAX_FIELD_WIDTH];
531 char target_buf[MAX_FIELD_WIDTH];
532 String field_sql_type;
533 enum loglevel report_level = INFORMATION_LEVEL;
534 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
535 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
536 show_sql_type(type(col), is_array(col), field_metadata(col),
537 &source_type);
538 field->sql_type(target_type);
539 if (!ignored_error_code(ER_SERVER_SLAVE_CONVERSION_FAILED)) {
540 report_level = ERROR_LEVEL;
541 thd->is_slave_error = true;
542 } else if (log_error_verbosity >= 2)
543 report_level = WARNING_LEVEL;
544
545 if (field->has_charset() && (field->type() == MYSQL_TYPE_VARCHAR ||
546 field->type() == MYSQL_TYPE_STRING)) {
547 field_sql_type.append((field->type() == MYSQL_TYPE_VARCHAR) ? "varchar"
548 : "char");
549 const CHARSET_INFO *cs = field->charset();
550 size_t length = cs->cset->snprintf(
551 cs, target_type.ptr(), target_type.alloced_length(),
552 "%s(%u(bytes) %s)", field_sql_type.c_ptr_safe(),
553 field->field_length, field->charset()->csname);
554 target_type.length(length);
555 } else
556 field->sql_type(target_type);
557
558 if (report_level != INFORMATION_LEVEL)
559 rli->report(report_level, ER_SERVER_SLAVE_CONVERSION_FAILED,
560 ER_THD(thd, ER_SERVER_SLAVE_CONVERSION_FAILED), col,
561 db_name, tbl_name, source_type.c_ptr_safe(),
562 target_type.c_ptr_safe());
563 return false;
564 }
565 }
566
567 #ifndef DBUG_OFF
568 if (tmp_table) {
569 for (unsigned int col = 0; col < tmp_table->s->fields; ++col)
570 if (tmp_table->field[col]) {
571 char source_buf[MAX_FIELD_WIDTH];
572 char target_buf[MAX_FIELD_WIDTH];
573 String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
574 String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
575 tmp_table->field[col]->sql_type(source_type);
576 table->field[col]->sql_type(target_type);
577 DBUG_PRINT("debug",
578 ("Field %s - conversion required."
579 " Source type: '%s', Target type: '%s'",
580 tmp_table->field[col]->field_name, source_type.c_ptr_safe(),
581 target_type.c_ptr_safe()));
582 }
583 }
584 #endif
585
586 *conv_table_var = tmp_table;
587 return true;
588 }
589
590 /**
591 Create a conversion table.
592
593 If the function is unable to create the conversion table, an error
594 will be printed and NULL will be returned.
595
596 @return Pointer to conversion table, or NULL if unable to create
597 conversion table.
598 */
599
create_conversion_table(THD * thd,Relay_log_info * rli,TABLE * target_table) const600 TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli,
601 TABLE *target_table) const {
602 DBUG_TRACE;
603
604 List<Create_field> field_list;
605 TABLE *conv_table = nullptr;
606 /*
607 At slave, columns may differ. So we should create
608 min(columns@master, columns@slave) columns in the
609 conversion table.
610 */
611 uint const cols_to_create = min<ulong>(target_table->s->fields, size());
612
613 // Default value : treat all values signed
614 bool unsigned_flag = false;
615
616 // Check if slave_type_conversions contains ALL_UNSIGNED
617 unsigned_flag = slave_type_conversions_options &
618 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_UNSIGNED);
619
620 // Check if slave_type_conversions contains ALL_SIGNED
621 unsigned_flag =
622 unsigned_flag && !(slave_type_conversions_options &
623 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_SIGNED));
624
625 for (uint col = 0; col < cols_to_create; ++col) {
626 Create_field *field_def = new (thd->mem_root) Create_field();
627 if (field_list.push_back(field_def)) return nullptr;
628
629 uint decimals = 0;
630 TYPELIB *interval = nullptr;
631 uint pack_length_override = 0; // 0 => NA. Only assigned below when needed.
632 enum_field_types field_type = type(col);
633 uint32 max_length =
634 max_display_length_for_field(field_type, field_metadata(col));
635
636 switch (field_type) {
637 uint precision;
638 case MYSQL_TYPE_ENUM:
639 case MYSQL_TYPE_SET:
640 interval = static_cast<Field_enum *>(target_table->field[col])->typelib;
641 /*
642 Number of elements in interval on master and slave might differ.
643 Use pack length from binary log instead of one calculated from
644 number of interval elements on slave.
645 */
646 pack_length_override = field_metadata(col) & 0x00ff;
647 break;
648
649 case MYSQL_TYPE_NEWDECIMAL:
650 /*
651 The display length of a DECIMAL type is not the same as the
652 length that should be supplied to make_field, so we correct
653 the length here.
654 */
655 precision = field_metadata(col) >> 8;
656 decimals = field_metadata(col) & 0x00ff;
657 max_length = my_decimal_precision_to_length(precision, decimals, false);
658 break;
659
660 case MYSQL_TYPE_DECIMAL:
661 LogErr(ERROR_LEVEL, ER_RPL_INCOMPATIBLE_DECIMAL_IN_RBR,
662 target_table->s->db.str, target_table->s->table_name.str,
663 target_table->field[col]->field_name);
664 goto err;
665
666 case MYSQL_TYPE_BLOB:
667 /*
668 Blobs are binlogged as MYSQL_TYPE_BLOB, even when pack_length
669 != 2. Need the exact blob type for the call to
670 Create_field::init_for_tmp_table() below. Note that
671 pack_length is NOT assigned to pack_length_override here, as
672 this should only be used when the pack_length cannot be
673 derived from the exact type, i.e. for ENUM and SET (see
674 above).
675 */
676 field_type = blob_type_from_pack_length(field_metadata(col) & 0x00ff);
677 break;
678
679 default:
680 break;
681 }
682
683 DBUG_PRINT(
684 "debug",
685 ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
686 " maybe_null: %d, unsigned_flag: %d",
687 binlog_type(col), target_table->field[col]->field_name, max_length,
688 decimals, true, unsigned_flag));
689 field_def->init_for_tmp_table(field_type, max_length, decimals,
690 true, // maybe_null
691 unsigned_flag, // unsigned_flag
692 pack_length_override);
693 field_def->charset = target_table->field[col]->charset();
694 field_def->interval = interval;
695 }
696
697 conv_table = DBUG_EVALUATE_IF(
698 "simulate_out_of_memory_while_creating_temp_table_for_conversion",
699 nullptr, create_tmp_table_from_fields(thd, field_list));
700 err:
701 if (conv_table == nullptr) {
702 enum loglevel report_level = INFORMATION_LEVEL;
703 if (!ignored_error_code(ER_SLAVE_CANT_CREATE_CONVERSION)) {
704 report_level = ERROR_LEVEL;
705 thd->is_slave_error = true;
706 } else if (log_error_verbosity >= 2)
707 report_level = WARNING_LEVEL;
708
709 if (report_level != INFORMATION_LEVEL)
710 rli->report(report_level, ER_SLAVE_CANT_CREATE_CONVERSION,
711 ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
712 target_table->s->db.str, target_table->s->table_name.str);
713 }
714 return conv_table;
715 }
716
717 #endif /* MYSQL_SERVER */
718
719 /**
720 Decode field metadata from a char buffer (serialized form) into an int
721 (packed form).
722
723 @note On little-endian platforms (e.g Intel) this function effectively
724 inverts order of bytes compared to what Field::save_field_metadata()
725 writes. E.g for MYSQL_TYPE_NEWDECIMAL save_field_metadata writes precision
726 into the first byte and decimals into the second, this function puts
727 precision into the second byte and decimals into the first. This layout
728 is expected by replication code that reads metadata in the uint form.
729 Due to this design feature show_sql_type() can't correctly print
730 immediate output of save_field_metadata(), this function have to be used
731 as translator.
732
733 @param buffer Field metadata, in the character stream form produced by
734 save_field_metadata.
735 @param binlog_type The type of the field, in the form returned by
736 Field::binlog_type and stored in Table_map_log_event.
737 @retval pair where:
738 - the first component is the length of the metadata within 'buffer',
739 i.e., how much the buffer pointer should move forward in order to skip it.
740 - the second component is pair containing:
741 - the metadata, encoded as an 'uint', in the form required by e.g.
742 show_sql_type.
743 - bool indicating whether the field is array (true) or a scalar (false)
744 */
745
read_field_metadata(const uchar * buffer,enum_field_types binlog_type)746 std::pair<my_off_t, std::pair<uint, bool>> read_field_metadata(
747 const uchar *buffer, enum_field_types binlog_type) {
748 bool is_array = false;
749 uint metadata = 0;
750 uint index = 0;
751 if (binlog_type == MYSQL_TYPE_TYPED_ARRAY) {
752 binlog_type = static_cast<enum_field_types>(buffer[index++]);
753 is_array = true;
754 }
755 switch (binlog_type) {
756 case MYSQL_TYPE_TINY_BLOB:
757 case MYSQL_TYPE_BLOB:
758 case MYSQL_TYPE_MEDIUM_BLOB:
759 case MYSQL_TYPE_LONG_BLOB:
760 case MYSQL_TYPE_DOUBLE:
761 case MYSQL_TYPE_FLOAT:
762 case MYSQL_TYPE_GEOMETRY:
763 case MYSQL_TYPE_TIME2:
764 case MYSQL_TYPE_DATETIME2:
765 case MYSQL_TYPE_TIMESTAMP2:
766 case MYSQL_TYPE_JSON: {
767 /*
768 These types store a single byte.
769 */
770 metadata = buffer[index++];
771 break;
772 }
773 case MYSQL_TYPE_SET:
774 case MYSQL_TYPE_ENUM:
775 case MYSQL_TYPE_STRING: {
776 metadata = buffer[index++] << 8U; // real_type
777 metadata += buffer[index++]; // pack or field length
778 break;
779 }
780 case MYSQL_TYPE_BIT: {
781 metadata = buffer[index++];
782 metadata += (buffer[index++] << 8U);
783 break;
784 }
785 case MYSQL_TYPE_VARCHAR: {
786 /*
787 These types store two bytes.
788 */
789 if (is_array) {
790 metadata = uint3korr(buffer + index);
791 index = index + 3;
792 } else {
793 metadata = uint2korr(buffer + index);
794 index = index + 2;
795 }
796 break;
797 }
798 case MYSQL_TYPE_NEWDECIMAL: {
799 metadata = buffer[index++] << 8U; // precision
800 metadata += buffer[index++]; // decimals
801 break;
802 }
803 default:
804 metadata = 0;
805 break;
806 }
807 return std::make_pair(index, std::make_pair(metadata, is_array));
808 }
809
810 PSI_memory_key key_memory_table_def_memory;
811
table_def(unsigned char * types,ulong size,uchar * field_metadata,int metadata_size,uchar * null_bitmap,uint16 flags)812 table_def::table_def(unsigned char *types, ulong size, uchar *field_metadata,
813 int metadata_size, uchar *null_bitmap, uint16 flags)
814 : m_size(size),
815 m_type(nullptr),
816 m_field_metadata_size(metadata_size),
817 m_field_metadata(nullptr),
818 m_null_bits(nullptr),
819 m_flags(flags),
820 m_memory(nullptr),
821 m_json_column_count(-1),
822 m_is_array(nullptr) {
823 m_memory = (uchar *)my_multi_malloc(
824 key_memory_table_def_memory, MYF(MY_WME), &m_type, size,
825 &m_field_metadata, size * sizeof(uint), &m_is_array, size * sizeof(bool),
826 &m_null_bits, (size + 7) / 8, nullptr);
827
828 memset(m_field_metadata, 0, size * sizeof(uint));
829 memset(m_is_array, 0, size * sizeof(bool));
830
831 if (m_type)
832 memcpy(m_type, types, size);
833 else
834 m_size = 0;
835 /*
836 Extract the data from the table map into the field metadata array
837 iff there is field metadata. The variable metadata_size will be
838 0 if we are replicating from an older version server since no field
839 metadata was written to the table map. This can also happen if
840 there were no fields in the master that needed extra metadata.
841 */
842 if (m_size && metadata_size) {
843 int index = 0;
844 for (unsigned int i = 0; i < m_size; i++) {
845 std::pair<my_off_t, std::pair<uint, bool>> pack = read_field_metadata(
846 static_cast<const uchar *>(field_metadata + index), binlog_type(i));
847 // Update type of the typed array
848 if (binlog_type(i) == MYSQL_TYPE_TYPED_ARRAY)
849 m_type[i] = static_cast<enum_field_types>(field_metadata[index]);
850 // Fill in read metadata
851 m_field_metadata[i] = pack.second.first;
852 m_is_array[i] = pack.second.second;
853 index += pack.first;
854 DBUG_ASSERT(index <= metadata_size);
855 }
856 }
857 if (m_size && null_bitmap) memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
858 }
859
~table_def()860 table_def::~table_def() {
861 my_free(m_memory);
862 #ifndef DBUG_OFF
863 m_type = nullptr;
864 m_size = 0;
865 #endif
866 }
867
868 #ifdef MYSQL_SERVER
869
870 #define HASH_ROWS_POS_SEARCH_INVALID -1
871
872 /**
873 Utility methods for handling row based operations.
874 */
875
operator ()(HASH_ROW_ENTRY * entry) const876 void hash_slave_rows_free_entry::operator()(HASH_ROW_ENTRY *entry) const {
877 DBUG_TRACE;
878 if (entry) {
879 if (entry->preamble) {
880 entry->preamble->~HASH_ROW_PREAMBLE();
881 my_free(entry->preamble);
882 }
883 if (entry->positions) my_free(entry->positions);
884 my_free(entry);
885 }
886 }
887
is_empty(void)888 bool Hash_slave_rows::is_empty(void) { return m_hash.empty(); }
889
890 /**
891 Hashing commodity structures and functions.
892 */
893
init(void)894 bool Hash_slave_rows::init(void) { return false; }
895
deinit(void)896 bool Hash_slave_rows::deinit(void) {
897 DBUG_TRACE;
898 m_hash.clear();
899 return false;
900 }
901
size()902 int Hash_slave_rows::size() { return m_hash.size(); }
903
make_entry()904 HASH_ROW_ENTRY *Hash_slave_rows::make_entry() {
905 return make_entry(nullptr, nullptr);
906 }
907
make_entry(const uchar * bi_start,const uchar * bi_ends)908 HASH_ROW_ENTRY *Hash_slave_rows::make_entry(const uchar *bi_start,
909 const uchar *bi_ends) {
910 DBUG_TRACE;
911
912 HASH_ROW_ENTRY *entry = (HASH_ROW_ENTRY *)my_malloc(
913 key_memory_HASH_ROW_ENTRY, sizeof(HASH_ROW_ENTRY), MYF(0));
914 HASH_ROW_PREAMBLE *preamble = (HASH_ROW_PREAMBLE *)my_malloc(
915 key_memory_HASH_ROW_ENTRY, sizeof(HASH_ROW_PREAMBLE), MYF(0));
916 HASH_ROW_POS *pos = (HASH_ROW_POS *)my_malloc(key_memory_HASH_ROW_ENTRY,
917 sizeof(HASH_ROW_POS), MYF(0));
918
919 if (!entry || !preamble || !pos) goto err;
920
921 /**
922 Filling in the preamble.
923 */
924 new (preamble) HASH_ROW_PREAMBLE();
925 preamble->hash_value = 0;
926 preamble->search_state = m_hash.end();
927 preamble->is_search_state_inited = false;
928
929 /**
930 Filling in the positions.
931 */
932 pos->bi_start = bi_start;
933 pos->bi_ends = bi_ends;
934
935 /**
936 Filling in the entry
937 */
938 entry->preamble = preamble;
939 entry->positions = pos;
940
941 return entry;
942
943 err:
944 DBUG_PRINT("info", ("Hash_slave_rows::make_entry - malloc error"));
945 if (entry) my_free(entry);
946 if (preamble) {
947 preamble->~HASH_ROW_PREAMBLE();
948 my_free(preamble);
949 }
950 if (pos) my_free(pos);
951 return nullptr;
952 }
953
put(TABLE * table,MY_BITMAP * cols,HASH_ROW_ENTRY * entry)954 bool Hash_slave_rows::put(TABLE *table, MY_BITMAP *cols,
955 HASH_ROW_ENTRY *entry) {
956 DBUG_TRACE;
957
958 HASH_ROW_PREAMBLE *preamble = entry->preamble;
959
960 /**
961 Skip blobs and BIT fields from key calculation.
962 Handle X bits.
963 Handle nulled fields.
964 Handled fields not signaled.
965 */
966 preamble->hash_value = make_hash_key(table, cols);
967
968 m_hash.emplace(preamble->hash_value,
969 unique_ptr<HASH_ROW_ENTRY, hash_slave_rows_free_entry>(entry));
970 DBUG_PRINT("debug",
971 ("Added record to hash with key=%u", preamble->hash_value));
972 return false;
973 }
974
get(TABLE * table,MY_BITMAP * cols)975 HASH_ROW_ENTRY *Hash_slave_rows::get(TABLE *table, MY_BITMAP *cols) {
976 DBUG_TRACE;
977 uint key;
978 HASH_ROW_ENTRY *entry = nullptr;
979
980 key = make_hash_key(table, cols);
981
982 DBUG_PRINT("debug", ("Looking for record with key=%u in the hash.", key));
983
984 const auto it = m_hash.find(key);
985 if (it != m_hash.end()) {
986 DBUG_PRINT("debug", ("Found record with key=%u in the hash.", key));
987
988 /**
989 Save the search state in case we need to go through entries for
990 the given key.
991 */
992 entry = it->second.get();
993 entry->preamble->search_state = it;
994 entry->preamble->is_search_state_inited = true;
995 }
996
997 return entry;
998 }
999
next(HASH_ROW_ENTRY ** entry)1000 bool Hash_slave_rows::next(HASH_ROW_ENTRY **entry) {
1001 DBUG_TRACE;
1002 DBUG_ASSERT(*entry);
1003
1004 if (*entry == nullptr) return true;
1005
1006 HASH_ROW_PREAMBLE *preamble = (*entry)->preamble;
1007
1008 if (!preamble->is_search_state_inited) return true;
1009
1010 uint key = preamble->hash_value;
1011 const auto it = std::next(preamble->search_state);
1012
1013 /*
1014 Invalidate search for current preamble, because it is going to be
1015 used in the search below (and search state is used in a
1016 one-time-only basis).
1017 */
1018 preamble->search_state = m_hash.end();
1019 preamble->is_search_state_inited = false;
1020
1021 DBUG_PRINT("debug",
1022 ("Looking for record with key=%u in the hash (next).", key));
1023
1024 if (it != m_hash.end() && it->first == key) {
1025 DBUG_PRINT("debug", ("Found record with key=%u in the hash (next).", key));
1026 *entry = it->second.get();
1027 preamble = (*entry)->preamble;
1028
1029 /**
1030 Save the search state for next iteration (if any).
1031 */
1032 preamble->search_state = it;
1033 preamble->is_search_state_inited = true;
1034 } else {
1035 *entry = nullptr;
1036 }
1037
1038 return false;
1039 }
1040
del(HASH_ROW_ENTRY * entry)1041 bool Hash_slave_rows::del(HASH_ROW_ENTRY *entry) {
1042 DBUG_TRACE;
1043 DBUG_ASSERT(entry);
1044
1045 erase_specific_element(&m_hash, entry->preamble->hash_value, entry);
1046 return false;
1047 }
1048
make_hash_key(TABLE * table,MY_BITMAP * cols)1049 uint Hash_slave_rows::make_hash_key(TABLE *table, MY_BITMAP *cols) {
1050 DBUG_TRACE;
1051 ha_checksum crc = 0L;
1052
1053 uchar *record = table->record[0];
1054 uchar saved_x = 0, saved_filler = 0;
1055
1056 if (table->s->null_bytes > 0) {
1057 /*
1058 If we have an X bit then we need to take care of it.
1059 */
1060 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD)) {
1061 saved_x = record[0];
1062 record[0] |= 1U;
1063 }
1064
1065 /*
1066 If (last_null_bit_pos == 0 && null_bytes > 1), then:
1067 X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
1068 Ie, the entire byte is used.
1069 */
1070 if (table->s->last_null_bit_pos > 0) {
1071 saved_filler = record[table->s->null_bytes - 1];
1072 record[table->s->null_bytes - 1] |=
1073 256U - (1U << table->s->last_null_bit_pos);
1074 }
1075 }
1076
1077 /*
1078 We can only checksum the bytes if all fields have been signaled
1079 in the before image. Otherwise, unpack_row will not have set the
1080 null_flags correctly (because it only unpacks those fields and
1081 their flags that were actually in the before image).
1082
1083 @c record_compare, as it also skips null_flags if the read_set
1084 was not marked completely.
1085 */
1086 if (bitmap_is_set_all(cols) && cols->n_bits == table->s->fields) {
1087 crc = checksum_crc32(crc, table->null_flags, table->s->null_bytes);
1088 DBUG_PRINT("debug", ("make_hash_entry: hash after null_flags: %u", crc));
1089 }
1090
1091 for (Field **ptr = table->field;
1092 *ptr && ((*ptr)->field_index() < cols->n_bits); ptr++) {
1093 Field *f = (*ptr);
1094
1095 /*
1096 Field is set in the read_set and is isn't NULL.
1097 */
1098 if (bitmap_is_set(cols, f->field_index()) &&
1099 !f->is_virtual_gcol() && // Avoid virtual generated columns on hashes
1100 !f->is_null()) {
1101 /*
1102 BLOB and VARCHAR have pointers in their field, we must convert
1103 to string; GEOMETRY and JSON are implemented on top of BLOB.
1104 BIT may store its data among NULL bits, convert as well.
1105 */
1106 switch (f->type()) {
1107 case MYSQL_TYPE_BLOB:
1108 case MYSQL_TYPE_VARCHAR:
1109 case MYSQL_TYPE_GEOMETRY:
1110 case MYSQL_TYPE_JSON:
1111 case MYSQL_TYPE_BIT: {
1112 String tmp;
1113 f->val_str(&tmp);
1114 crc = checksum_crc32(crc, pointer_cast<const uchar *>(tmp.ptr()),
1115 tmp.length());
1116 break;
1117 }
1118 default:
1119 crc = checksum_crc32(crc, f->field_ptr(), f->data_length());
1120 break;
1121 }
1122 #ifndef DBUG_OFF
1123 String tmp;
1124 f->val_str(&tmp);
1125 DBUG_PRINT("debug", ("make_hash_entry: hash after field %s=%s: %u",
1126 f->field_name, tmp.c_ptr_safe(), crc));
1127 #endif
1128 }
1129 }
1130
1131 /*
1132 Restore the saved bytes.
1133
1134 TODO[record format ndb]: Remove this code once NDB returns the
1135 correct record format.
1136 */
1137 if (table->s->null_bytes > 0) {
1138 if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
1139 record[0] = saved_x;
1140
1141 if (table->s->last_null_bit_pos)
1142 record[table->s->null_bytes - 1] = saved_filler;
1143 }
1144
1145 DBUG_PRINT("debug", ("Created key=%u", crc));
1146 return crc;
1147 }
1148
1149 #endif
1150
1151 #if defined(MYSQL_SERVER)
1152
Deferred_log_events()1153 Deferred_log_events::Deferred_log_events()
1154 : m_array(key_memory_table_def_memory) {}
1155
~Deferred_log_events()1156 Deferred_log_events::~Deferred_log_events() { m_array.clear(); }
1157
add(Log_event * ev)1158 int Deferred_log_events::add(Log_event *ev) {
1159 m_array.push_back(ev);
1160 ev->worker = nullptr; // to mark event busy avoiding deletion
1161 return 0;
1162 }
1163
is_empty()1164 bool Deferred_log_events::is_empty() { return m_array.empty(); }
1165
execute(Relay_log_info * rli)1166 bool Deferred_log_events::execute(Relay_log_info *rli) {
1167 bool res = false;
1168
1169 DBUG_ASSERT(rli->deferred_events_collecting);
1170
1171 rli->deferred_events_collecting = false;
1172 for (Log_event **it = m_array.begin(); !res && it != m_array.end(); ++it) {
1173 Log_event *ev = *it;
1174 res = ev->apply_event(rli);
1175 }
1176 rli->deferred_events_collecting = true;
1177 return res;
1178 }
1179
rewind()1180 void Deferred_log_events::rewind() {
1181 /*
1182 Reset preceding Query log event events which execution was
1183 deferred because of slave side filtering.
1184 */
1185 delete_container_pointers(m_array);
1186 m_array.shrink_to_fit();
1187 }
1188
replace_all_in_str(std::string from,std::string find,std::string replace)1189 std::string replace_all_in_str(std::string from, std::string find,
1190 std::string replace) {
1191 std::string to{from.data()};
1192 if (to.length() == 0) {
1193 return to;
1194 }
1195
1196 size_t start{0};
1197 while ((start = to.find(find, start)) != std::string::npos) {
1198 to.replace(start, find.size(), replace);
1199 start += replace.length();
1200 }
1201
1202 return to;
1203 }
1204
1205 #endif
1206
1207 #ifdef MYSQL_SERVER
THD_instance_guard(THD * thd)1208 THD_instance_guard::THD_instance_guard(THD *thd)
1209 : m_is_locally_initialized{thd == nullptr} {
1210 if (this->m_is_locally_initialized) {
1211 this->m_target = new THD;
1212 this->m_target->thread_stack = (char *)&this->m_target;
1213 this->m_target->store_globals();
1214 this->m_target->security_context()->skip_grants();
1215 } else {
1216 this->m_target = thd;
1217 }
1218 }
1219
~THD_instance_guard()1220 THD_instance_guard::~THD_instance_guard() {
1221 if (this->m_is_locally_initialized) {
1222 delete this->m_target;
1223 }
1224 }
1225
operator THD*()1226 THD_instance_guard::operator THD *() { return this->m_target; }
1227
evaluate_command_row_only_restrictions(THD * thd)1228 bool evaluate_command_row_only_restrictions(THD *thd) {
1229 LEX *const lex = thd->lex;
1230
1231 switch (lex->sql_command) {
1232 case SQLCOM_UPDATE:
1233 case SQLCOM_INSERT:
1234 case SQLCOM_INSERT_SELECT:
1235 case SQLCOM_DELETE:
1236 case SQLCOM_LOAD:
1237 case SQLCOM_REPLACE:
1238 case SQLCOM_REPLACE_SELECT:
1239 case SQLCOM_DELETE_MULTI:
1240 case SQLCOM_UPDATE_MULTI: {
1241 return true;
1242 }
1243 case SQLCOM_CREATE_TABLE: {
1244 return (lex->create_info->options & HA_LEX_CREATE_TMP_TABLE);
1245 }
1246 case SQLCOM_DROP_TABLE: {
1247 return (lex->drop_temporary);
1248 }
1249 default:
1250 break;
1251 }
1252
1253 return false;
1254 }
1255
1256 #endif // MYSQL_SERVER
1257