1 /* Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/rpl_utility.h"
24 
25 #include <string.h>
26 #include <iterator>
27 #include <new>
28 #include <utility>
29 
30 #include "lex_string.h"
31 #include "libbinlogevents/export/binary_log_funcs.h"
32 #include "my_byteorder.h"
33 #include "my_dbug.h"
34 #include "my_loglevel.h"
35 #include "my_sys.h"
36 #include "mysql/components/services/log_builtins.h"
37 #include "mysql/service_mysql_alloc.h"
38 #include "sql/thr_malloc.h"
39 
40 struct TYPELIB;
41 
42 #ifdef MYSQL_SERVER
43 
44 #include <algorithm>
45 
46 #include "libbinlogevents/include/binlog_event.h"  // checksum_crv32
47 #include "m_ctype.h"
48 #include "m_string.h"
49 #include "my_base.h"
50 #include "my_bitmap.h"
51 #include "mysql/components/services/log_builtins.h"
52 #include "mysql/psi/psi_memory.h"
53 #include "mysqld_error.h"
54 #include "sql/create_field.h"
55 #include "sql/dd/dd.h"          // get_dictionary
56 #include "sql/dd/dictionary.h"  // is_dd_table_access_allowed
57 #include "sql/derror.h"         // ER_THD
58 #include "sql/field.h"          // Field
59 #include "sql/log.h"
60 #include "sql/log_event.h"  // Log_event
61 #include "sql/my_decimal.h"
62 #include "sql/mysqld.h"  // slave_type_conversions_options
63 #include "sql/psi_memory_key.h"
64 #include "sql/rpl_rli.h"  // Relay_log_info
65 #include "sql/rpl_slave.h"
66 #include "sql/sql_class.h"  // THD
67 #include "sql/sql_const.h"
68 #include "sql/sql_lex.h"  // LEX
69 #include "sql/sql_list.h"
70 #include "sql/sql_plugin_ref.h"
71 #include "sql/sql_tmp_table.h"  // create_tmp_table_from_fields
72 #include "sql_show.h"           // show_sql_type
73 #include "sql_string.h"
74 #include "template_utils.h"  // delete_container_pointers
75 #include "typelib.h"
76 
77 using binary_log::checksum_crc32;
78 using std::max;
79 using std::min;
80 using std::unique_ptr;
81 
82 #endif  // MYSQL_SERVER
83 
84 /*********************************************************************
85  *                   table_def member definitions                    *
86  *********************************************************************/
87 
88 /*
89   This function returns the field size in raw bytes based on the type
90   and the encoded field data from the master's raw data.
91 */
calc_field_size(uint col,const uchar * master_data) const92 uint32 table_def::calc_field_size(uint col, const uchar *master_data) const {
93   uint32 length =
94       ::calc_field_size(type(col), master_data, m_field_metadata[col]);
95   return length;
96 }
97 
98 #if defined(MYSQL_SERVER)
99 /**
100    Function to compare two size_t integers for their relative
101    order. Used below.
102  */
compare(size_t a,size_t b)103 static int compare(size_t a, size_t b) {
104   if (a < b) return -1;
105   if (b < a) return 1;
106   return 0;
107 }
108 
109 /*
110   Compare the pack lengths of a source field (on the master) and a
111   target field (on the slave).
112 
113   @param field    Target field.
114   @param type     Source field type.
115   @param metadata Source field metadata.
116 
117   @retval -1 The length of the source field is smaller than the target field.
118   @retval  0 The length of the source and target fields are the same.
119   @retval  1 The length of the source field is greater than the target field.
120  */
compare_lengths(Field * field,enum_field_types source_type,uint16 metadata)121 static int compare_lengths(Field *field, enum_field_types source_type,
122                            uint16 metadata) {
123   DBUG_TRACE;
124   size_t const source_length =
125       max_display_length_for_field(source_type, metadata);
126   size_t const target_length = field->max_display_length();
127   DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
128                        " target_length: %lu, target_type: %u",
129                        (unsigned long)source_length, source_type,
130                        (unsigned long)target_length, field->real_type()));
131   int result = compare(source_length, target_length);
132   DBUG_PRINT("result", ("%d", result));
133   return result;
134 }
135 
136 /**
137    Check the order variable and print errors if the order is not
138    acceptable according to the current settings.
139 
140    @param order  The computed order of the conversion needed.
141  */
is_conversion_ok(int order)142 static bool is_conversion_ok(int order) {
143   DBUG_TRACE;
144   bool allow_non_lossy, allow_lossy;
145 
146   allow_non_lossy = slave_type_conversions_options &
147                     (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
148   allow_lossy = slave_type_conversions_options &
149                 (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
150 
151   DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
152                        allow_non_lossy ? " ALL_NON_LOSSY" : "",
153                        allow_lossy ? " ALL_LOSSY" : ""));
154   if (order < 0 && !allow_non_lossy) {
155     /* !!! Add error message saying that non-lossy conversions need to be
156      * allowed. */
157     return false;
158   }
159 
160   if (order > 0 && !allow_lossy) {
161     /* !!! Add error message saying that lossy conversions need to be allowed.
162      */
163     return false;
164   }
165 
166   return true;
167 }
168 
169 /**
170   Check if the types are criss cross means type1 is MYSQL_TYPE_TIMESTAMP
171   and type2 as MYSQL_TYPE_TIMESTAMP2 or vice versa.
172 */
timestamp_cross_check(enum_field_types type1,enum_field_types type2)173 inline bool timestamp_cross_check(enum_field_types type1,
174                                   enum_field_types type2) {
175   return ((type1 == MYSQL_TYPE_TIMESTAMP && type2 == MYSQL_TYPE_TIMESTAMP2) ||
176           (type1 == MYSQL_TYPE_TIMESTAMP2 && type2 == MYSQL_TYPE_TIMESTAMP));
177 }
178 
179 /**
180   Check if the types are criss cross means type1 is MYSQL_TYPE_DATETIME
181   and type2 as MYSQL_TYPE_DATETIME or vice versa.
182 */
datetime_cross_check(enum_field_types type1,enum_field_types type2)183 inline bool datetime_cross_check(enum_field_types type1,
184                                  enum_field_types type2) {
185   return ((type1 == MYSQL_TYPE_DATETIME && type2 == MYSQL_TYPE_DATETIME2) ||
186           (type1 == MYSQL_TYPE_DATETIME2 && type2 == MYSQL_TYPE_DATETIME));
187 }
188 
189 /**
190   Check if the types are criss cross means type1 is MYSQL_TYPE_TIME
191   and type2 as MYSQL_TYPE_TIME2 or vice versa.
192 */
time_cross_check(enum_field_types type1,enum_field_types type2)193 inline bool time_cross_check(enum_field_types type1, enum_field_types type2) {
194   return ((type1 == MYSQL_TYPE_TIME && type2 == MYSQL_TYPE_TIME2) ||
195           (type1 == MYSQL_TYPE_TIME2 && type2 == MYSQL_TYPE_TIME));
196 }
197 
198 /**
199    Can a type potentially be converted to another type?
200 
201    This function check if the types are convertible and what
202    conversion is required.
203 
204    If conversion is not possible, and error is printed.
205 
206    If conversion is possible:
207 
208    - *order will be set to -1 if source type is smaller than target
209      type and a non-lossy conversion can be required. This includes
210      the case where the field types are different but types could
211      actually be converted in either direction.
212 
213    - *order will be set to 0 if no conversion is required.
214 
215    - *order will be set to 1 if the source type is strictly larger
216       than the target type and that conversion is potentially lossy.
217 
218    @param[in] field    Target field
219    @param[in] source_type Source field type
220    @param[in] metadata Source field metadata
221    @param[in] is_array Whether the source field is a typed array
222    @param[in] rli      Relay log info (for error reporting)
223    @param[in] mflags   Flags from the table map event
224    @param[out] order_var Order between source field and target field
225 
226    @return @c true if conversion is possible according to the current
227    settings, @c false if conversion is not possible according to the
228    current setting.
229  */
can_convert_field_to(Field * field,enum_field_types source_type,uint metadata,bool is_array,Relay_log_info * rli,uint16 mflags,int * order_var)230 static bool can_convert_field_to(Field *field, enum_field_types source_type,
231                                  uint metadata, bool is_array,
232                                  Relay_log_info *rli, uint16 mflags,
233                                  int *order_var) {
234   DBUG_TRACE;
235 #ifndef DBUG_OFF
236   char field_type_buf[MAX_FIELD_WIDTH];
237   String field_type(field_type_buf, sizeof(field_type_buf), &my_charset_latin1);
238   field->sql_type(field_type);
239   DBUG_PRINT("enter", ("field_type: %s, target_type: %d, source_type: %d, "
240                        "source_metadata: 0x%x",
241                        field_type.c_ptr_safe(), field->real_type(), source_type,
242                        metadata));
243 #endif
244   // Can't convert from scalar to array and vice versa
245   if (is_array != field->is_array()) return false;
246 
247   /*
248     If the real type is the same, we need to check the metadata to
249     decide if conversions are allowed.
250    */
251   if (field->real_type() == source_type) {
252     if (metadata == 0)  // Metadata can only be zero if no metadata was provided
253     {
254       /*
255         If there is no metadata, we either have an old event where no
256         metadata were supplied, or a type that does not require any
257         metadata. In either case, conversion can be done but no
258         conversion table is necessary.
259        */
260       DBUG_PRINT("debug",
261                  ("Base types are identical, but there is no metadata"));
262       *order_var = 0;
263       return true;
264     }
265 
266     DBUG_PRINT("debug",
267                ("Base types are identical, doing field size comparison"));
268     if (field->compatible_field_size(metadata, rli, mflags, order_var))
269       return is_conversion_ok(*order_var);
270     else
271       return false;
272   } else if (is_array) {
273     // Can't covert between typed array of different types
274     return false;
275   } else if (metadata == 0 &&
276              (timestamp_cross_check(field->real_type(), source_type) ||
277               datetime_cross_check(field->real_type(), source_type) ||
278               time_cross_check(field->real_type(), source_type))) {
279     /*
280       In the above condition, we are taking care
281       of case where
282       1) Master having old TIME, TIMESTAMP, DATETIME
283       and slave have new TIME2, TIMESTAMP2, DATETIME2
284       or
285       2) Master having new TIMESTAMP2, DATETIME2, TIME2
286       with fraction part zero and slave have TIME,
287       TIMESTAMP, DATETIME.
288       We need second condition, as when we are
289       upgrading from 5.5 to 5.6 TIME, TIMESTAMP,
290       DATETIME columns are not upgraded to TIME(0),
291       TIMESTAMP(0), DATETIME(0).
292       So to support these conversion we are putting this
293       condition.
294     */
295     /*
296       TS-TODO: conversion from FSP1>FSP2.
297       Can do non-lossy conversion
298       from old TIME, TIMESTAMP, DATETIME
299       to new TIME(0), TIMESTAMP(0), DATETIME(0).
300     */
301     *order_var = -1;
302     return true;
303   } else if (!slave_type_conversions_options)
304     return false;
305 
306   /*
307     Here, from and to will always be different. Since the types are
308     different, we cannot use the compatible_field_size() function, but
309     have to rely on hard-coded max-sizes for fields.
310   */
311 
312   DBUG_PRINT("debug", ("Base types are different, checking conversion"));
313   switch (source_type)  // Source type (on master)
314   {
315     case MYSQL_TYPE_DECIMAL:
316     case MYSQL_TYPE_NEWDECIMAL:
317     case MYSQL_TYPE_FLOAT:
318     case MYSQL_TYPE_DOUBLE:
319       switch (field->real_type()) {
320         case MYSQL_TYPE_NEWDECIMAL:
321           /*
322             Then the other type is either FLOAT, DOUBLE, or old style
323             DECIMAL, so we require lossy conversion.
324           */
325           *order_var = 1;
326           return is_conversion_ok(*order_var);
327 
328         case MYSQL_TYPE_DECIMAL:
329         case MYSQL_TYPE_FLOAT:
330         case MYSQL_TYPE_DOUBLE: {
331           if (source_type == MYSQL_TYPE_NEWDECIMAL ||
332               source_type == MYSQL_TYPE_DECIMAL)
333             *order_var = 1;  // Always require lossy conversions
334           else
335             *order_var = compare_lengths(field, source_type, metadata);
336           DBUG_ASSERT(*order_var != 0);
337           return is_conversion_ok(*order_var);
338         }
339 
340         default:
341           return false;
342       }
343       break;
344 
345     /*
346       The length comparison check will do the correct job of comparing
347       the field lengths (in bytes) of two integer types.
348     */
349     case MYSQL_TYPE_TINY:
350     case MYSQL_TYPE_SHORT:
351     case MYSQL_TYPE_INT24:
352     case MYSQL_TYPE_LONG:
353     case MYSQL_TYPE_LONGLONG:
354       switch (field->real_type()) {
355         case MYSQL_TYPE_TINY:
356         case MYSQL_TYPE_SHORT:
357         case MYSQL_TYPE_INT24:
358         case MYSQL_TYPE_LONG:
359         case MYSQL_TYPE_LONGLONG:
360           *order_var = compare_lengths(field, source_type, metadata);
361           DBUG_ASSERT(*order_var != 0);
362           return is_conversion_ok(*order_var);
363 
364         default:
365           return false;
366       }
367       break;
368 
369     /*
370       Since source and target type is different, and it is not possible
371       to convert bit types to anything else, this will return false.
372      */
373     case MYSQL_TYPE_BIT:
374       return false;
375 
376     /*
377       If all conversions are disabled, it is not allowed to convert
378       between these types. Since the TEXT vs. BINARY is distinguished by
379       the charset, and the charset is not replicated, we cannot
380       currently distinguish between , e.g., TEXT and BLOB.
381      */
382     case MYSQL_TYPE_TINY_BLOB:
383     case MYSQL_TYPE_MEDIUM_BLOB:
384     case MYSQL_TYPE_LONG_BLOB:
385     case MYSQL_TYPE_BLOB:
386     case MYSQL_TYPE_STRING:
387     case MYSQL_TYPE_VAR_STRING:
388     case MYSQL_TYPE_VARCHAR:
389       switch (field->real_type()) {
390         case MYSQL_TYPE_TINY_BLOB:
391         case MYSQL_TYPE_MEDIUM_BLOB:
392         case MYSQL_TYPE_LONG_BLOB:
393         case MYSQL_TYPE_BLOB:
394         case MYSQL_TYPE_STRING:
395         case MYSQL_TYPE_VAR_STRING:
396         case MYSQL_TYPE_VARCHAR:
397           *order_var = compare_lengths(field, source_type, metadata);
398           /*
399             Here we know that the types are different, so if the order
400             gives that they do not require any conversion, we still need
401             to have non-lossy conversion enabled to allow conversion
402             between different (string) types of the same length.
403            */
404           if (*order_var == 0) *order_var = -1;
405           return is_conversion_ok(*order_var);
406 
407         default:
408           return false;
409       }
410       break;
411 
412     case MYSQL_TYPE_GEOMETRY:
413     case MYSQL_TYPE_JSON:
414     case MYSQL_TYPE_TIMESTAMP:
415     case MYSQL_TYPE_DATE:
416     case MYSQL_TYPE_TIME:
417     case MYSQL_TYPE_DATETIME:
418     case MYSQL_TYPE_YEAR:
419     case MYSQL_TYPE_NEWDATE:
420     case MYSQL_TYPE_NULL:
421     case MYSQL_TYPE_ENUM:
422     case MYSQL_TYPE_SET:
423     case MYSQL_TYPE_TIMESTAMP2:
424     case MYSQL_TYPE_DATETIME2:
425     case MYSQL_TYPE_TIME2:
426     case MYSQL_TYPE_TYPED_ARRAY:
427       return false;
428   }
429   return false;  // To keep GCC happy
430 }
431 
432 /**
433   Is the definition compatible with a table when it does not belong to
434   the data dictionary?
435 
436   This function first finds out whether the table belongs to the data
437   dictionary. When not, it will compare the master table with an existing
438   table on the slave and see if they are compatible with respect to the
439   current settings of @c SLAVE_TYPE_CONVERSIONS.
440 
441   If the tables are compatible and conversions are required, @c
442   *tmp_table_var will be set to a virtual temporary table with field
443   pointers for the fields that require conversions.  This allow simple
444   checking of whether a conversion are to be applied or not.
445 
446   If tables are compatible, but no conversions are necessary, @c
447   *tmp_table_var will be set to NULL.
448 
449   @param [in] rli Relay log info, for error reporting.
450 
451   @param [in] table Table to compare with
452 
453   @param [out] conv_table_var Virtual temporary table for performing
454   conversions, if necessary.
455 
456   @retval true Master table is compatible with slave table.
457   @retval false When the table belongs to the data dictionary or
458                 master table is not compatible with slave table.
459 */
compatible_with(THD * thd,Relay_log_info * rli,TABLE * table,TABLE ** conv_table_var) const460 bool table_def::compatible_with(THD *thd, Relay_log_info *rli, TABLE *table,
461                                 TABLE **conv_table_var) const {
462   /*
463     Prohibit replication into dictionary internal tables. We know this is
464     not DDL (which will be replicated as statements, and rejected by the
465     corresponding check for SQL statements), thus 'false' in the call below.
466     Also sserting that this is not a DD system thread.
467   */
468   DBUG_ASSERT(!thd->is_dd_system_thread());
469   const dd::Dictionary *dictionary = dd::get_dictionary();
470   if (dictionary && !dictionary->is_dd_table_access_allowed(
471                         false, false, table->s->db.str, table->s->db.length,
472                         table->s->table_name.str)) {
473     DBUG_PRINT("debug", ("Access to dictionary table %s.%s is prohibited",
474                          table->s->db.str, table->s->table_name.str));
475     rli->report(
476         ERROR_LEVEL, ER_SERVER_NO_SYSTEM_TABLE_ACCESS,
477         ER_THD(thd, ER_SERVER_NO_SYSTEM_TABLE_ACCESS),
478         ER_THD_NONCONST(thd, dictionary->table_type_error_code(
479                                  table->s->db.str, table->s->table_name.str)),
480         table->s->db.str, table->s->table_name.str);
481     return false;
482   }
483 
484   /*
485     We only check the initial columns for the tables.
486   */
487   Replicated_columns_view fields{table, Replicated_columns_view::INBOUND, thd};
488   uint const cols_to_check = min<ulong>(fields.filtered_size(), size());
489   TABLE *tmp_table = nullptr;
490 
491   for (auto it = fields.begin(); it.filtered_pos() < cols_to_check; ++it) {
492     Field *const field = *it;
493     size_t col = it.filtered_pos();
494     int order;
495     if (can_convert_field_to(field, type(col), field_metadata(col),
496                              is_array(col), rli, m_flags, &order)) {
497       DBUG_PRINT("debug", ("Checking column %lu -"
498                            " field '%s' can be converted - order: %d",
499                            static_cast<long unsigned int>(col),
500                            field->field_name, order));
501       DBUG_ASSERT(order >= -1 && order <= 1);
502 
503       /*
504         If order is not 0, a conversion is required, so we need to set
505         up the conversion table.
506        */
507       if (order != 0 && tmp_table == nullptr) {
508         /*
509           This will create the full table with all fields. This is
510           necessary to ge the correct field lengths for the record.
511         */
512         tmp_table = create_conversion_table(thd, rli, table);
513         if (tmp_table == nullptr) return false;
514         /*
515           Clear all fields up to, but not including, this column.
516         */
517         for (unsigned int i = 0; i < col; ++i) tmp_table->field[i] = nullptr;
518       }
519 
520       if (order == 0 && tmp_table != nullptr) tmp_table->field[col] = nullptr;
521     } else {
522       DBUG_PRINT("debug",
523                  ("Checking column %lu -"
524                   " field '%s' can not be converted",
525                   static_cast<long unsigned int>(col), field->field_name));
526       DBUG_ASSERT(col < size() && col < table->s->fields);
527       DBUG_ASSERT(table->s->db.str && table->s->table_name.str);
528       const char *db_name = table->s->db.str;
529       const char *tbl_name = table->s->table_name.str;
530       char source_buf[MAX_FIELD_WIDTH];
531       char target_buf[MAX_FIELD_WIDTH];
532       String field_sql_type;
533       enum loglevel report_level = INFORMATION_LEVEL;
534       String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
535       String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
536       show_sql_type(type(col), is_array(col), field_metadata(col),
537                     &source_type);
538       field->sql_type(target_type);
539       if (!ignored_error_code(ER_SERVER_SLAVE_CONVERSION_FAILED)) {
540         report_level = ERROR_LEVEL;
541         thd->is_slave_error = true;
542       } else if (log_error_verbosity >= 2)
543         report_level = WARNING_LEVEL;
544 
545       if (field->has_charset() && (field->type() == MYSQL_TYPE_VARCHAR ||
546                                    field->type() == MYSQL_TYPE_STRING)) {
547         field_sql_type.append((field->type() == MYSQL_TYPE_VARCHAR) ? "varchar"
548                                                                     : "char");
549         const CHARSET_INFO *cs = field->charset();
550         size_t length = cs->cset->snprintf(
551             cs, target_type.ptr(), target_type.alloced_length(),
552             "%s(%u(bytes) %s)", field_sql_type.c_ptr_safe(),
553             field->field_length, field->charset()->csname);
554         target_type.length(length);
555       } else
556         field->sql_type(target_type);
557 
558       if (report_level != INFORMATION_LEVEL)
559         rli->report(report_level, ER_SERVER_SLAVE_CONVERSION_FAILED,
560                     ER_THD(thd, ER_SERVER_SLAVE_CONVERSION_FAILED), col,
561                     db_name, tbl_name, source_type.c_ptr_safe(),
562                     target_type.c_ptr_safe());
563       return false;
564     }
565   }
566 
567 #ifndef DBUG_OFF
568   if (tmp_table) {
569     for (unsigned int col = 0; col < tmp_table->s->fields; ++col)
570       if (tmp_table->field[col]) {
571         char source_buf[MAX_FIELD_WIDTH];
572         char target_buf[MAX_FIELD_WIDTH];
573         String source_type(source_buf, sizeof(source_buf), &my_charset_latin1);
574         String target_type(target_buf, sizeof(target_buf), &my_charset_latin1);
575         tmp_table->field[col]->sql_type(source_type);
576         table->field[col]->sql_type(target_type);
577         DBUG_PRINT("debug",
578                    ("Field %s - conversion required."
579                     " Source type: '%s', Target type: '%s'",
580                     tmp_table->field[col]->field_name, source_type.c_ptr_safe(),
581                     target_type.c_ptr_safe()));
582       }
583   }
584 #endif
585 
586   *conv_table_var = tmp_table;
587   return true;
588 }
589 
590 /**
591   Create a conversion table.
592 
593   If the function is unable to create the conversion table, an error
594   will be printed and NULL will be returned.
595 
596   @return Pointer to conversion table, or NULL if unable to create
597   conversion table.
598  */
599 
create_conversion_table(THD * thd,Relay_log_info * rli,TABLE * target_table) const600 TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli,
601                                           TABLE *target_table) const {
602   DBUG_TRACE;
603 
604   List<Create_field> field_list;
605   TABLE *conv_table = nullptr;
606   /*
607     At slave, columns may differ. So we should create
608     min(columns@master, columns@slave) columns in the
609     conversion table.
610   */
611   uint const cols_to_create = min<ulong>(target_table->s->fields, size());
612 
613   // Default value : treat all values signed
614   bool unsigned_flag = false;
615 
616   // Check if slave_type_conversions contains ALL_UNSIGNED
617   unsigned_flag = slave_type_conversions_options &
618                   (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_UNSIGNED);
619 
620   // Check if slave_type_conversions contains ALL_SIGNED
621   unsigned_flag =
622       unsigned_flag && !(slave_type_conversions_options &
623                          (1ULL << SLAVE_TYPE_CONVERSIONS_ALL_SIGNED));
624 
625   for (uint col = 0; col < cols_to_create; ++col) {
626     Create_field *field_def = new (thd->mem_root) Create_field();
627     if (field_list.push_back(field_def)) return nullptr;
628 
629     uint decimals = 0;
630     TYPELIB *interval = nullptr;
631     uint pack_length_override = 0;  // 0 => NA. Only assigned below when needed.
632     enum_field_types field_type = type(col);
633     uint32 max_length =
634         max_display_length_for_field(field_type, field_metadata(col));
635 
636     switch (field_type) {
637       uint precision;
638       case MYSQL_TYPE_ENUM:
639       case MYSQL_TYPE_SET:
640         interval = static_cast<Field_enum *>(target_table->field[col])->typelib;
641         /*
642           Number of elements in interval on master and slave might differ.
643           Use pack length from binary log instead of one calculated from
644           number of interval elements on slave.
645         */
646         pack_length_override = field_metadata(col) & 0x00ff;
647         break;
648 
649       case MYSQL_TYPE_NEWDECIMAL:
650         /*
651           The display length of a DECIMAL type is not the same as the
652           length that should be supplied to make_field, so we correct
653           the length here.
654          */
655         precision = field_metadata(col) >> 8;
656         decimals = field_metadata(col) & 0x00ff;
657         max_length = my_decimal_precision_to_length(precision, decimals, false);
658         break;
659 
660       case MYSQL_TYPE_DECIMAL:
661         LogErr(ERROR_LEVEL, ER_RPL_INCOMPATIBLE_DECIMAL_IN_RBR,
662                target_table->s->db.str, target_table->s->table_name.str,
663                target_table->field[col]->field_name);
664         goto err;
665 
666       case MYSQL_TYPE_BLOB:
667         /*
668           Blobs are binlogged as MYSQL_TYPE_BLOB, even when pack_length
669           != 2. Need the exact blob type for the call to
670           Create_field::init_for_tmp_table() below. Note that
671           pack_length is NOT assigned to pack_length_override here, as
672           this should only be used when the pack_length cannot be
673           derived from the exact type, i.e. for ENUM and SET (see
674           above).
675         */
676         field_type = blob_type_from_pack_length(field_metadata(col) & 0x00ff);
677         break;
678 
679       default:
680         break;
681     }
682 
683     DBUG_PRINT(
684         "debug",
685         ("sql_type: %d, target_field: '%s', max_length: %d, decimals: %d,"
686          " maybe_null: %d, unsigned_flag: %d",
687          binlog_type(col), target_table->field[col]->field_name, max_length,
688          decimals, true, unsigned_flag));
689     field_def->init_for_tmp_table(field_type, max_length, decimals,
690                                   true,           // maybe_null
691                                   unsigned_flag,  // unsigned_flag
692                                   pack_length_override);
693     field_def->charset = target_table->field[col]->charset();
694     field_def->interval = interval;
695   }
696 
697   conv_table = DBUG_EVALUATE_IF(
698       "simulate_out_of_memory_while_creating_temp_table_for_conversion",
699       nullptr, create_tmp_table_from_fields(thd, field_list));
700 err:
701   if (conv_table == nullptr) {
702     enum loglevel report_level = INFORMATION_LEVEL;
703     if (!ignored_error_code(ER_SLAVE_CANT_CREATE_CONVERSION)) {
704       report_level = ERROR_LEVEL;
705       thd->is_slave_error = true;
706     } else if (log_error_verbosity >= 2)
707       report_level = WARNING_LEVEL;
708 
709     if (report_level != INFORMATION_LEVEL)
710       rli->report(report_level, ER_SLAVE_CANT_CREATE_CONVERSION,
711                   ER_THD(thd, ER_SLAVE_CANT_CREATE_CONVERSION),
712                   target_table->s->db.str, target_table->s->table_name.str);
713   }
714   return conv_table;
715 }
716 
717 #endif /* MYSQL_SERVER */
718 
719 /**
720   Decode field metadata from a char buffer (serialized form) into an int
721   (packed form).
722 
723   @note On little-endian platforms (e.g Intel) this function effectively
724   inverts order of bytes compared to what Field::save_field_metadata()
725   writes. E.g for MYSQL_TYPE_NEWDECIMAL save_field_metadata writes precision
726   into the first byte and decimals into the second, this function puts
727   precision into the second byte and decimals into the first. This layout
728   is expected by replication code that reads metadata in the uint form.
729   Due to this design feature show_sql_type() can't correctly print
730   immediate output of save_field_metadata(), this function have to be used
731   as translator.
732 
733   @param buffer Field metadata, in the character stream form produced by
734                 save_field_metadata.
735   @param binlog_type The type of the field, in the form returned by
736                       Field::binlog_type and stored in Table_map_log_event.
737   @retval pair where:
738   - the first component is the length of the metadata within 'buffer',
739     i.e., how much the buffer pointer should move forward in order to skip it.
740   - the second component is pair containing:
741     - the metadata, encoded as an 'uint', in the form required by e.g.
742       show_sql_type.
743     - bool indicating whether the field is array (true) or a scalar (false)
744 */
745 
read_field_metadata(const uchar * buffer,enum_field_types binlog_type)746 std::pair<my_off_t, std::pair<uint, bool>> read_field_metadata(
747     const uchar *buffer, enum_field_types binlog_type) {
748   bool is_array = false;
749   uint metadata = 0;
750   uint index = 0;
751   if (binlog_type == MYSQL_TYPE_TYPED_ARRAY) {
752     binlog_type = static_cast<enum_field_types>(buffer[index++]);
753     is_array = true;
754   }
755   switch (binlog_type) {
756     case MYSQL_TYPE_TINY_BLOB:
757     case MYSQL_TYPE_BLOB:
758     case MYSQL_TYPE_MEDIUM_BLOB:
759     case MYSQL_TYPE_LONG_BLOB:
760     case MYSQL_TYPE_DOUBLE:
761     case MYSQL_TYPE_FLOAT:
762     case MYSQL_TYPE_GEOMETRY:
763     case MYSQL_TYPE_TIME2:
764     case MYSQL_TYPE_DATETIME2:
765     case MYSQL_TYPE_TIMESTAMP2:
766     case MYSQL_TYPE_JSON: {
767       /*
768         These types store a single byte.
769       */
770       metadata = buffer[index++];
771       break;
772     }
773     case MYSQL_TYPE_SET:
774     case MYSQL_TYPE_ENUM:
775     case MYSQL_TYPE_STRING: {
776       metadata = buffer[index++] << 8U;  // real_type
777       metadata += buffer[index++];       // pack or field length
778       break;
779     }
780     case MYSQL_TYPE_BIT: {
781       metadata = buffer[index++];
782       metadata += (buffer[index++] << 8U);
783       break;
784     }
785     case MYSQL_TYPE_VARCHAR: {
786       /*
787         These types store two bytes.
788       */
789       if (is_array) {
790         metadata = uint3korr(buffer + index);
791         index = index + 3;
792       } else {
793         metadata = uint2korr(buffer + index);
794         index = index + 2;
795       }
796       break;
797     }
798     case MYSQL_TYPE_NEWDECIMAL: {
799       metadata = buffer[index++] << 8U;  // precision
800       metadata += buffer[index++];       // decimals
801       break;
802     }
803     default:
804       metadata = 0;
805       break;
806   }
807   return std::make_pair(index, std::make_pair(metadata, is_array));
808 }
809 
810 PSI_memory_key key_memory_table_def_memory;
811 
table_def(unsigned char * types,ulong size,uchar * field_metadata,int metadata_size,uchar * null_bitmap,uint16 flags)812 table_def::table_def(unsigned char *types, ulong size, uchar *field_metadata,
813                      int metadata_size, uchar *null_bitmap, uint16 flags)
814     : m_size(size),
815       m_type(nullptr),
816       m_field_metadata_size(metadata_size),
817       m_field_metadata(nullptr),
818       m_null_bits(nullptr),
819       m_flags(flags),
820       m_memory(nullptr),
821       m_json_column_count(-1),
822       m_is_array(nullptr) {
823   m_memory = (uchar *)my_multi_malloc(
824       key_memory_table_def_memory, MYF(MY_WME), &m_type, size,
825       &m_field_metadata, size * sizeof(uint), &m_is_array, size * sizeof(bool),
826       &m_null_bits, (size + 7) / 8, nullptr);
827 
828   memset(m_field_metadata, 0, size * sizeof(uint));
829   memset(m_is_array, 0, size * sizeof(bool));
830 
831   if (m_type)
832     memcpy(m_type, types, size);
833   else
834     m_size = 0;
835   /*
836     Extract the data from the table map into the field metadata array
837     iff there is field metadata. The variable metadata_size will be
838     0 if we are replicating from an older version server since no field
839     metadata was written to the table map. This can also happen if
840     there were no fields in the master that needed extra metadata.
841   */
842   if (m_size && metadata_size) {
843     int index = 0;
844     for (unsigned int i = 0; i < m_size; i++) {
845       std::pair<my_off_t, std::pair<uint, bool>> pack = read_field_metadata(
846           static_cast<const uchar *>(field_metadata + index), binlog_type(i));
847       // Update type of the typed array
848       if (binlog_type(i) == MYSQL_TYPE_TYPED_ARRAY)
849         m_type[i] = static_cast<enum_field_types>(field_metadata[index]);
850       // Fill in read metadata
851       m_field_metadata[i] = pack.second.first;
852       m_is_array[i] = pack.second.second;
853       index += pack.first;
854       DBUG_ASSERT(index <= metadata_size);
855     }
856   }
857   if (m_size && null_bitmap) memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
858 }
859 
~table_def()860 table_def::~table_def() {
861   my_free(m_memory);
862 #ifndef DBUG_OFF
863   m_type = nullptr;
864   m_size = 0;
865 #endif
866 }
867 
868 #ifdef MYSQL_SERVER
869 
870 #define HASH_ROWS_POS_SEARCH_INVALID -1
871 
872 /**
873   Utility methods for handling row based operations.
874  */
875 
operator ()(HASH_ROW_ENTRY * entry) const876 void hash_slave_rows_free_entry::operator()(HASH_ROW_ENTRY *entry) const {
877   DBUG_TRACE;
878   if (entry) {
879     if (entry->preamble) {
880       entry->preamble->~HASH_ROW_PREAMBLE();
881       my_free(entry->preamble);
882     }
883     if (entry->positions) my_free(entry->positions);
884     my_free(entry);
885   }
886 }
887 
is_empty(void)888 bool Hash_slave_rows::is_empty(void) { return m_hash.empty(); }
889 
890 /**
891    Hashing commodity structures and functions.
892  */
893 
init(void)894 bool Hash_slave_rows::init(void) { return false; }
895 
deinit(void)896 bool Hash_slave_rows::deinit(void) {
897   DBUG_TRACE;
898   m_hash.clear();
899   return false;
900 }
901 
size()902 int Hash_slave_rows::size() { return m_hash.size(); }
903 
make_entry()904 HASH_ROW_ENTRY *Hash_slave_rows::make_entry() {
905   return make_entry(nullptr, nullptr);
906 }
907 
make_entry(const uchar * bi_start,const uchar * bi_ends)908 HASH_ROW_ENTRY *Hash_slave_rows::make_entry(const uchar *bi_start,
909                                             const uchar *bi_ends) {
910   DBUG_TRACE;
911 
912   HASH_ROW_ENTRY *entry = (HASH_ROW_ENTRY *)my_malloc(
913       key_memory_HASH_ROW_ENTRY, sizeof(HASH_ROW_ENTRY), MYF(0));
914   HASH_ROW_PREAMBLE *preamble = (HASH_ROW_PREAMBLE *)my_malloc(
915       key_memory_HASH_ROW_ENTRY, sizeof(HASH_ROW_PREAMBLE), MYF(0));
916   HASH_ROW_POS *pos = (HASH_ROW_POS *)my_malloc(key_memory_HASH_ROW_ENTRY,
917                                                 sizeof(HASH_ROW_POS), MYF(0));
918 
919   if (!entry || !preamble || !pos) goto err;
920 
921   /**
922      Filling in the preamble.
923    */
924   new (preamble) HASH_ROW_PREAMBLE();
925   preamble->hash_value = 0;
926   preamble->search_state = m_hash.end();
927   preamble->is_search_state_inited = false;
928 
929   /**
930      Filling in the positions.
931    */
932   pos->bi_start = bi_start;
933   pos->bi_ends = bi_ends;
934 
935   /**
936     Filling in the entry
937    */
938   entry->preamble = preamble;
939   entry->positions = pos;
940 
941   return entry;
942 
943 err:
944   DBUG_PRINT("info", ("Hash_slave_rows::make_entry - malloc error"));
945   if (entry) my_free(entry);
946   if (preamble) {
947     preamble->~HASH_ROW_PREAMBLE();
948     my_free(preamble);
949   }
950   if (pos) my_free(pos);
951   return nullptr;
952 }
953 
put(TABLE * table,MY_BITMAP * cols,HASH_ROW_ENTRY * entry)954 bool Hash_slave_rows::put(TABLE *table, MY_BITMAP *cols,
955                           HASH_ROW_ENTRY *entry) {
956   DBUG_TRACE;
957 
958   HASH_ROW_PREAMBLE *preamble = entry->preamble;
959 
960   /**
961      Skip blobs and BIT fields from key calculation.
962      Handle X bits.
963      Handle nulled fields.
964      Handled fields not signaled.
965   */
966   preamble->hash_value = make_hash_key(table, cols);
967 
968   m_hash.emplace(preamble->hash_value,
969                  unique_ptr<HASH_ROW_ENTRY, hash_slave_rows_free_entry>(entry));
970   DBUG_PRINT("debug",
971              ("Added record to hash with key=%u", preamble->hash_value));
972   return false;
973 }
974 
get(TABLE * table,MY_BITMAP * cols)975 HASH_ROW_ENTRY *Hash_slave_rows::get(TABLE *table, MY_BITMAP *cols) {
976   DBUG_TRACE;
977   uint key;
978   HASH_ROW_ENTRY *entry = nullptr;
979 
980   key = make_hash_key(table, cols);
981 
982   DBUG_PRINT("debug", ("Looking for record with key=%u in the hash.", key));
983 
984   const auto it = m_hash.find(key);
985   if (it != m_hash.end()) {
986     DBUG_PRINT("debug", ("Found record with key=%u in the hash.", key));
987 
988     /**
989        Save the search state in case we need to go through entries for
990        the given key.
991     */
992     entry = it->second.get();
993     entry->preamble->search_state = it;
994     entry->preamble->is_search_state_inited = true;
995   }
996 
997   return entry;
998 }
999 
next(HASH_ROW_ENTRY ** entry)1000 bool Hash_slave_rows::next(HASH_ROW_ENTRY **entry) {
1001   DBUG_TRACE;
1002   DBUG_ASSERT(*entry);
1003 
1004   if (*entry == nullptr) return true;
1005 
1006   HASH_ROW_PREAMBLE *preamble = (*entry)->preamble;
1007 
1008   if (!preamble->is_search_state_inited) return true;
1009 
1010   uint key = preamble->hash_value;
1011   const auto it = std::next(preamble->search_state);
1012 
1013   /*
1014     Invalidate search for current preamble, because it is going to be
1015     used in the search below (and search state is used in a
1016     one-time-only basis).
1017    */
1018   preamble->search_state = m_hash.end();
1019   preamble->is_search_state_inited = false;
1020 
1021   DBUG_PRINT("debug",
1022              ("Looking for record with key=%u in the hash (next).", key));
1023 
1024   if (it != m_hash.end() && it->first == key) {
1025     DBUG_PRINT("debug", ("Found record with key=%u in the hash (next).", key));
1026     *entry = it->second.get();
1027     preamble = (*entry)->preamble;
1028 
1029     /**
1030        Save the search state for next iteration (if any).
1031      */
1032     preamble->search_state = it;
1033     preamble->is_search_state_inited = true;
1034   } else {
1035     *entry = nullptr;
1036   }
1037 
1038   return false;
1039 }
1040 
del(HASH_ROW_ENTRY * entry)1041 bool Hash_slave_rows::del(HASH_ROW_ENTRY *entry) {
1042   DBUG_TRACE;
1043   DBUG_ASSERT(entry);
1044 
1045   erase_specific_element(&m_hash, entry->preamble->hash_value, entry);
1046   return false;
1047 }
1048 
make_hash_key(TABLE * table,MY_BITMAP * cols)1049 uint Hash_slave_rows::make_hash_key(TABLE *table, MY_BITMAP *cols) {
1050   DBUG_TRACE;
1051   ha_checksum crc = 0L;
1052 
1053   uchar *record = table->record[0];
1054   uchar saved_x = 0, saved_filler = 0;
1055 
1056   if (table->s->null_bytes > 0) {
1057     /*
1058       If we have an X bit then we need to take care of it.
1059     */
1060     if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD)) {
1061       saved_x = record[0];
1062       record[0] |= 1U;
1063     }
1064 
1065     /*
1066       If (last_null_bit_pos == 0 && null_bytes > 1), then:
1067       X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
1068       Ie, the entire byte is used.
1069     */
1070     if (table->s->last_null_bit_pos > 0) {
1071       saved_filler = record[table->s->null_bytes - 1];
1072       record[table->s->null_bytes - 1] |=
1073           256U - (1U << table->s->last_null_bit_pos);
1074     }
1075   }
1076 
1077   /*
1078     We can only checksum the bytes if all fields have been signaled
1079     in the before image. Otherwise, unpack_row will not have set the
1080     null_flags correctly (because it only unpacks those fields and
1081     their flags that were actually in the before image).
1082 
1083     @c record_compare, as it also skips null_flags if the read_set
1084     was not marked completely.
1085    */
1086   if (bitmap_is_set_all(cols) && cols->n_bits == table->s->fields) {
1087     crc = checksum_crc32(crc, table->null_flags, table->s->null_bytes);
1088     DBUG_PRINT("debug", ("make_hash_entry: hash after null_flags: %u", crc));
1089   }
1090 
1091   for (Field **ptr = table->field;
1092        *ptr && ((*ptr)->field_index() < cols->n_bits); ptr++) {
1093     Field *f = (*ptr);
1094 
1095     /*
1096       Field is set in the read_set and is isn't NULL.
1097      */
1098     if (bitmap_is_set(cols, f->field_index()) &&
1099         !f->is_virtual_gcol() &&  // Avoid virtual generated columns on hashes
1100         !f->is_null()) {
1101       /*
1102         BLOB and VARCHAR have pointers in their field, we must convert
1103         to string; GEOMETRY and JSON are implemented on top of BLOB.
1104         BIT may store its data among NULL bits, convert as well.
1105       */
1106       switch (f->type()) {
1107         case MYSQL_TYPE_BLOB:
1108         case MYSQL_TYPE_VARCHAR:
1109         case MYSQL_TYPE_GEOMETRY:
1110         case MYSQL_TYPE_JSON:
1111         case MYSQL_TYPE_BIT: {
1112           String tmp;
1113           f->val_str(&tmp);
1114           crc = checksum_crc32(crc, pointer_cast<const uchar *>(tmp.ptr()),
1115                                tmp.length());
1116           break;
1117         }
1118         default:
1119           crc = checksum_crc32(crc, f->field_ptr(), f->data_length());
1120           break;
1121       }
1122 #ifndef DBUG_OFF
1123       String tmp;
1124       f->val_str(&tmp);
1125       DBUG_PRINT("debug", ("make_hash_entry: hash after field %s=%s: %u",
1126                            f->field_name, tmp.c_ptr_safe(), crc));
1127 #endif
1128     }
1129   }
1130 
1131   /*
1132     Restore the saved bytes.
1133 
1134     TODO[record format ndb]: Remove this code once NDB returns the
1135     correct record format.
1136   */
1137   if (table->s->null_bytes > 0) {
1138     if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
1139       record[0] = saved_x;
1140 
1141     if (table->s->last_null_bit_pos)
1142       record[table->s->null_bytes - 1] = saved_filler;
1143   }
1144 
1145   DBUG_PRINT("debug", ("Created key=%u", crc));
1146   return crc;
1147 }
1148 
1149 #endif
1150 
1151 #if defined(MYSQL_SERVER)
1152 
Deferred_log_events()1153 Deferred_log_events::Deferred_log_events()
1154     : m_array(key_memory_table_def_memory) {}
1155 
~Deferred_log_events()1156 Deferred_log_events::~Deferred_log_events() { m_array.clear(); }
1157 
add(Log_event * ev)1158 int Deferred_log_events::add(Log_event *ev) {
1159   m_array.push_back(ev);
1160   ev->worker = nullptr;  // to mark event busy avoiding deletion
1161   return 0;
1162 }
1163 
is_empty()1164 bool Deferred_log_events::is_empty() { return m_array.empty(); }
1165 
execute(Relay_log_info * rli)1166 bool Deferred_log_events::execute(Relay_log_info *rli) {
1167   bool res = false;
1168 
1169   DBUG_ASSERT(rli->deferred_events_collecting);
1170 
1171   rli->deferred_events_collecting = false;
1172   for (Log_event **it = m_array.begin(); !res && it != m_array.end(); ++it) {
1173     Log_event *ev = *it;
1174     res = ev->apply_event(rli);
1175   }
1176   rli->deferred_events_collecting = true;
1177   return res;
1178 }
1179 
rewind()1180 void Deferred_log_events::rewind() {
1181   /*
1182     Reset preceding Query log event events which execution was
1183     deferred because of slave side filtering.
1184   */
1185   delete_container_pointers(m_array);
1186   m_array.shrink_to_fit();
1187 }
1188 
replace_all_in_str(std::string from,std::string find,std::string replace)1189 std::string replace_all_in_str(std::string from, std::string find,
1190                                std::string replace) {
1191   std::string to{from.data()};
1192   if (to.length() == 0) {
1193     return to;
1194   }
1195 
1196   size_t start{0};
1197   while ((start = to.find(find, start)) != std::string::npos) {
1198     to.replace(start, find.size(), replace);
1199     start += replace.length();
1200   }
1201 
1202   return to;
1203 }
1204 
1205 #endif
1206 
1207 #ifdef MYSQL_SERVER
THD_instance_guard(THD * thd)1208 THD_instance_guard::THD_instance_guard(THD *thd)
1209     : m_is_locally_initialized{thd == nullptr} {
1210   if (this->m_is_locally_initialized) {
1211     this->m_target = new THD;
1212     this->m_target->thread_stack = (char *)&this->m_target;
1213     this->m_target->store_globals();
1214     this->m_target->security_context()->skip_grants();
1215   } else {
1216     this->m_target = thd;
1217   }
1218 }
1219 
~THD_instance_guard()1220 THD_instance_guard::~THD_instance_guard() {
1221   if (this->m_is_locally_initialized) {
1222     delete this->m_target;
1223   }
1224 }
1225 
operator THD*()1226 THD_instance_guard::operator THD *() { return this->m_target; }
1227 
evaluate_command_row_only_restrictions(THD * thd)1228 bool evaluate_command_row_only_restrictions(THD *thd) {
1229   LEX *const lex = thd->lex;
1230 
1231   switch (lex->sql_command) {
1232     case SQLCOM_UPDATE:
1233     case SQLCOM_INSERT:
1234     case SQLCOM_INSERT_SELECT:
1235     case SQLCOM_DELETE:
1236     case SQLCOM_LOAD:
1237     case SQLCOM_REPLACE:
1238     case SQLCOM_REPLACE_SELECT:
1239     case SQLCOM_DELETE_MULTI:
1240     case SQLCOM_UPDATE_MULTI: {
1241       return true;
1242     }
1243     case SQLCOM_CREATE_TABLE: {
1244       return (lex->create_info->options & HA_LEX_CREATE_TMP_TABLE);
1245     }
1246     case SQLCOM_DROP_TABLE: {
1247       return (lex->drop_temporary);
1248     }
1249     default:
1250       break;
1251   }
1252 
1253   return false;
1254 }
1255 
1256 #endif  // MYSQL_SERVER
1257