1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/dd_table_share.h"
24 
25 #include "my_config.h"
26 
27 #include <string.h>
28 #include <algorithm>
29 #include <string>
30 #include <type_traits>
31 
32 #include "lex_string.h"
33 #include "m_string.h"
34 #include "map_helpers.h"
35 #include "my_alloc.h"
36 #include "my_base.h"
37 #include "my_bitmap.h"
38 #include "my_compare.h"
39 #include "my_compiler.h"
40 #include "my_dbug.h"
41 #include "my_loglevel.h"
42 #include "my_macros.h"
43 #include "mysql/components/services/log_builtins.h"
44 #include "mysql/components/services/log_shared.h"
45 #include "mysql/plugin.h"
46 #include "mysql/psi/psi_base.h"
47 #include "mysql/udf_registration_types.h"
48 #include "mysql_com.h"
49 #include "mysqld_error.h"
50 #include "nullable.h"
51 #include "sql/dd/collection.h"
52 #include "sql/dd/dd_table.h"       // dd::FIELD_NAME_SEPARATOR_CHAR
53 #include "sql/dd/dd_tablespace.h"  // dd::get_tablespace_name
54 // TODO: Avoid exposing dd/impl headers in public files.
55 #include "sql/dd/impl/utils.h"  // dd::eat_str
56 #include "sql/dd/properties.h"  // dd::Properties
57 #include "sql/dd/string_type.h"
58 #include "sql/dd/types/check_constraint.h"     // dd::Check_constraint
59 #include "sql/dd/types/column.h"               // dd::enum_column_types
60 #include "sql/dd/types/column_type_element.h"  // dd::Column_type_element
61 #include "sql/dd/types/foreign_key.h"
62 #include "sql/dd/types/foreign_key_element.h"  // dd::Foreign_key_element
63 #include "sql/dd/types/index.h"                // dd::Index
64 #include "sql/dd/types/index_element.h"        // dd::Index_element
65 #include "sql/dd/types/partition.h"            // dd::Partition
66 #include "sql/dd/types/partition_value.h"      // dd::Partition_value
67 #include "sql/dd/types/table.h"                // dd::Table
68 #include "sql/default_values.h"  // prepare_default_value_buffer...
69 #include "sql/error_handler.h"   // Internal_error_handler
70 #include "sql/field.h"
71 #include "sql/gis/srid.h"
72 #include "sql/handler.h"
73 #include "sql/key.h"
74 #include "sql/log.h"
75 #include "sql/partition_element.h"  // partition_element
76 #include "sql/partition_info.h"     // partition_info
77 #include "sql/sql_bitmap.h"
78 #include "sql/sql_check_constraint.h"  // Sql_check_constraint_share_list
79 #include "sql/sql_class.h"             // THD
80 #include "sql/sql_const.h"
81 #include "sql/sql_error.h"
82 #include "sql/sql_list.h"
83 #include "sql/sql_partition.h"  // generate_partition_syntax
84 #include "sql/sql_plugin.h"     // plugin_unlock
85 #include "sql/sql_plugin_ref.h"
86 #include "sql/sql_table.h"  // primary_key_name
87 #include "sql/strfunc.h"    // lex_cstring_handle
88 #include "sql/system_variables.h"
89 #include "sql/table.h"
90 #include "sql/thd_raii.h"
91 #include "typelib.h"
92 
93 namespace histograms {
94 class Histogram;
95 }  // namespace histograms
96 
dd_get_old_field_type(dd::enum_column_types type)97 enum_field_types dd_get_old_field_type(dd::enum_column_types type) {
98   switch (type) {
99     case dd::enum_column_types::DECIMAL:
100       return MYSQL_TYPE_DECIMAL;
101 
102     case dd::enum_column_types::TINY:
103       return MYSQL_TYPE_TINY;
104 
105     case dd::enum_column_types::SHORT:
106       return MYSQL_TYPE_SHORT;
107 
108     case dd::enum_column_types::LONG:
109       return MYSQL_TYPE_LONG;
110 
111     case dd::enum_column_types::FLOAT:
112       return MYSQL_TYPE_FLOAT;
113 
114     case dd::enum_column_types::DOUBLE:
115       return MYSQL_TYPE_DOUBLE;
116 
117     case dd::enum_column_types::TYPE_NULL:
118       return MYSQL_TYPE_NULL;
119 
120     case dd::enum_column_types::TIMESTAMP:
121       return MYSQL_TYPE_TIMESTAMP;
122 
123     case dd::enum_column_types::LONGLONG:
124       return MYSQL_TYPE_LONGLONG;
125 
126     case dd::enum_column_types::INT24:
127       return MYSQL_TYPE_INT24;
128 
129     case dd::enum_column_types::DATE:
130       return MYSQL_TYPE_DATE;
131 
132     case dd::enum_column_types::TIME:
133       return MYSQL_TYPE_TIME;
134 
135     case dd::enum_column_types::DATETIME:
136       return MYSQL_TYPE_DATETIME;
137 
138     case dd::enum_column_types::YEAR:
139       return MYSQL_TYPE_YEAR;
140 
141     case dd::enum_column_types::NEWDATE:
142       return MYSQL_TYPE_NEWDATE;
143 
144     case dd::enum_column_types::VARCHAR:
145       return MYSQL_TYPE_VARCHAR;
146 
147     case dd::enum_column_types::BIT:
148       return MYSQL_TYPE_BIT;
149 
150     case dd::enum_column_types::TIMESTAMP2:
151       return MYSQL_TYPE_TIMESTAMP2;
152 
153     case dd::enum_column_types::DATETIME2:
154       return MYSQL_TYPE_DATETIME2;
155 
156     case dd::enum_column_types::TIME2:
157       return MYSQL_TYPE_TIME2;
158 
159     case dd::enum_column_types::NEWDECIMAL:
160       return MYSQL_TYPE_NEWDECIMAL;
161 
162     case dd::enum_column_types::ENUM:
163       return MYSQL_TYPE_ENUM;
164 
165     case dd::enum_column_types::SET:
166       return MYSQL_TYPE_SET;
167 
168     case dd::enum_column_types::TINY_BLOB:
169       return MYSQL_TYPE_TINY_BLOB;
170 
171     case dd::enum_column_types::MEDIUM_BLOB:
172       return MYSQL_TYPE_MEDIUM_BLOB;
173 
174     case dd::enum_column_types::LONG_BLOB:
175       return MYSQL_TYPE_LONG_BLOB;
176 
177     case dd::enum_column_types::BLOB:
178       return MYSQL_TYPE_BLOB;
179 
180     case dd::enum_column_types::VAR_STRING:
181       return MYSQL_TYPE_VAR_STRING;
182 
183     case dd::enum_column_types::STRING:
184       return MYSQL_TYPE_STRING;
185 
186     case dd::enum_column_types::GEOMETRY:
187       return MYSQL_TYPE_GEOMETRY;
188 
189     case dd::enum_column_types::JSON:
190       return MYSQL_TYPE_JSON;
191 
192     default:
193       DBUG_ASSERT(!"Should not hit here"); /* purecov: deadcode */
194   }
195 
196   return MYSQL_TYPE_LONG;
197 }
198 
199 /** For enum in dd::Index */
dd_get_old_index_algorithm_type(dd::Index::enum_index_algorithm type)200 static enum ha_key_alg dd_get_old_index_algorithm_type(
201     dd::Index::enum_index_algorithm type) {
202   switch (type) {
203     case dd::Index::IA_SE_SPECIFIC:
204       return HA_KEY_ALG_SE_SPECIFIC;
205 
206     case dd::Index::IA_BTREE:
207       return HA_KEY_ALG_BTREE;
208 
209     case dd::Index::IA_RTREE:
210       return HA_KEY_ALG_RTREE;
211 
212     case dd::Index::IA_HASH:
213       return HA_KEY_ALG_HASH;
214 
215     case dd::Index::IA_FULLTEXT:
216       return HA_KEY_ALG_FULLTEXT;
217 
218     default:
219       DBUG_ASSERT(!"Should not hit here"); /* purecov: deadcode */
220   }
221 
222   return HA_KEY_ALG_SE_SPECIFIC;
223 }
224 
225 /*
226   Check if the given key_part is suitable to be promoted as part of
227   primary key.
228 */
is_suitable_for_primary_key(KEY_PART_INFO * key_part,Field * table_field)229 bool is_suitable_for_primary_key(KEY_PART_INFO *key_part, Field *table_field) {
230   // Index on virtual generated columns is not allowed to be PK
231   // even when the conditions below are true, so this case must be
232   // rejected here.
233   if (table_field->is_virtual_gcol()) return false;
234 
235   /*
236     If the key column is of NOT NULL BLOB type, then it
237     will definitly have key prefix. And if key part prefix size
238     is equal to the BLOB column max size, then we can promote
239     it to primary key.
240    */
241   if (!table_field->is_nullable() && table_field->type() == MYSQL_TYPE_BLOB &&
242       table_field->field_length == key_part->length)
243     return true;
244 
245   if (table_field->is_nullable() ||
246       table_field->key_length() != key_part->length)
247     return false;
248 
249   return true;
250 }
251 
252 /**
253   Finalize preparation of TABLE_SHARE from dd::Table object by filling
254   in remaining info about columns and keys.
255 
256   This code similar to code in open_binary_frm(). Can be re-written
257   independent to other efforts later.
258 */
259 
prepare_share(THD * thd,TABLE_SHARE * share,const dd::Table * table_def)260 static bool prepare_share(THD *thd, TABLE_SHARE *share,
261                           const dd::Table *table_def) {
262   my_bitmap_map *bitmaps;
263   bool use_hash;
264   handler *handler_file = nullptr;
265 
266   // Mark 'system' tables (tables with one row) to help the Optimizer.
267   share->system =
268       ((share->max_rows == 1) && (share->min_rows == 1) && (share->keys == 0));
269 
270   bool use_extended_sk = ha_check_storage_engine_flag(
271       share->db_type(), HTON_SUPPORTS_EXTENDED_KEYS);
272   // Setup name_hash for quick look-up
273   use_hash = share->fields >= MAX_FIELDS_BEFORE_HASH;
274   if (use_hash) {
275     Field **field_ptr = share->field;
276     share->name_hash = new collation_unordered_map<std::string, Field **>(
277         system_charset_info, PSI_INSTRUMENT_ME);
278     share->name_hash->reserve(share->fields);
279 
280     for (uint i = 0; i < share->fields; i++, field_ptr++) {
281       share->name_hash->emplace((*field_ptr)->field_name, field_ptr);
282     }
283   }
284 
285   share->m_histograms =
286       new malloc_unordered_map<uint, const histograms::Histogram *>(
287           PSI_INSTRUMENT_ME);
288 
289   // Setup other fields =====================================================
290   /* Allocate handler */
291   if (!(handler_file = get_new_handler(share, (share->m_part_info != nullptr),
292                                        &share->mem_root, share->db_type()))) {
293     my_error(ER_INVALID_DD_OBJECT, MYF(0), share->path.str,
294              "Failed to initialize handler.");
295     return true;
296   }
297 
298   if (handler_file->set_ha_share_ref(&share->ha_share)) {
299     my_error(ER_INVALID_DD_OBJECT, MYF(0), share->path.str, "");
300     return true;
301   }
302   share->db_low_byte_first = handler_file->low_byte_first();
303 
304   /* Fix key->name and key_part->field */
305   if (share->keys) {
306     KEY *keyinfo;
307     KEY_PART_INFO *key_part;
308     uint primary_key = (uint)(
309         find_type(primary_key_name, &share->keynames, FIND_TYPE_NO_PREFIX) - 1);
310     longlong ha_option = handler_file->ha_table_flags();
311     keyinfo = share->key_info;
312     key_part = keyinfo->key_part;
313 
314     dd::Table::Index_collection::const_iterator idx_it(
315         table_def->indexes().begin());
316 
317     for (uint key = 0; key < share->keys; key++, keyinfo++) {
318       /*
319         Skip hidden dd::Index objects so idx_it is in sync with key index
320         and keyinfo pointer.
321       */
322       while ((*idx_it)->is_hidden()) {
323         ++idx_it;
324         continue;
325       }
326 
327       uint usable_parts = 0;
328       keyinfo->name = share->keynames.type_names[key];
329 
330       /* Check that fulltext and spatial keys have correct algorithm set. */
331       DBUG_ASSERT(!(share->key_info[key].flags & HA_FULLTEXT) ||
332                   share->key_info[key].algorithm == HA_KEY_ALG_FULLTEXT);
333       DBUG_ASSERT(!(share->key_info[key].flags & HA_SPATIAL) ||
334                   share->key_info[key].algorithm == HA_KEY_ALG_RTREE);
335 
336       if (primary_key >= MAX_KEY && (keyinfo->flags & HA_NOSAME)) {
337         /*
338            If the UNIQUE key doesn't have NULL columns and is not a part key
339            declare this as a primary key.
340         */
341         primary_key = key;
342         for (uint i = 0; i < keyinfo->user_defined_key_parts; i++) {
343           Field *table_field = key_part[i].field;
344 
345           if (is_suitable_for_primary_key(&key_part[i], table_field) == false) {
346             primary_key = MAX_KEY;
347             break;
348           }
349         }
350 
351         /*
352           Check that dd::Index::is_candidate_key() used by SEs works in
353           the same way as above call to is_suitable_for_primary_key().
354         */
355         DBUG_ASSERT((primary_key == key) == (*idx_it)->is_candidate_key());
356       }
357 
358       dd::Index::Index_elements::const_iterator idx_el_it(
359           (*idx_it)->elements().begin());
360 
361       for (uint i = 0; i < keyinfo->user_defined_key_parts; key_part++, i++) {
362         /*
363           Skip hidden Index_element objects so idx_el_it is in sync with
364           i and key_part pointer.
365         */
366         while ((*idx_el_it)->is_hidden()) {
367           ++idx_el_it;
368           continue;
369         }
370 
371         Field *field = key_part->field;
372 
373         key_part->type = field->key_type();
374         if (field->is_nullable()) {
375           key_part->null_offset = field->null_offset(share->default_values);
376           key_part->null_bit = field->null_bit;
377           key_part->store_length += HA_KEY_NULL_LENGTH;
378           keyinfo->flags |= HA_NULL_PART_KEY;
379           keyinfo->key_length += HA_KEY_NULL_LENGTH;
380         }
381         if (field->type() == MYSQL_TYPE_BLOB ||
382             field->real_type() == MYSQL_TYPE_VARCHAR ||
383             field->type() == MYSQL_TYPE_GEOMETRY) {
384           key_part->store_length += HA_KEY_BLOB_LENGTH;
385           if (i + 1 <= keyinfo->user_defined_key_parts)
386             keyinfo->key_length += HA_KEY_BLOB_LENGTH;
387         }
388         key_part->init_flags();
389 
390         if (field->is_virtual_gcol()) keyinfo->flags |= HA_VIRTUAL_GEN_KEY;
391 
392         setup_key_part_field(share, handler_file, primary_key, keyinfo, key, i,
393                              &usable_parts, true);
394 
395         field->set_flag(PART_KEY_FLAG);
396         if (key == primary_key) {
397           field->set_flag(PRI_KEY_FLAG);
398           /*
399              If this field is part of the primary key and all keys contains
400              the primary key, then we can use any key to find this column
401              */
402           if (ha_option & HA_PRIMARY_KEY_IN_READ_INDEX) {
403             if (field->key_length() == key_part->length &&
404                 !field->is_flag_set(BLOB_FLAG))
405               field->part_of_key = share->keys_in_use;
406             if (field->part_of_sortkey.is_set(key))
407               field->part_of_sortkey = share->keys_in_use;
408           }
409         }
410         if (field->key_length() != key_part->length) {
411 #ifndef TO_BE_DELETED_ON_PRODUCTION
412           if (field->type() == MYSQL_TYPE_NEWDECIMAL) {
413             /*
414                Fix a fatal error in decimal key handling that causes crashes
415                on Innodb. We fix it by reducing the key length so that
416                InnoDB never gets a too big key when searching.
417                This allows the end user to do an ALTER TABLE to fix the
418                error.
419                */
420             keyinfo->key_length -= (key_part->length - field->key_length());
421             key_part->store_length -=
422                 (uint16)(key_part->length - field->key_length());
423             key_part->length = (uint16)field->key_length();
424             LogErr(ERROR_LEVEL, ER_TABLE_WRONG_KEY_DEFINITION,
425                    share->table_name.str, share->table_name.str);
426             push_warning_printf(thd, Sql_condition::SL_WARNING,
427                                 ER_CRASHED_ON_USAGE,
428                                 "Found wrong key definition in %s; "
429                                 "Please do \"ALTER TABLE `%s` FORCE\" to fix "
430                                 "it!",
431                                 share->table_name.str, share->table_name.str);
432             share->crashed = true;  // Marker for CHECK TABLE
433             continue;
434           }
435 #endif
436           key_part->key_part_flag |= HA_PART_KEY_SEG;
437         }
438 
439         /*
440           Check that dd::Index_element::is_prefix() used by SEs works in
441           the same way as code which sets HA_PART_KEY_SEG flag.
442         */
443         DBUG_ASSERT(
444             (*idx_el_it)->is_prefix() ==
445             static_cast<bool>(key_part->key_part_flag & HA_PART_KEY_SEG));
446         ++idx_el_it;
447       }
448 
449       /*
450         KEY::flags is fully set-up at this point so we can copy it to
451         KEY::actual_flags.
452       */
453       keyinfo->actual_flags = keyinfo->flags;
454 
455       if (use_extended_sk && primary_key < MAX_KEY && key &&
456           !(keyinfo->flags & HA_NOSAME))
457         key_part +=
458             add_pk_parts_to_sk(keyinfo, key, share->key_info, primary_key,
459                                share, handler_file, &usable_parts);
460 
461       /* Skip unused key parts if they exist */
462       key_part += keyinfo->unused_key_parts;
463 
464       keyinfo->usable_key_parts = usable_parts;  // Filesort
465 
466       share->max_key_length =
467           std::max(share->max_key_length,
468                    keyinfo->key_length + keyinfo->user_defined_key_parts);
469       share->total_key_length += keyinfo->key_length;
470       /*
471          MERGE tables do not have unique indexes. But every key could be
472          an unique index on the underlying MyISAM table. (Bug #10400)
473          */
474       if ((keyinfo->flags & HA_NOSAME) ||
475           (ha_option & HA_ANY_INDEX_MAY_BE_UNIQUE))
476         share->max_unique_length =
477             std::max(share->max_unique_length, keyinfo->key_length);
478 
479       ++idx_it;
480     }
481     if (primary_key < MAX_KEY && (share->keys_in_use.is_set(primary_key))) {
482       share->primary_key = primary_key;
483       /*
484          If we are using an integer as the primary key then allow the user to
485          refer to it as '_rowid'
486          */
487       if (share->key_info[primary_key].user_defined_key_parts == 1) {
488         Field *field = share->key_info[primary_key].key_part[0].field;
489         if (field && field->result_type() == INT_RESULT) {
490           /* note that fieldnr here (and rowid_field_offset) starts from 1 */
491           share->rowid_field_offset =
492               (share->key_info[primary_key].key_part[0].fieldnr);
493         }
494       }
495     } else
496       share->primary_key = MAX_KEY;  // we do not have a primary key
497   } else
498     share->primary_key = MAX_KEY;
499   destroy(handler_file);
500 
501   if (share->found_next_number_field) {
502     Field *reg_field = *share->found_next_number_field;
503     /* Check that the auto-increment column is the first column of some key. */
504     if ((int)(share->next_number_index = (uint)find_ref_key(
505                   share->key_info, share->keys, share->default_values,
506                   reg_field, &share->next_number_key_offset,
507                   &share->next_number_keypart)) < 0) {
508       my_error(ER_INVALID_DD_OBJECT, MYF(0), share->path.str,
509                "Wrong field definition.");
510       return true;
511     } else
512       reg_field->set_flag(AUTO_INCREMENT_FLAG);
513   }
514 
515   if (share->blob_fields) {
516     Field **ptr;
517     uint k, *save;
518 
519     /* Store offsets to blob fields to find them fast */
520     if (!(share->blob_field = save = (uint *)share->mem_root.Alloc(
521               (uint)(share->blob_fields * sizeof(uint)))))
522       return true;  // OOM error message already reported
523     for (k = 0, ptr = share->field; *ptr; ptr++, k++) {
524       if ((*ptr)->is_flag_set(BLOB_FLAG) || (*ptr)->is_array()) (*save++) = k;
525     }
526   }
527 
528   share->column_bitmap_size = bitmap_buffer_size(share->fields);
529   if (!(bitmaps = (my_bitmap_map *)share->mem_root.Alloc(
530             share->column_bitmap_size))) {
531     // OOM error message already reported
532     return true; /* purecov: inspected */
533   }
534   bitmap_init(&share->all_set, bitmaps, share->fields);
535   bitmap_set_all(&share->all_set);
536 
537   return false;
538 }
539 
540 /** Fill tablespace name from dd::Tablespace. */
fill_tablespace_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)541 static bool fill_tablespace_from_dd(THD *thd, TABLE_SHARE *share,
542                                     const dd::Table *tab_obj) {
543   DBUG_TRACE;
544 
545   return dd::get_tablespace_name<dd::Table>(thd, tab_obj, &share->tablespace,
546                                             &share->mem_root);
547 }
548 
549 /**
550   Convert row format value used in DD to corresponding value in old
551   row_type enum.
552 */
553 
dd_get_old_row_format(dd::Table::enum_row_format new_format)554 static row_type dd_get_old_row_format(dd::Table::enum_row_format new_format) {
555   switch (new_format) {
556     case dd::Table::RF_FIXED:
557       return ROW_TYPE_FIXED;
558     case dd::Table::RF_DYNAMIC:
559       return ROW_TYPE_DYNAMIC;
560     case dd::Table::RF_COMPRESSED:
561       return ROW_TYPE_COMPRESSED;
562     case dd::Table::RF_REDUNDANT:
563       return ROW_TYPE_REDUNDANT;
564     case dd::Table::RF_COMPACT:
565       return ROW_TYPE_COMPACT;
566     case dd::Table::RF_PAGED:
567       return ROW_TYPE_PAGED;
568     default:
569       DBUG_ASSERT(0);
570       break;
571   }
572   return ROW_TYPE_FIXED;
573 }
574 
575 /** Fill TABLE_SHARE from dd::Table object */
fill_share_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)576 static bool fill_share_from_dd(THD *thd, TABLE_SHARE *share,
577                                const dd::Table *tab_obj) {
578   const dd::Properties &table_options = tab_obj->options();
579 
580   // Secondary storage engine.
581   if (table_options.exists("secondary_engine")) {
582     table_options.get("secondary_engine", &share->secondary_engine,
583                       &share->mem_root);
584   } else {
585     // If no secondary storage engine is set, the share cannot
586     // represent a table in a secondary engine.
587     DBUG_ASSERT(!share->is_secondary_engine());
588   }
589 
590   // Read table engine type
591   LEX_CSTRING engine_name = lex_cstring_handle(tab_obj->engine());
592   if (share->is_secondary_engine())
593     engine_name = {share->secondary_engine.str, share->secondary_engine.length};
594 
595   plugin_ref tmp_plugin = ha_resolve_by_name_raw(thd, engine_name);
596   if (tmp_plugin) {
597 #ifndef DBUG_OFF
598     handlerton *hton = plugin_data<handlerton *>(tmp_plugin);
599 #endif
600 
601     DBUG_ASSERT(hton && ha_storage_engine_is_enabled(hton));
602     DBUG_ASSERT(!ha_check_storage_engine_flag(hton, HTON_NOT_USER_SELECTABLE));
603 
604     plugin_unlock(nullptr, share->db_plugin);
605     share->db_plugin = my_plugin_lock(nullptr, &tmp_plugin);
606   } else {
607     my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), engine_name.str);
608     return true;
609   }
610 
611   // Set temporarily a good value for db_low_byte_first.
612   DBUG_ASSERT(ha_legacy_type(share->db_type()) != DB_TYPE_ISAM);
613   share->db_low_byte_first = true;
614 
615   // Read other table options
616   uint64 option_value = 0;
617   bool bool_opt = false;
618 
619   // Max rows
620   if (table_options.exists("max_rows"))
621     table_options.get("max_rows", &share->max_rows);
622 
623   // Min rows
624   if (table_options.exists("min_rows"))
625     table_options.get("min_rows", &share->min_rows);
626 
627   // Options from HA_CREATE_INFO::table_options/TABLE_SHARE::db_create_options.
628   share->db_create_options = 0;
629 
630   table_options.get("pack_record", &bool_opt);
631   if (bool_opt) share->db_create_options |= HA_OPTION_PACK_RECORD;
632 
633   if (table_options.exists("pack_keys")) {
634     table_options.get("pack_keys", &bool_opt);
635     share->db_create_options |=
636         bool_opt ? HA_OPTION_PACK_KEYS : HA_OPTION_NO_PACK_KEYS;
637   }
638 
639   if (table_options.exists("checksum")) {
640     table_options.get("checksum", &bool_opt);
641     if (bool_opt) share->db_create_options |= HA_OPTION_CHECKSUM;
642   }
643 
644   if (table_options.exists("delay_key_write")) {
645     table_options.get("delay_key_write", &bool_opt);
646     if (bool_opt) share->db_create_options |= HA_OPTION_DELAY_KEY_WRITE;
647   }
648 
649   if (table_options.exists("stats_persistent")) {
650     table_options.get("stats_persistent", &bool_opt);
651     share->db_create_options |=
652         bool_opt ? HA_OPTION_STATS_PERSISTENT : HA_OPTION_NO_STATS_PERSISTENT;
653   }
654 
655   share->db_options_in_use = share->db_create_options;
656 
657   // Average row length
658 
659   if (table_options.exists("avg_row_length")) {
660     table_options.get("avg_row_length", &option_value);
661     share->avg_row_length = static_cast<ulong>(option_value);
662   }
663 
664   // Collation ID
665   share->table_charset = dd_get_mysql_charset(tab_obj->collation_id());
666   if (!share->table_charset) {
667     // Unknown collation
668     if (use_mb(default_charset_info)) {
669       /* Warn that we may be changing the size of character columns */
670       LogErr(WARNING_LEVEL, ER_INVALID_CHARSET_AND_DEFAULT_IS_MB,
671              share->path.str);
672     }
673     share->table_charset = default_charset_info;
674   }
675 
676   // Row type. First one really used by the storage engine.
677   share->real_row_type = dd_get_old_row_format(tab_obj->row_format());
678 
679   // Then one which was explicitly specified by user for this table.
680   if (table_options.exists("row_type")) {
681     table_options.get("row_type", &option_value);
682     share->row_type =
683         dd_get_old_row_format((dd::Table::enum_row_format)option_value);
684   } else
685     share->row_type = ROW_TYPE_DEFAULT;
686 
687   // Stats_sample_pages
688   if (table_options.exists("stats_sample_pages"))
689     table_options.get("stats_sample_pages", &share->stats_sample_pages);
690 
691   // Stats_auto_recalc
692   if (table_options.exists("stats_auto_recalc")) {
693     table_options.get("stats_auto_recalc", &option_value);
694     share->stats_auto_recalc = (enum_stats_auto_recalc)option_value;
695   }
696 
697   // mysql version
698   share->mysql_version = tab_obj->mysql_version_id();
699 
700   // TODO-POST-MERGE-TO-TRUNK: Initialize new field
701   // share->last_checked_for_upgrade? Or access tab_obj directly where
702   // needed?
703 
704   // key block size
705   table_options.get("key_block_size", &share->key_block_size);
706 
707   // Prepare the default_value buffer.
708   if (prepare_default_value_buffer_and_table_share(thd, *tab_obj, share))
709     return true;
710 
711   // Storage media flags
712   if (table_options.exists("storage")) {
713     uint32 storage_option_value = 0;
714     table_options.get("storage", &storage_option_value);
715     share->default_storage_media =
716         static_cast<ha_storage_media>(storage_option_value);
717   } else
718     share->default_storage_media = HA_SM_DEFAULT;
719 
720   // Read tablespace name
721   if (fill_tablespace_from_dd(thd, share, tab_obj)) return true;
722 
723   // Read comment
724   dd::String_type comment = tab_obj->comment();
725   if (comment.length()) {
726     share->comment.str =
727         strmake_root(&share->mem_root, comment.c_str(), comment.length() + 1);
728     share->comment.length = comment.length();
729   }
730 
731   // Copy SE attributes into share's memroot
732   share->engine_attribute = LexStringDupRootUnlessEmpty(
733       &share->mem_root, tab_obj->engine_attribute());
734   share->secondary_engine_attribute = LexStringDupRootUnlessEmpty(
735       &share->mem_root, tab_obj->secondary_engine_attribute());
736 
737   // Read Connection strings
738   if (table_options.exists("connection_string"))
739     table_options.get("connection_string", &share->connect_string,
740                       &share->mem_root);
741 
742   // Read Compress string
743   if (table_options.exists("compress"))
744     table_options.get("compress", &share->compress, &share->mem_root);
745 
746   // Read Encrypt string
747   if (table_options.exists("encrypt_type"))
748     table_options.get("encrypt_type", &share->encrypt_type, &share->mem_root);
749 
750   return false;
751 }
752 
753 /**
754   Calculate number of bits used for the column in the record preamble
755   (aka null bits number).
756 */
757 
column_preamble_bits(const dd::Column * col_obj)758 static uint column_preamble_bits(const dd::Column *col_obj) {
759   uint result = 0;
760 
761   if (col_obj->is_nullable()) result++;
762 
763   if (col_obj->type() == dd::enum_column_types::BIT) {
764     bool treat_bit_as_char = false;
765     (void)col_obj->options().get("treat_bit_as_char", &treat_bit_as_char);
766 
767     if (!treat_bit_as_char) result += col_obj->char_length() & 7;
768   }
769   return result;
770 }
771 
get_auto_flags(const dd::Column & col_obj,uint & auto_flags)772 inline void get_auto_flags(const dd::Column &col_obj, uint &auto_flags) {
773   /*
774     For DEFAULT it is possible to have CURRENT_TIMESTAMP or a
775     generation expression.
776   */
777   if (!col_obj.default_option().empty()) {
778     // We're only matching the prefix because there may be parameters
779     // e.g. CURRENT_TIMESTAMP(6). Regular strings won't match as they
780     // are preceded by the charset and CURRENT_TIMESTAMP as a default
781     // expression gets converted to now().
782     if (col_obj.default_option().compare(0, 17, "CURRENT_TIMESTAMP") == 0) {
783       // The only allowed patterns are "CURRENT_TIMESTAMP" and
784       // "CURRENT_TIMESTAP(<integer>)". Stored functions with names
785       // starting with "CURRENT_TIMESTAMP" should be filtered out before
786       // we get here.
787       DBUG_ASSERT(
788           col_obj.default_option().size() == 17 ||
789           (col_obj.default_option().size() >= 20 &&
790            col_obj.default_option()[17] == '(' &&
791            col_obj.default_option()[col_obj.default_option().size() - 1] ==
792                ')'));
793 
794       auto_flags |= Field::DEFAULT_NOW;
795     } else {
796       auto_flags |= Field::GENERATED_FROM_EXPRESSION;
797     }
798   }
799 
800   /*
801     For ON UPDATE the only option which is supported
802     at this point is CURRENT_TIMESTAMP.
803   */
804   if (!col_obj.update_option().empty()) auto_flags |= Field::ON_UPDATE_NOW;
805 
806   if (col_obj.is_auto_increment()) auto_flags |= Field::NEXT_NUMBER;
807 
808   /*
809     Columns can't have AUTO_INCREMENT and DEFAULT/ON UPDATE CURRENT_TIMESTAMP at
810     the same time.
811   */
812   DBUG_ASSERT(!((auto_flags & (Field::DEFAULT_NOW | Field::ON_UPDATE_NOW |
813                                Field::GENERATED_FROM_EXPRESSION)) != 0 &&
814                 (auto_flags & Field::NEXT_NUMBER) != 0));
815 }
816 
make_field(const dd::Column & col_obj,const CHARSET_INFO * charset,TABLE_SHARE * share,uchar * ptr,uchar * null_pos,size_t null_bit)817 static Field *make_field(const dd::Column &col_obj, const CHARSET_INFO *charset,
818                          TABLE_SHARE *share, uchar *ptr, uchar *null_pos,
819                          size_t null_bit) {
820   auto field_type = dd_get_old_field_type(col_obj.type());
821   auto field_length = col_obj.char_length();
822 
823   const dd::Properties &column_options = col_obj.options();
824 
825   // Reconstruct auto_flags
826   auto auto_flags = static_cast<uint>(Field::NONE);
827   get_auto_flags(col_obj, auto_flags);
828 
829   // Read Interval TYPELIB
830   TYPELIB *interval = nullptr;
831 
832   if (field_type == MYSQL_TYPE_ENUM || field_type == MYSQL_TYPE_SET) {
833     //
834     // Allocate space for interval (column elements)
835     //
836     size_t interval_parts = col_obj.elements_count();
837 
838     interval = (TYPELIB *)share->mem_root.Alloc(sizeof(TYPELIB));
839     interval->type_names = (const char **)share->mem_root.Alloc(
840         sizeof(char *) * (interval_parts + 1));
841     interval->type_names[interval_parts] = nullptr;
842 
843     interval->type_lengths =
844         (uint *)share->mem_root.Alloc(sizeof(uint) * interval_parts);
845     interval->count = interval_parts;
846     interval->name = nullptr;
847 
848     //
849     // Iterate through all the column elements
850     //
851     for (const dd::Column_type_element *ce : col_obj.elements()) {
852       // Read the enum/set element name
853       dd::String_type element_name = ce->name();
854 
855       uint pos = ce->index() - 1;
856       interval->type_lengths[pos] = static_cast<uint>(element_name.length());
857       interval->type_names[pos] = strmake_root(
858           &share->mem_root, element_name.c_str(), element_name.length());
859     }
860   }
861 
862   // Column name
863   char *name = nullptr;
864   dd::String_type s = col_obj.name();
865   DBUG_ASSERT(!s.empty());
866   name = strmake_root(&share->mem_root, s.c_str(), s.length());
867   name[s.length()] = '\0';
868 
869   uint decimals;
870   // Decimals
871   if (field_type == MYSQL_TYPE_DECIMAL || field_type == MYSQL_TYPE_NEWDECIMAL) {
872     DBUG_ASSERT(col_obj.is_numeric_scale_null() == false);
873     decimals = col_obj.numeric_scale();
874   } else if (field_type == MYSQL_TYPE_FLOAT ||
875              field_type == MYSQL_TYPE_DOUBLE) {
876     decimals = col_obj.is_numeric_scale_null() ? DECIMAL_NOT_SPECIFIED
877                                                : col_obj.numeric_scale();
878   } else
879     decimals = 0;
880 
881   auto geom_type = Field::GEOM_GEOMETRY;
882   // Read geometry sub type
883   if (field_type == MYSQL_TYPE_GEOMETRY) {
884     uint32 sub_type = 0;
885     column_options.get("geom_type", &sub_type);
886     geom_type = static_cast<Field::geometry_type>(sub_type);
887   }
888 
889   bool treat_bit_as_char = false;
890   if (field_type == MYSQL_TYPE_BIT) {
891     column_options.get("treat_bit_as_char", &treat_bit_as_char);
892   }
893 
894   return make_field(*THR_MALLOC, share, ptr, field_length, null_pos, null_bit,
895                     field_type, charset, geom_type, auto_flags, interval, name,
896                     col_obj.is_nullable(), col_obj.is_zerofill(),
897                     col_obj.is_unsigned(), decimals, treat_bit_as_char, 0,
898                     col_obj.srs_id(), col_obj.is_array());
899 }
900 
901 /**
902   Add Field constructed according to column metadata from dd::Column
903   object to TABLE_SHARE.
904 */
905 
fill_column_from_dd(THD * thd,TABLE_SHARE * share,const dd::Column * col_obj,uchar * null_pos,uint null_bit_pos,uchar * rec_pos,uint field_nr)906 static bool fill_column_from_dd(THD *thd, TABLE_SHARE *share,
907                                 const dd::Column *col_obj, uchar *null_pos,
908                                 uint null_bit_pos, uchar *rec_pos,
909                                 uint field_nr) {
910   char *name = nullptr;
911   enum_field_types field_type;
912   const CHARSET_INFO *charset = nullptr;
913   Field *reg_field;
914   ha_storage_media field_storage;
915   column_format_type field_column_format;
916 
917   //
918   // Read column details from dd table
919   //
920 
921   // Column name
922   dd::String_type s = col_obj->name();
923   DBUG_ASSERT(!s.empty());
924   name = strmake_root(&share->mem_root, s.c_str(), s.length());
925   name[s.length()] = '\0';
926 
927   const dd::Properties *column_options = &col_obj->options();
928 
929   // Type
930   field_type = dd_get_old_field_type(col_obj->type());
931 
932   // Reconstruct auto_flags
933   auto auto_flags = static_cast<uint>(Field::NONE);
934   get_auto_flags(*col_obj, auto_flags);
935 
936   bool treat_bit_as_char = false;
937   if (field_type == MYSQL_TYPE_BIT)
938     column_options->get("treat_bit_as_char", &treat_bit_as_char);
939 
940   // Collation ID
941   charset = dd_get_mysql_charset(col_obj->collation_id());
942   if (charset == nullptr) {
943     my_printf_error(ER_UNKNOWN_COLLATION,
944                     "invalid collation id %llu for table %s, column %s", MYF(0),
945                     col_obj->collation_id(), share->table_name.str, name);
946     if (thd->is_error()) return true;
947     charset = default_charset_info;
948   }
949 
950   // Decimals
951   if (field_type == MYSQL_TYPE_DECIMAL || field_type == MYSQL_TYPE_NEWDECIMAL)
952     DBUG_ASSERT(col_obj->is_numeric_scale_null() == false);
953 
954   // Read geometry sub type
955   if (field_type == MYSQL_TYPE_GEOMETRY) {
956     uint32 sub_type;
957     column_options->get("geom_type", &sub_type);
958   }
959 
960   // Read values of storage media and column format options
961   if (column_options->exists("storage")) {
962     uint32 option_value = 0;
963     column_options->get("storage", &option_value);
964     field_storage = static_cast<ha_storage_media>(option_value);
965   } else
966     field_storage = HA_SM_DEFAULT;
967 
968   if (column_options->exists("column_format")) {
969     uint32 option_value = 0;
970     column_options->get("column_format", &option_value);
971     field_column_format = static_cast<column_format_type>(option_value);
972   } else
973     field_column_format = COLUMN_FORMAT_TYPE_DEFAULT;
974 
975   // Read Interval TYPELIB
976   TYPELIB *interval = nullptr;
977 
978   if (field_type == MYSQL_TYPE_ENUM || field_type == MYSQL_TYPE_SET) {
979     //
980     // Allocate space for interval (column elements)
981     //
982     size_t interval_parts = col_obj->elements_count();
983 
984     interval = (TYPELIB *)share->mem_root.Alloc(sizeof(TYPELIB));
985     interval->type_names = (const char **)share->mem_root.Alloc(
986         sizeof(char *) * (interval_parts + 1));
987     interval->type_names[interval_parts] = nullptr;
988 
989     interval->type_lengths =
990         (uint *)share->mem_root.Alloc(sizeof(uint) * interval_parts);
991     interval->count = interval_parts;
992     interval->name = nullptr;
993 
994     //
995     // Iterate through all the column elements
996     //
997     for (const dd::Column_type_element *ce : col_obj->elements()) {
998       // Read the enum/set element name
999       dd::String_type element_name = ce->name();
1000 
1001       uint pos = ce->index() - 1;
1002       interval->type_lengths[pos] = static_cast<uint>(element_name.length());
1003       interval->type_names[pos] = strmake_root(
1004           &share->mem_root, element_name.c_str(), element_name.length());
1005     }
1006   }
1007 
1008   //
1009   // Create FIELD
1010   //
1011   reg_field =
1012       make_field(*col_obj, charset, share, rec_pos, null_pos, null_bit_pos);
1013   reg_field->set_field_index(field_nr);
1014   reg_field->stored_in_db = true;
1015 
1016   // Handle generated columns
1017   if (!col_obj->is_generation_expression_null()) {
1018     Value_generator *gcol_info = new (&share->mem_root) Value_generator();
1019 
1020     // Set if GC is virtual or stored
1021     gcol_info->set_field_stored(!col_obj->is_virtual());
1022 
1023     // Read generation expression.
1024     dd::String_type gc_expr = col_obj->generation_expression();
1025 
1026     /*
1027       Place the expression's text into the TABLE_SHARE. Field objects of
1028       TABLE_SHARE only have that. They don't have a corresponding Item,
1029       which will be later created for the Field in TABLE, by
1030       fill_dd_columns_from_create_fields().
1031     */
1032     gcol_info->dup_expr_str(&share->mem_root, gc_expr.c_str(),
1033                             gc_expr.length());
1034     share->vfields++;
1035     reg_field->gcol_info = gcol_info;
1036     reg_field->stored_in_db = gcol_info->get_field_stored();
1037   }
1038 
1039   // Handle default values generated from expression
1040   if (auto_flags & Field::GENERATED_FROM_EXPRESSION) {
1041     Value_generator *default_val_expr =
1042         new (&share->mem_root) Value_generator();
1043 
1044     // DEFAULT GENERATED is always stored
1045     default_val_expr->set_field_stored(true);
1046 
1047     // Read generation expression.
1048     dd::String_type default_val_expr_str = col_obj->default_option();
1049 
1050     // Copy the expression's text into reg_field which is stored on TABLE_SHARE.
1051     default_val_expr->dup_expr_str(&share->mem_root,
1052                                    default_val_expr_str.c_str(),
1053                                    default_val_expr_str.length());
1054     share->gen_def_field_count++;
1055     reg_field->m_default_val_expr = default_val_expr;
1056   }
1057 
1058   if ((auto_flags & Field::NEXT_NUMBER) != 0)
1059     share->found_next_number_field = &share->field[field_nr];
1060 
1061   // Set field flags
1062   if (col_obj->has_no_default()) reg_field->set_flag(NO_DEFAULT_VALUE_FLAG);
1063 
1064   // Set default value or NULL. Reset required for e.g. CHAR.
1065   if (col_obj->is_default_value_null()) {
1066     reg_field->reset();
1067     reg_field->set_null();
1068   } else if (field_type == MYSQL_TYPE_BIT && !treat_bit_as_char &&
1069              (col_obj->char_length() & 7)) {
1070     // For bit fields with leftover bits, copy leftover bits into the preamble.
1071     Field_bit *bitfield = dynamic_cast<Field_bit *>(reg_field);
1072     const uchar leftover_bits =
1073         static_cast<uchar>(*col_obj->default_value()
1074                                 .substr(reg_field->pack_length() - 1, 1)
1075                                 .data());
1076     set_rec_bits(leftover_bits, bitfield->bit_ptr, bitfield->bit_ofs,
1077                  bitfield->bit_len);
1078     // Copy the main part of the bit field data into the record body.
1079     memcpy(rec_pos, col_obj->default_value().data(),
1080            reg_field->pack_length() - 1);
1081   } else {
1082     // For any other field with default data, copy the data into the record.
1083     memcpy(rec_pos, col_obj->default_value().data(), reg_field->pack_length());
1084   }
1085 
1086   reg_field->set_storage_type(field_storage);
1087   reg_field->set_column_format(field_column_format);
1088 
1089   // Comments
1090   dd::String_type comment = col_obj->comment();
1091   reg_field->comment.length = comment.length();
1092   if (reg_field->comment.length) {
1093     reg_field->comment.str =
1094         strmake_root(&share->mem_root, comment.c_str(), comment.length());
1095     reg_field->comment.length = comment.length();
1096   }
1097 
1098   // NOT SECONDARY column option.
1099   if (column_options->exists("not_secondary"))
1100     reg_field->set_flag(NOT_SECONDARY_FLAG);
1101 
1102   reg_field->set_hidden(col_obj->hidden());
1103 
1104   reg_field->m_engine_attribute = LexStringDupRootUnlessEmpty(
1105       &share->mem_root, col_obj->engine_attribute());
1106 
1107   reg_field->m_secondary_engine_attribute = LexStringDupRootUnlessEmpty(
1108       &share->mem_root, col_obj->secondary_engine_attribute());
1109 
1110   // Field is prepared. Store it in 'share'
1111   share->field[field_nr] = reg_field;
1112 
1113   return (false);
1114 }
1115 
1116 /**
1117   Populate TABLE_SHARE::field array according to column metadata
1118   from dd::Table object.
1119 */
1120 
fill_columns_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)1121 static bool fill_columns_from_dd(THD *thd, TABLE_SHARE *share,
1122                                  const dd::Table *tab_obj) {
1123   // Allocate space for fields in TABLE_SHARE.
1124   uint fields_size = ((share->fields + 1) * sizeof(Field *));
1125   share->field = (Field **)share->mem_root.Alloc((uint)fields_size);
1126   memset(share->field, 0, fields_size);
1127   share->vfields = 0;
1128   share->gen_def_field_count = 0;
1129 
1130   // Iterate through all the columns.
1131   uchar *null_flags MY_ATTRIBUTE((unused));
1132   uchar *null_pos, *rec_pos;
1133   null_flags = null_pos = share->default_values;
1134   rec_pos = share->default_values + share->null_bytes;
1135   uint null_bit_pos =
1136       (share->db_create_options & HA_OPTION_PACK_RECORD) ? 0 : 1;
1137   uint field_nr = 0;
1138   bool has_vgc = false;
1139   for (const dd::Column *col_obj : tab_obj->columns()) {
1140     // Skip hidden columns
1141     if (col_obj->is_se_hidden()) continue;
1142 
1143     /*
1144       Fill details of each column.
1145 
1146       Skip virtual generated columns at this point. They reside at the end of
1147       the record, so we need to do separate pass, to evaluate their offsets
1148       correctly.
1149     */
1150     if (!col_obj->is_virtual()) {
1151       if (fill_column_from_dd(thd, share, col_obj, null_pos, null_bit_pos,
1152                               rec_pos, field_nr))
1153         return true;
1154 
1155       rec_pos += share->field[field_nr]->pack_length_in_rec();
1156     } else
1157       has_vgc = true;
1158 
1159     /*
1160       Virtual generated columns still need to be accounted in null bits and
1161       field_nr calculations, since they reside at the normal place in record
1162       preamble and TABLE_SHARE::field array.
1163     */
1164     if ((null_bit_pos += column_preamble_bits(col_obj)) > 7) {
1165       null_pos++;
1166       null_bit_pos -= 8;
1167     }
1168     field_nr++;
1169   }
1170 
1171   if (has_vgc) {
1172     /*
1173       Additional pass to put virtual generated columns at the end of the
1174       record is required.
1175     */
1176     if (share->stored_rec_length >
1177         static_cast<ulong>(rec_pos - share->default_values))
1178       share->stored_rec_length = (rec_pos - share->default_values);
1179 
1180     null_pos = share->default_values;
1181     null_bit_pos = (share->db_create_options & HA_OPTION_PACK_RECORD) ? 0 : 1;
1182     field_nr = 0;
1183 
1184     for (const dd::Column *col_obj2 : tab_obj->columns()) {
1185       // Skip hidden columns
1186       if (col_obj2->is_se_hidden()) continue;
1187 
1188       if (col_obj2->is_virtual()) {
1189         // Fill details of each column.
1190         if (fill_column_from_dd(thd, share, col_obj2, null_pos, null_bit_pos,
1191                                 rec_pos, field_nr))
1192           return true;
1193 
1194         rec_pos += share->field[field_nr]->pack_length_in_rec();
1195       }
1196 
1197       /*
1198         Account for all columns while evaluating null_pos/null_bit_pos and
1199         field_nr.
1200       */
1201       if ((null_bit_pos += column_preamble_bits(col_obj2)) > 7) {
1202         null_pos++;
1203         null_bit_pos -= 8;
1204       }
1205       field_nr++;
1206     }
1207   }
1208 
1209   // Make sure the scan of the columns is consistent with other data.
1210   DBUG_ASSERT(share->null_bytes ==
1211               (null_pos - null_flags + (null_bit_pos + 7) / 8));
1212   DBUG_ASSERT(share->last_null_bit_pos == null_bit_pos);
1213   DBUG_ASSERT(share->fields == field_nr);
1214 
1215   return (false);
1216 }
1217 
1218 /** Fill KEY_INFO_PART from dd::Index_element object. */
fill_index_element_from_dd(TABLE_SHARE * share,const dd::Index_element * idx_elem_obj,KEY_PART_INFO * keypart)1219 static void fill_index_element_from_dd(TABLE_SHARE *share,
1220                                        const dd::Index_element *idx_elem_obj,
1221                                        KEY_PART_INFO *keypart) {
1222   //
1223   // Read index element details
1224   //
1225 
1226   keypart->length = idx_elem_obj->length();
1227   keypart->store_length = keypart->length;
1228 
1229   // fieldnr
1230   keypart->fieldnr = idx_elem_obj->column().ordinal_position();
1231 
1232   // field
1233   DBUG_ASSERT(keypart->fieldnr > 0);
1234   Field *field = keypart->field = share->field[keypart->fieldnr - 1];
1235 
1236   // offset
1237   keypart->offset = field->offset(share->default_values);
1238 
1239   // key type
1240   keypart->bin_cmp = ((field->real_type() != MYSQL_TYPE_VARCHAR &&
1241                        field->real_type() != MYSQL_TYPE_STRING) ||
1242                       (field->charset()->state & MY_CS_BINSORT));
1243   //
1244   // Read index order
1245   //
1246 
1247   // key part order
1248   if (idx_elem_obj->order() == dd::Index_element::ORDER_DESC)
1249     keypart->key_part_flag |= HA_REVERSE_SORT;
1250 
1251   // key_part->field=   (Field*) 0; // Will be fixed later
1252 }
1253 
1254 /** Fill KEY::key_part array according to metadata from dd::Index object. */
fill_index_elements_from_dd(TABLE_SHARE * share,const dd::Index * idx_obj,int key_nr)1255 static void fill_index_elements_from_dd(TABLE_SHARE *share,
1256                                         const dd::Index *idx_obj, int key_nr) {
1257   //
1258   // Iterate through all index elements
1259   //
1260 
1261   uint i = 0;
1262   KEY *keyinfo = share->key_info + key_nr;
1263   for (const dd::Index_element *idx_elem_obj : idx_obj->elements()) {
1264     // Skip hidden index elements
1265     if (idx_elem_obj->is_hidden()) continue;
1266 
1267     //
1268     // Read index element details
1269     //
1270 
1271     fill_index_element_from_dd(share, idx_elem_obj, keyinfo->key_part + i);
1272     if (keyinfo->key_part[i].field->is_array())
1273       keyinfo->flags |= HA_MULTI_VALUED_KEY;
1274     i++;
1275   }
1276 }
1277 
1278 /**
1279   Add KEY constructed according to index metadata from dd::Index object to
1280   the TABLE_SHARE.
1281 */
1282 
fill_index_from_dd(THD * thd,TABLE_SHARE * share,const dd::Index * idx_obj,uint key_nr)1283 static bool fill_index_from_dd(THD *thd, TABLE_SHARE *share,
1284                                const dd::Index *idx_obj, uint key_nr) {
1285   //
1286   // Read index details
1287   //
1288 
1289   // Get the keyinfo that we will prepare now
1290   KEY *keyinfo = share->key_info + key_nr;
1291 
1292   // Read index name
1293   const dd::String_type &name = idx_obj->name();
1294   if (!name.empty()) {
1295     if (name.length()) {
1296       keyinfo->name =
1297           strmake_root(&share->mem_root, name.c_str(), name.length());
1298       share->keynames.type_names[key_nr] = keyinfo->name;  // Post processing ??
1299     } else
1300       share->keynames.type_names[key_nr] = nullptr;
1301     // share->keynames.count= key_nr+1;
1302   }
1303 
1304   // Index algorithm
1305   keyinfo->algorithm = dd_get_old_index_algorithm_type(idx_obj->algorithm());
1306   keyinfo->is_algorithm_explicit = idx_obj->is_algorithm_explicit();
1307 
1308   // Visibility
1309   keyinfo->is_visible = idx_obj->is_visible();
1310 
1311   // user defined key parts
1312   keyinfo->user_defined_key_parts = 0;
1313   for (const dd::Index_element *idx_ele : idx_obj->elements()) {
1314     // Skip hidden index elements
1315     if (!idx_ele->is_hidden()) keyinfo->user_defined_key_parts++;
1316   }
1317 
1318   // flags
1319   switch (idx_obj->type()) {
1320     case dd::Index::IT_MULTIPLE:
1321       keyinfo->flags = 0;
1322       break;
1323     case dd::Index::IT_FULLTEXT:
1324       keyinfo->flags = HA_FULLTEXT;
1325       break;
1326     case dd::Index::IT_SPATIAL:
1327       keyinfo->flags = HA_SPATIAL;
1328       break;
1329     case dd::Index::IT_PRIMARY:
1330     case dd::Index::IT_UNIQUE:
1331       keyinfo->flags = HA_NOSAME;
1332       break;
1333     default:
1334       DBUG_ASSERT(0); /* purecov: deadcode */
1335       keyinfo->flags = 0;
1336       break;
1337   }
1338 
1339   if (idx_obj->is_generated()) keyinfo->flags |= HA_GENERATED_KEY;
1340 
1341   /*
1342     The remaining important SQL-layer flags are set later - either we directly
1343     store and read them from DD (HA_PACK_KEY, HA_BINARY_PACK_KEY), or calculate
1344     while handling other key options (HA_USES_COMMENT, HA_USES_PARSER,
1345     HA_USES_BLOCK_SIZE), or during post-processing step (HA_NULL_PART_KEY).
1346   */
1347 
1348   // key length
1349   keyinfo->key_length = 0;
1350   for (const dd::Index_element *idx_elem : idx_obj->elements()) {
1351     // Skip hidden index elements
1352     if (!idx_elem->is_hidden()) keyinfo->key_length += idx_elem->length();
1353   }
1354 
1355   //
1356   // Read index options
1357   //
1358 
1359   const dd::Properties &idx_options = idx_obj->options();
1360 
1361   /*
1362     Restore flags indicating that key packing optimization was suggested to SE.
1363     See fill_dd_indexes_for_keyinfo() for explanation why we store these flags
1364     explicitly.
1365   */
1366   uint32 stored_flags = 0;
1367   idx_options.get("flags", &stored_flags);
1368   DBUG_ASSERT((stored_flags & ~(HA_PACK_KEY | HA_BINARY_PACK_KEY)) == 0);
1369 
1370   //  Beginning in 8.0.12 HA_PACK_KEY and HA_BINARY_PACK_KEY are no longer set
1371   //  if the SE does not support it. If the index was created prior to 8.0.12
1372   //  these bits may be set in the flags option, and when being added to
1373   //  key_info->flags, the ALTER algorithm analysis will be broken because the
1374   //  intermediate table is created without these flags, and hence, the
1375   //  analysis will incorrectly conclude that all indexes have changed.
1376   //
1377   //  To workaround this issue, we remove the flags below, depending on the SE
1378   //  capabilities, when preparing the table share. Thus, if we ALTER the table
1379   //  at a later stage, indexes not being touched by the ALTER statement will
1380   //  have the same flags both in the source table and the intermediate table,
1381   //  and hence, the algorithm analysis will come to the right conclusion.
1382   if (ha_check_storage_engine_flag(share->db_type(),
1383                                    HTON_SUPPORTS_PACKED_KEYS) == 0) {
1384     // Given the assert above, we could just have set stored_flags to 0 here,
1385     // but keep it like this in case new flags are added.
1386     stored_flags &= ~(HA_PACK_KEY | HA_BINARY_PACK_KEY);
1387   }
1388   keyinfo->flags |= stored_flags;
1389 
1390   // Block size
1391   if (idx_options.exists("block_size")) {
1392     idx_options.get("block_size", &keyinfo->block_size);
1393 
1394     DBUG_ASSERT(keyinfo->block_size);
1395 
1396     keyinfo->flags |= HA_USES_BLOCK_SIZE;
1397   }
1398 
1399   // Read field parser
1400   if (idx_options.exists("parser_name")) {
1401     LEX_CSTRING parser_name;
1402     if (idx_options.get("parser_name", &parser_name, &share->mem_root))
1403       DBUG_ASSERT(false);
1404 
1405     keyinfo->parser =
1406         my_plugin_lock_by_name(nullptr, parser_name, MYSQL_FTPARSER_PLUGIN);
1407     if (!keyinfo->parser) {
1408       my_error(ER_PLUGIN_IS_NOT_LOADED, MYF(0), parser_name.str);
1409       if (thd->is_error()) return true;
1410     }
1411 
1412     keyinfo->flags |= HA_USES_PARSER;
1413   }
1414 
1415   // Read comment
1416   dd::String_type comment = idx_obj->comment();
1417   keyinfo->comment.length = comment.length();
1418 
1419   if (keyinfo->comment.length) {
1420     keyinfo->comment.str =
1421         strmake_root(&share->mem_root, comment.c_str(), comment.length());
1422     keyinfo->comment.length = comment.length();
1423 
1424     keyinfo->flags |= HA_USES_COMMENT;
1425   }
1426   keyinfo->engine_attribute = LexStringDupRootUnlessEmpty(
1427       &share->mem_root, idx_obj->engine_attribute());
1428   if (keyinfo->engine_attribute.length > 0)
1429     keyinfo->flags |= HA_INDEX_USES_ENGINE_ATTRIBUTE;
1430 
1431   keyinfo->secondary_engine_attribute = LexStringDupRootUnlessEmpty(
1432       &share->mem_root, idx_obj->secondary_engine_attribute());
1433   if (keyinfo->secondary_engine_attribute.length > 0)
1434     keyinfo->flags |= HA_INDEX_USES_SECONDARY_ENGINE_ATTRIBUTE;
1435   return (false);
1436 }
1437 
1438 /**
1439   Check if this is a spatial index that can be used. That is, if there is
1440   a spatial index on a geometry column without the SRID specified, we will
1441   hide the index so that the optimizer won't consider the index during
1442   optimization/execution.
1443 
1444   @param index The index to verify
1445 
1446   @retval true if the index is an usable spatial index, or if it isn't a
1447           spatial index.
1448   @retval false if the index is a spatial index on a geometry column without
1449           an SRID specified.
1450 */
is_spatial_index_usable(const dd::Index & index)1451 static bool is_spatial_index_usable(const dd::Index &index) {
1452   if (index.type() == dd::Index::IT_SPATIAL) {
1453     /*
1454       We have already checked for hidden indexes before we get here. But we
1455       still play safe since the check is very cheap.
1456     */
1457     if (index.is_hidden()) return false; /* purecov: deadcode */
1458 
1459     // Check that none of the parts references a column with SRID == NULL
1460     for (const auto element : index.elements()) {
1461       if (!element->is_hidden() && !element->column().srs_id().has_value())
1462         return false;
1463     }
1464   }
1465 
1466   return true;
1467 }
1468 
/**
  Fill TABLE_SHARE::key_info array according to index metadata
  from dd::Table object.

  Hidden indexes are ignored. The function first counts keys and key parts,
  then allocates the KEY array, the KEY_PART_INFO array, both rec_per_key
  arrays and the key name array on the share's MEM_ROOT, and finally fills
  them in, handing each key its own slice of the shared arrays.

  @param thd      Thread context, passed on to fill_index_from_dd().
  @param share    TABLE_SHARE to fill; share->keys and share->key_parts
                  must be 0 on entry.
  @param tab_obj  Data-dictionary table providing the indexes.

  @retval false  Success.
  @retval true   Failure: an allocation failed or fill_index_from_dd()
                 reported an error.
*/

static bool fill_indexes_from_dd(THD *thd, TABLE_SHARE *share,
                                 const dd::Table *tab_obj) {
  share->keys_for_keyread.init(0);
  share->keys_in_use.init();
  share->visible_indexes.init();

  // Number of visible key parts of the index with ordinal position 1.
  uint32 primary_key_parts = 0;

  bool use_extended_sk = ha_check_storage_engine_flag(
      share->db_type(), HTON_SUPPORTS_EXTENDED_KEYS);

  // Count number of keys and total number of key parts in the table.

  DBUG_ASSERT(share->keys == 0 && share->key_parts == 0);

  for (const dd::Index *idx_obj : tab_obj->indexes()) {
    // Skip hidden indexes
    if (idx_obj->is_hidden()) continue;

    share->keys++;
    uint key_parts = 0;
    for (const dd::Index_element *idx_ele : idx_obj->elements()) {
      // Skip hidden index elements
      if (!idx_ele->is_hidden()) key_parts++;
    }
    share->key_parts += key_parts;

    // Primary key (or candidate key replacing it) is always first if exists.
    // If such key doesn't exist (e.g. there are no unique keys in the table)
    // we will simply waste some memory.
    if (idx_obj->ordinal_position() == 1) primary_key_parts = key_parts;
  }

  // Allocate and fill KEY objects.
  if (share->keys) {
    // Cursors into the shared arrays; each key takes the next slice.
    KEY_PART_INFO *key_part;
    ulong *rec_per_key;
    rec_per_key_t *rec_per_key_float;
    uint total_key_parts = share->key_parts;

    // With extended keys, reserve room for the primary key parts after the
    // parts of every key but the first.
    if (use_extended_sk)
      total_key_parts += (primary_key_parts * (share->keys - 1));

    //
    // Alloc rec_per_key buffer
    //
    if (!(rec_per_key =
              (ulong *)share->mem_root.Alloc(total_key_parts * sizeof(ulong))))
      return true; /* purecov: inspected */

    //
    // Alloc rec_per_key_float buffer
    //
    if (!(rec_per_key_float = (rec_per_key_t *)share->mem_root.Alloc(
              total_key_parts * sizeof(rec_per_key_t))))
      return true; /* purecov: inspected */

    //
    // Alloc buffer to hold keys and key_parts
    //

    if (!(share->key_info = (KEY *)share->mem_root.Alloc(
              share->keys * sizeof(KEY) +
              total_key_parts * sizeof(KEY_PART_INFO))))
      return true; /* purecov: inspected */

    memset(
        share->key_info, 0,
        (share->keys * sizeof(KEY) + total_key_parts * sizeof(KEY_PART_INFO)));
    // The KEY_PART_INFO array starts right after the last KEY in the buffer.
    key_part = (KEY_PART_INFO *)(share->key_info + share->keys);

    //
    // Alloc buffer to hold keynames
    //

    if (!(share->keynames.type_names = (const char **)share->mem_root.Alloc(
              (share->keys + 1) * sizeof(char *))))
      return true; /* purecov: inspected */
    memset(share->keynames.type_names, 0, ((share->keys + 1) * sizeof(char *)));

    share->keynames.type_names[share->keys] = nullptr;
    share->keynames.count = share->keys;

    // In the first iteration fill in every KEY, so that we know
    // user_defined_key_parts for each key. This is required to properly
    // assign key_part memory to the keys in the second iteration.
    const dd::Index *index_at_pos[MAX_INDEXES];
    uint key_nr = 0;
    for (const dd::Index *idx_obj : tab_obj->indexes()) {
      // Skip hidden indexes
      if (idx_obj->is_hidden()) continue;

      if (fill_index_from_dd(thd, share, idx_obj, key_nr)) return true;

      // Remember which dd::Index ended up at this key number.
      index_at_pos[key_nr] = idx_obj;

      share->keys_in_use.set_bit(key_nr);

      // Only visible indexes that pass the spatial check are marked visible.
      if (idx_obj->is_visible() && is_spatial_index_usable(*idx_obj))
        share->visible_indexes.set_bit(key_nr);

      key_nr++;
    }

    // Update keyparts now
    key_nr = 0;
    do {
      // Assign the key_part_info buffer
      KEY *keyinfo = &share->key_info[key_nr];
      keyinfo->key_part = key_part;
      keyinfo->set_rec_per_key_array(rec_per_key, rec_per_key_float);
      keyinfo->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);

      fill_index_elements_from_dd(share, index_at_pos[key_nr], key_nr);

      // Move the cursors past the slice used by this key.
      key_part += keyinfo->user_defined_key_parts;
      rec_per_key += keyinfo->user_defined_key_parts;
      rec_per_key_float += keyinfo->user_defined_key_parts;

      // Post processing code ?
      /*
        Add PK parts if engine supports PK extension for secondary keys.
        Atm it works for Innodb only. Here we add unique first key parts
        to the end of secondary key parts array and increase actual number
        of key parts. Note that primary key is always first if exists.
        Later if there is no PK in the table then number of actual keys parts
        is set to user defined key parts.
        KEY::actual_flags can't be set until we fully set-up KEY::flags.
      */
      keyinfo->actual_key_parts = keyinfo->user_defined_key_parts;
      // Only non-first keys without HA_NOSAME get the reserved extra room.
      if (use_extended_sk && key_nr && !(keyinfo->flags & HA_NOSAME)) {
        keyinfo->unused_key_parts = primary_key_parts;
        key_part += primary_key_parts;
        rec_per_key += primary_key_parts;
        rec_per_key_float += primary_key_parts;
        share->key_parts += primary_key_parts;
      }

      // Initialize the rec per key arrays
      for (uint kp = 0; kp < keyinfo->actual_key_parts; ++kp) {
        keyinfo->rec_per_key[kp] = 0;
        keyinfo->set_records_per_key(kp, REC_PER_KEY_UNKNOWN);
      }

      key_nr++;
    } while (key_nr < share->keys);
  }

  return (false);
}
1624 
copy_option_string(MEM_ROOT * mem_root,const dd::Properties & options,const dd::String_type & key)1625 static char *copy_option_string(MEM_ROOT *mem_root,
1626                                 const dd::Properties &options,
1627                                 const dd::String_type &key) {
1628   dd::String_type tmp_str;
1629   if (options.exists(key) && !options.get(key, &tmp_str) && tmp_str.length()) {
1630     return strdup_root(mem_root, tmp_str.c_str());
1631   }
1632   return nullptr;
1633 }
1634 
get_partition_options(MEM_ROOT * mem_root,partition_element * part_elem,const dd::Properties & part_options)1635 static void get_partition_options(MEM_ROOT *mem_root,
1636                                   partition_element *part_elem,
1637                                   const dd::Properties &part_options) {
1638   if (part_options.exists("max_rows"))
1639     part_options.get("max_rows", &part_elem->part_max_rows);
1640 
1641   if (part_options.exists("min_rows"))
1642     part_options.get("min_rows", &part_elem->part_min_rows);
1643 
1644   part_elem->data_file_name =
1645       copy_option_string(mem_root, part_options, "data_file_name");
1646   part_elem->index_file_name =
1647       copy_option_string(mem_root, part_options, "index_file_name");
1648 
1649   uint32 nodegroup_id = UNDEF_NODEGROUP;
1650   if (part_options.exists("nodegroup_id"))
1651     part_options.get("nodegroup_id", &nodegroup_id);
1652 
1653   DBUG_ASSERT(nodegroup_id <= 0xFFFF);
1654   part_elem->nodegroup_id = nodegroup_id;
1655 }
1656 
get_part_column_values(MEM_ROOT * mem_root,partition_info * part_info,partition_element * part_elem,const dd::Partition * part_obj)1657 static bool get_part_column_values(MEM_ROOT *mem_root,
1658                                    partition_info *part_info,
1659                                    partition_element *part_elem,
1660                                    const dd::Partition *part_obj) {
1661   part_elem_value *p_elem_values, *p_val;
1662   part_column_list_val *col_val_array, *col_vals;
1663   uint list_index = 0, entries = 0;
1664   uint max_column_id = 0, max_list_index = 0;
1665 
1666   for (const dd::Partition_value *part_value : part_obj->values()) {
1667     max_column_id = std::max(max_column_id, part_value->column_num());
1668     max_list_index = std::max(max_list_index, part_value->list_num());
1669     entries++;
1670   }
1671   if (entries != ((max_column_id + 1) * (max_list_index + 1))) {
1672     DBUG_ASSERT(0); /* purecov: deadcode */
1673     return true;
1674   }
1675 
1676   part_info->num_columns = max_column_id + 1;
1677 
1678   if (!multi_alloc_root(mem_root, &p_elem_values,
1679                         sizeof(*p_elem_values) * (max_list_index + 1),
1680                         &col_val_array,
1681                         sizeof(*col_val_array) * part_info->num_columns *
1682                             (max_list_index + 1),
1683                         NULL)) {
1684     return true; /* purecov: inspected */
1685   }
1686   memset(p_elem_values, 0, sizeof(*p_elem_values) * (max_list_index + 1));
1687   memset(
1688       col_val_array, 0,
1689       sizeof(*col_val_array) * part_info->num_columns * (max_list_index + 1));
1690   for (list_index = 0; list_index <= max_list_index; list_index++) {
1691     p_val = &p_elem_values[list_index];
1692     p_val->added_items = 1;
1693     p_val->col_val_array = &col_val_array[list_index * part_info->num_columns];
1694   }
1695 
1696   for (const dd::Partition_value *part_value : part_obj->values()) {
1697     p_val = &p_elem_values[part_value->list_num()];
1698     col_vals = p_val->col_val_array;
1699     if (part_value->is_value_null()) {
1700       col_vals[part_value->column_num()].null_value = true;
1701     } else if (part_value->max_value()) {
1702       col_vals[part_value->column_num()].max_value = true;
1703     } else {
1704       // TODO-PARTITION: Perhaps allocate on the heap instead and when the first
1705       // table instance is opened, free it and add the field image instead?
1706       // That way it can be reused for all other table instances.
1707       col_vals[part_value->column_num()].column_value.value_str =
1708           strmake_root(mem_root, part_value->value_utf8().c_str(),
1709                        part_value->value_utf8().length());
1710     }
1711   }
1712 
1713   for (list_index = 0; list_index <= max_list_index; list_index++) {
1714     p_val = &p_elem_values[list_index];
1715 #ifndef DBUG_OFF
1716     for (uint i = 0; i < part_info->num_columns; i++) {
1717       DBUG_ASSERT(p_val->col_val_array[i].null_value ||
1718                   p_val->col_val_array[i].max_value ||
1719                   p_val->col_val_array[i].column_value.value_str);
1720     }
1721 #endif
1722     if (part_elem->list_val_list.push_back(p_val, mem_root)) return true;
1723   }
1724 
1725   return false;
1726 }
1727 
setup_partition_from_dd(THD * thd,MEM_ROOT * mem_root,partition_info * part_info,partition_element * part_elem,const dd::Partition * part_obj,bool is_subpart)1728 static bool setup_partition_from_dd(THD *thd, MEM_ROOT *mem_root,
1729                                     partition_info *part_info,
1730                                     partition_element *part_elem,
1731                                     const dd::Partition *part_obj,
1732                                     bool is_subpart) {
1733   dd::String_type comment = part_obj->comment();
1734   if (comment.length()) {
1735     part_elem->part_comment = strdup_root(mem_root, comment.c_str());
1736     if (!part_elem->part_comment) return true;
1737   }
1738   part_elem->partition_name = strdup_root(mem_root, part_obj->name().c_str());
1739   if (!part_elem->partition_name) return true;
1740 
1741   part_elem->engine_type = part_info->default_engine_type;
1742 
1743   get_partition_options(mem_root, part_elem, part_obj->options());
1744 
1745   // Read tablespace name.
1746   if (dd::get_tablespace_name<dd::Partition>(
1747           thd, part_obj, &part_elem->tablespace_name, mem_root))
1748     return true;
1749 
1750   if (is_subpart) {
1751     /* Only HASH/KEY subpartitioning allowed, no values allowed, so return! */
1752     return false;
1753   }
1754   // Iterate over all possible values
1755   if (part_info->part_type == partition_type::RANGE) {
1756     if (part_info->column_list) {
1757       if (get_part_column_values(mem_root, part_info, part_elem, part_obj))
1758         return true;
1759     } else {
1760       DBUG_ASSERT(part_obj->values().size() == 1);
1761       const dd::Partition_value *part_value = *part_obj->values().begin();
1762       DBUG_ASSERT(part_value->list_num() == 0);
1763       DBUG_ASSERT(part_value->column_num() == 0);
1764       if (part_value->max_value()) {
1765         part_elem->max_value = true;
1766       } else {
1767         if (part_value->value_utf8()[0] == '-') {
1768           part_elem->signed_flag = true;
1769           if (dd::Properties::from_str(part_value->value_utf8(),
1770                                        &part_elem->range_value)) {
1771             return true;
1772           }
1773         } else {
1774           part_elem->signed_flag = false;
1775           if (dd::Properties::from_str(part_value->value_utf8(),
1776                                        (ulonglong *)&part_elem->range_value)) {
1777             return true;
1778           }
1779         }
1780       }
1781     }
1782   } else if (part_info->part_type == partition_type::LIST) {
1783     if (part_info->column_list) {
1784       if (get_part_column_values(mem_root, part_info, part_elem, part_obj))
1785         return true;
1786     } else {
1787       uint list_index = 0, max_index = 0, entries = 0, null_entry = 0;
1788       part_elem_value *list_val, *list_val_array = nullptr;
1789       for (const dd::Partition_value *part_value : part_obj->values()) {
1790         max_index = std::max(max_index, part_value->list_num());
1791         entries++;
1792         if (part_value->value_utf8().empty()) {
1793           DBUG_ASSERT(!part_elem->has_null_value);
1794           part_elem->has_null_value = true;
1795           null_entry = part_value->list_num();
1796         }
1797       }
1798       if (entries != (max_index + 1)) {
1799         DBUG_ASSERT(0); /* purecov: deadcode */
1800         return true;
1801       }
1802       /* If a list entry is NULL then it is only flagged on the part_elem. */
1803       if (part_elem->has_null_value) entries--;
1804 
1805       if (entries) {
1806         list_val_array = (part_elem_value *)mem_root->Alloc(
1807             sizeof(*list_val_array) * entries);
1808         if (!list_val_array) return true;
1809         memset(list_val_array, 0, sizeof(*list_val_array) * entries);
1810       }
1811 
1812       for (const dd::Partition_value *part_value : part_obj->values()) {
1813         DBUG_ASSERT(part_value->column_num() == 0);
1814         if (part_value->value_utf8().empty()) {
1815           DBUG_ASSERT(part_value->list_num() == null_entry);
1816           continue;
1817         }
1818         list_index = part_value->list_num();
1819         /*
1820           If there is a NULL value in the partition values in the DD it is
1821           marked directly on the partition_element and should not have an own
1822           list_val. So compact the list_index range by remove the list_index for
1823           the null_entry.
1824         */
1825         if (part_elem->has_null_value && list_index > null_entry) list_index--;
1826         list_val = &list_val_array[list_index];
1827         DBUG_ASSERT(!list_val->unsigned_flag && !list_val->value);
1828         if (part_value->value_utf8()[0] == '-') {
1829           list_val->unsigned_flag = false;
1830           if (dd::Properties::from_str(part_value->value_utf8(),
1831                                        &list_val->value))
1832             return true;
1833         } else {
1834           list_val->unsigned_flag = true;
1835           if (dd::Properties::from_str(part_value->value_utf8(),
1836                                        (ulonglong *)&list_val->value))
1837             return true;
1838         }
1839       }
1840       for (uint i = 0; i < entries; i++) {
1841         if (part_elem->list_val_list.push_back(&list_val_array[i], mem_root))
1842           return true;
1843       }
1844     }
1845   } else {
1846 #ifndef DBUG_OFF
1847     DBUG_ASSERT(part_info->part_type == partition_type::HASH);
1848     DBUG_ASSERT(part_obj->values().empty());
1849 #endif
1850   }
1851   return false;
1852 }
1853 
1854 /**
1855   Set field_list
1856 
1857   To append each field to the field_list it will parse the
1858   submitted partition_expression string.
1859 
1860   Must be in sync with get_field_list_str!
1861 
1862   @param[in]     mem_root   Where to allocate the memory for the list entries.
1863   @param[in]     str        String object containing the column names.
1864   @param[in,out] field_list List to add field names to.
1865 
1866   @return false on success, else true.
1867 */
1868 
set_field_list(MEM_ROOT * mem_root,dd::String_type & str,List<char> * field_list)1869 static bool set_field_list(MEM_ROOT *mem_root, dd::String_type &str,
1870                            List<char> *field_list) {
1871   dd::String_type field_name;
1872   dd::String_type::const_iterator it(str.begin());
1873   dd::String_type::const_iterator end(str.end());
1874 
1875   while (it != end) {
1876     if (dd::eat_str(field_name, it, end, dd::FIELD_NAME_SEPARATOR_CHAR))
1877       return true;
1878     size_t len = field_name.length();
1879     DBUG_ASSERT(len);
1880     char *name = static_cast<char *>(mem_root->Alloc(len + 1));
1881     if (!name) return true; /* purecov: inspected */
1882     memcpy(name, field_name.c_str(), len);
1883     name[len] = '\0';
1884 
1885     if (field_list->push_back(name, mem_root)) return true;
1886   }
1887   return false;
1888 }
1889 
1890 /**
1891   Fill TABLE_SHARE with partitioning details from dd::Partition.
1892 
1893   @details
1894   Set up as much as possible to ease creating new TABLE instances
1895   by copying from the TABLE_SHARE.
1896 
1897   Also to prevent future memory duplication partition definitions (names etc)
1898   are stored on the TABLE_SHARE and can be referenced from each TABLE instance.
1899 
1900   Note that [sub]part_expr still needs to be parsed from
1901   [sub]part_func_string for each TABLE instance to use the correct
1902   mem_root etc. To be as compatible with the .frm way to open a table
1903   as possible we currently generate the full partitioning clause which
1904   will be parsed for each new TABLE instance.
1905   TODO-PARTITION:
1906   - Create a way to handle Item expressions to be shared/copied
1907     from the TABLE_SHARE.
1908   - On the open of the first TABLE instance, copy the field images
1909     to the TABLE_SHARE::partition_info for each partition value.
1910 
1911   @param thd      Thread context.
1912   @param share    Share to be updated with partitioning details.
1913   @param tab_obj  dd::Table object to get partition info from.
1914 
1915   @return false if success, else true.
1916 */
1917 
fill_partitioning_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)1918 static bool fill_partitioning_from_dd(THD *thd, TABLE_SHARE *share,
1919                                       const dd::Table *tab_obj) {
1920   if (tab_obj->partition_type() == dd::Table::PT_NONE) return false;
1921 
1922   // The DD only has information about how the table is partitioned in
1923   // the primary storage engine, so don't use this information for
1924   // tables in a secondary storage engine.
1925   if (share->is_secondary_engine()) return false;
1926 
1927   partition_info *part_info;
1928   part_info = new (&share->mem_root) partition_info;
1929 
1930   handlerton *hton = plugin_data<handlerton *>(
1931       ha_resolve_by_name_raw(thd, lex_cstring_handle(tab_obj->engine())));
1932   DBUG_ASSERT(hton && ha_storage_engine_is_enabled(hton));
1933   part_info->default_engine_type = hton;
1934   if (!part_info->default_engine_type) return true;
1935 
1936   // TODO-PARTITION: change partition_info::part_type to same enum as below :)
1937   switch (tab_obj->partition_type()) {
1938     case dd::Table::PT_RANGE_COLUMNS:
1939       part_info->column_list = true;
1940       part_info->list_of_part_fields = true;
1941       // Fall through.
1942     case dd::Table::PT_RANGE:
1943       part_info->part_type = partition_type::RANGE;
1944       break;
1945     case dd::Table::PT_LIST_COLUMNS:
1946       part_info->column_list = true;
1947       part_info->list_of_part_fields = true;
1948       // Fall through.
1949     case dd::Table::PT_LIST:
1950       part_info->part_type = partition_type::LIST;
1951       break;
1952     case dd::Table::PT_LINEAR_HASH:
1953       part_info->linear_hash_ind = true;
1954       // Fall through.
1955     case dd::Table::PT_HASH:
1956       part_info->part_type = partition_type::HASH;
1957       break;
1958     case dd::Table::PT_LINEAR_KEY_51:
1959       part_info->linear_hash_ind = true;
1960       // Fall through.
1961     case dd::Table::PT_KEY_51:
1962       part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_51;
1963       part_info->list_of_part_fields = true;
1964       part_info->part_type = partition_type::HASH;
1965       break;
1966     case dd::Table::PT_LINEAR_KEY_55:
1967       part_info->linear_hash_ind = true;
1968       // Fall through.
1969     case dd::Table::PT_KEY_55:
1970       part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_55;
1971       part_info->list_of_part_fields = true;
1972       part_info->part_type = partition_type::HASH;
1973       break;
1974     case dd::Table::PT_AUTO_LINEAR:
1975       part_info->linear_hash_ind = true;
1976       // Fall through.
1977     case dd::Table::PT_AUTO:
1978       part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_55;
1979       part_info->part_type = partition_type::HASH;
1980       part_info->list_of_part_fields = true;
1981       part_info->is_auto_partitioned = true;
1982       share->auto_partitioned = true;
1983       break;
1984     default:
1985       // Unknown partitioning type!
1986       DBUG_ASSERT(0); /* purecov: deadcode */
1987       return true;
1988   }
1989   switch (tab_obj->subpartition_type()) {
1990     case dd::Table::ST_NONE:
1991       part_info->subpart_type = partition_type::NONE;
1992       break;
1993     case dd::Table::ST_LINEAR_HASH:
1994       part_info->linear_hash_ind = true;
1995       // Fall through.
1996     case dd::Table::ST_HASH:
1997       part_info->subpart_type = partition_type::HASH;
1998       break;
1999     case dd::Table::ST_LINEAR_KEY_51:
2000       part_info->linear_hash_ind = true;
2001       // Fall through.
2002     case dd::Table::ST_KEY_51:
2003       part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_51;
2004       part_info->list_of_subpart_fields = true;
2005       part_info->subpart_type = partition_type::HASH;
2006       break;
2007     case dd::Table::ST_LINEAR_KEY_55:
2008       part_info->linear_hash_ind = true;
2009       // Fall through.
2010     case dd::Table::ST_KEY_55:
2011       part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_55;
2012       part_info->list_of_subpart_fields = true;
2013       part_info->subpart_type = partition_type::HASH;
2014       break;
2015     default:
2016       // Unknown sub partitioning type!
2017       DBUG_ASSERT(0); /* purecov: deadcode */
2018       return true;
2019   }
2020 
2021   dd::String_type part_expr = tab_obj->partition_expression();
2022   if (part_info->list_of_part_fields) {
2023     if (set_field_list(&share->mem_root, part_expr,
2024                        &part_info->part_field_list)) {
2025       return true;
2026     }
2027     part_info->part_func_string = nullptr;
2028     part_info->part_func_len = 0;
2029   } else {
2030     part_info->part_func_string =
2031         strdup_root(&share->mem_root, part_expr.c_str());
2032     part_info->part_func_len = part_expr.length();
2033   }
2034   dd::String_type subpart_expr = tab_obj->subpartition_expression();
2035   part_info->subpart_func_len = subpart_expr.length();
2036   if (part_info->subpart_func_len) {
2037     if (part_info->list_of_subpart_fields) {
2038       if (set_field_list(&share->mem_root, subpart_expr,
2039                          &part_info->subpart_field_list)) {
2040         return true;
2041       }
2042       part_info->subpart_func_string = nullptr;
2043       part_info->subpart_func_len = 0;
2044     } else {
2045       part_info->subpart_func_string =
2046           strdup_root(&share->mem_root, subpart_expr.c_str());
2047     }
2048   }
2049 
2050   //
2051   // Iterate through all the partitions
2052   //
2053 
2054   partition_element *curr_part_elem;
2055   List_iterator<partition_element> part_elem_it;
2056 
2057   /* Partitions are sorted first on level and then on number. */
2058 
2059 #ifndef DBUG_OFF
2060   uint number = 0;
2061 #endif
2062   for (const dd::Partition *part_obj : tab_obj->partitions()) {
2063 #ifndef DBUG_OFF
2064     /* Must be in sorted order (sorted by level first and then on number). */
2065     DBUG_ASSERT(part_obj->number() >= number);
2066     number = part_obj->number();
2067 #endif
2068 
2069     DBUG_ASSERT(part_obj->parent_partition_id() == dd::INVALID_OBJECT_ID);
2070 
2071     curr_part_elem = new (&share->mem_root) partition_element;
2072     if (!curr_part_elem) {
2073       return true;
2074     }
2075 
2076     if (setup_partition_from_dd(thd, &share->mem_root, part_info,
2077                                 curr_part_elem, part_obj, false)) {
2078       return true;
2079     }
2080 
2081     if (part_info->partitions.push_back(curr_part_elem, &share->mem_root))
2082       return true;
2083 
2084     for (const dd::Partition *sub_part_obj : part_obj->subpartitions()) {
2085       partition_element *curr_sub_part_elem =
2086           new (&share->mem_root) partition_element;
2087       if (!curr_sub_part_elem) {
2088         return true;
2089       }
2090 
2091       if (setup_partition_from_dd(thd, &share->mem_root, part_info,
2092                                   curr_sub_part_elem, sub_part_obj, true)) {
2093         return true;
2094       }
2095 
2096       if (curr_part_elem->subpartitions.push_back(curr_sub_part_elem,
2097                                                   &share->mem_root))
2098         return true;
2099     }
2100   }
2101 
2102   // Get partition and sub_partition count.
2103   part_info->num_parts = part_info->partitions.elements;
2104   part_info->num_subparts = part_info->partitions[0]->subpartitions.elements;
2105 
2106   switch (tab_obj->default_partitioning()) {
2107     case dd::Table::DP_NO:
2108       part_info->use_default_partitions = false;
2109       part_info->use_default_num_partitions = false;
2110       break;
2111     case dd::Table::DP_YES:
2112       part_info->use_default_partitions = true;
2113       part_info->use_default_num_partitions = true;
2114       break;
2115     case dd::Table::DP_NUMBER:
2116       part_info->use_default_partitions = true;
2117       part_info->use_default_num_partitions = false;
2118       break;
2119     case dd::Table::DP_NONE:
2120     default:
2121       DBUG_ASSERT(0); /* purecov: deadcode */
2122   }
2123   switch (tab_obj->default_subpartitioning()) {
2124     case dd::Table::DP_NO:
2125       part_info->use_default_subpartitions = false;
2126       part_info->use_default_num_subpartitions = false;
2127       break;
2128     case dd::Table::DP_YES:
2129       part_info->use_default_subpartitions = true;
2130       part_info->use_default_num_subpartitions = true;
2131       break;
2132     case dd::Table::DP_NUMBER:
2133       part_info->use_default_subpartitions = true;
2134       part_info->use_default_num_subpartitions = false;
2135       break;
2136     case dd::Table::DP_NONE:
2137       DBUG_ASSERT(!part_info->is_sub_partitioned());
2138       break;
2139     default:
2140       DBUG_ASSERT(0); /* purecov: deadcode */
2141   }
2142 
2143   char *buf;
2144   uint buf_len;
2145 
2146   // Turn off ANSI_QUOTES and other SQL modes which affect printing of
2147   // generated partitioning clause.
2148   Sql_mode_parse_guard parse_guard(thd);
2149 
2150   buf = generate_partition_syntax(part_info, &buf_len, true, true, false,
2151                                   nullptr);
2152 
2153   if (!buf) return true;
2154 
2155   share->partition_info_str = strmake_root(&share->mem_root, buf, buf_len);
2156   if (!share->partition_info_str) return true;
2157 
2158   share->partition_info_str_len = buf_len;
2159   share->m_part_info = part_info;
2160   return (false);
2161 }
2162 
2163 /**
2164   Fill TABLE_SHARE with information about foreign keys from dd::Table.
2165 */
2166 
fill_foreign_keys_from_dd(TABLE_SHARE * share,const dd::Table * tab_obj)2167 static bool fill_foreign_keys_from_dd(TABLE_SHARE *share,
2168                                       const dd::Table *tab_obj) {
2169   DBUG_ASSERT(share->foreign_keys == 0 && share->foreign_key_parents == 0);
2170 
2171   share->foreign_keys = tab_obj->foreign_keys().size();
2172   share->foreign_key_parents = tab_obj->foreign_key_parents().size();
2173 
2174   if (share->foreign_keys) {
2175     if (!(share->foreign_key =
2176               (TABLE_SHARE_FOREIGN_KEY_INFO *)share->mem_root.Alloc(
2177                   share->foreign_keys * sizeof(TABLE_SHARE_FOREIGN_KEY_INFO))))
2178       return true;
2179 
2180     uint i = 0;
2181 
2182     for (const dd::Foreign_key *fk : tab_obj->foreign_keys()) {
2183       if (lex_string_strmake(&share->mem_root,
2184                              &share->foreign_key[i].referenced_table_db,
2185                              fk->referenced_table_schema_name().c_str(),
2186                              fk->referenced_table_schema_name().length()))
2187         return true;
2188       if (lex_string_strmake(&share->mem_root,
2189                              &share->foreign_key[i].referenced_table_name,
2190                              fk->referenced_table_name().c_str(),
2191                              fk->referenced_table_name().length()))
2192         return true;
2193       if (lex_string_strmake(&share->mem_root,
2194                              &share->foreign_key[i].unique_constraint_name,
2195                              fk->unique_constraint_name().c_str(),
2196                              fk->unique_constraint_name().length()))
2197         return true;
2198 
2199       share->foreign_key[i].update_rule = fk->update_rule();
2200       share->foreign_key[i].delete_rule = fk->delete_rule();
2201 
2202       share->foreign_key[i].columns = fk->elements().size();
2203       if (!(share->foreign_key[i].column_name =
2204                 (LEX_CSTRING *)share->mem_root.Alloc(
2205                     share->foreign_key[i].columns * sizeof(LEX_CSTRING))))
2206         return true;
2207 
2208       uint j = 0;
2209 
2210       for (const dd::Foreign_key_element *fk_el : fk->elements()) {
2211         if (lex_string_strmake(&share->mem_root,
2212                                &share->foreign_key[i].column_name[j],
2213                                fk_el->column().name().c_str(),
2214                                fk_el->column().name().length()))
2215           return true;
2216 
2217         ++j;
2218       }
2219 
2220       ++i;
2221     }
2222   }
2223 
2224   if (share->foreign_key_parents) {
2225     if (!(share->foreign_key_parent =
2226               (TABLE_SHARE_FOREIGN_KEY_PARENT_INFO *)share->mem_root.Alloc(
2227                   share->foreign_key_parents *
2228                   sizeof(TABLE_SHARE_FOREIGN_KEY_PARENT_INFO))))
2229       return true;
2230 
2231     uint i = 0;
2232 
2233     for (const dd::Foreign_key_parent *fk_p : tab_obj->foreign_key_parents()) {
2234       if (lex_string_strmake(&share->mem_root,
2235                              &share->foreign_key_parent[i].referencing_table_db,
2236                              fk_p->child_schema_name().c_str(),
2237                              fk_p->child_schema_name().length()))
2238         return true;
2239       if (lex_string_strmake(
2240               &share->mem_root,
2241               &share->foreign_key_parent[i].referencing_table_name,
2242               fk_p->child_table_name().c_str(),
2243               fk_p->child_table_name().length()))
2244         return true;
2245       share->foreign_key_parent[i].update_rule = fk_p->update_rule();
2246       share->foreign_key_parent[i].delete_rule = fk_p->delete_rule();
2247       ++i;
2248     }
2249   }
2250   return false;
2251 }
2252 
2253 /**
2254   Fill check constraints from dd::Table object to the TABLE_SHARE.
2255 
2256   @param[in,out]      share              TABLE_SHARE instance.
2257   @param[in]          tab_obj            Table instance.
2258 
2259   @retval   false   On Success.
2260   @retval   true    On failure.
2261 */
fill_check_constraints_from_dd(TABLE_SHARE * share,const dd::Table * tab_obj)2262 static bool fill_check_constraints_from_dd(TABLE_SHARE *share,
2263                                            const dd::Table *tab_obj) {
2264   DBUG_ASSERT(share->check_constraint_share_list == nullptr);
2265 
2266   if (tab_obj->check_constraints().size() > 0) {
2267     share->check_constraint_share_list = new (&share->mem_root)
2268         Sql_check_constraint_share_list(&share->mem_root);
2269     if (share->check_constraint_share_list == nullptr) return true;  // OOM
2270 
2271     if (share->check_constraint_share_list->reserve(
2272             tab_obj->check_constraints().size()))
2273       return true;  // OOM
2274 
2275     for (auto &cc : tab_obj->check_constraints()) {
2276       // Check constraint name.
2277       LEX_CSTRING name;
2278       if (lex_string_strmake(&share->mem_root, &name, cc->name().c_str(),
2279                              cc->name().length()))
2280         return true;  // OOM
2281 
2282       // Check constraint expression (clause).
2283       LEX_CSTRING check_clause;
2284       if (lex_string_strmake(&share->mem_root, &check_clause,
2285                              cc->check_clause().c_str(),
2286                              cc->check_clause().length()))
2287         return true;  // OOM
2288 
2289       // Check constraint state.
2290       bool is_cc_enforced =
2291           (cc->constraint_state() == dd::Check_constraint::CS_ENFORCED);
2292 
2293       share->check_constraint_share_list->push_back(
2294           Sql_check_constraint_share(name, check_clause, is_cc_enforced));
2295     }
2296   }
2297 
2298   return false;
2299 }
2300 
open_table_def(THD * thd,TABLE_SHARE * share,const dd::Table & table_def)2301 bool open_table_def(THD *thd, TABLE_SHARE *share, const dd::Table &table_def) {
2302   DBUG_TRACE;
2303 
2304   MEM_ROOT *old_root = thd->mem_root;
2305   thd->mem_root = &share->mem_root;  // Needed for make_field()++
2306   share->blob_fields = 0;            // HACK
2307 
2308   // Fill the TABLE_SHARE with details.
2309   bool error = (fill_share_from_dd(thd, share, &table_def) ||
2310                 fill_columns_from_dd(thd, share, &table_def) ||
2311                 fill_indexes_from_dd(thd, share, &table_def) ||
2312                 fill_partitioning_from_dd(thd, share, &table_def) ||
2313                 fill_foreign_keys_from_dd(share, &table_def) ||
2314                 fill_check_constraints_from_dd(share, &table_def));
2315 
2316   thd->mem_root = old_root;
2317 
2318   if (!error) error = prepare_share(thd, share, &table_def);
2319 
2320   if (!error) {
2321     share->table_category = get_table_category(share->db, share->table_name);
2322     thd->status_var.opened_shares++;
2323     return false;
2324   }
2325   return true;
2326 }
2327 
2328 /*
2329   Ignore errors related to invalid collation and missing parser during
2330   open_table_def().
2331 */
2332 class Open_table_error_handler : public Internal_error_handler {
2333  public:
handle_condition(THD *,uint sql_errno,const char *,Sql_condition::enum_severity_level *,const char *)2334   virtual bool handle_condition(THD *, uint sql_errno, const char *,
2335                                 Sql_condition::enum_severity_level *,
2336                                 const char *) {
2337     return (sql_errno == ER_UNKNOWN_COLLATION ||
2338             sql_errno == ER_PLUGIN_IS_NOT_LOADED);
2339   }
2340 };
2341 
open_table_def_suppress_invalid_meta_data(THD * thd,TABLE_SHARE * share,const dd::Table & table_def)2342 bool open_table_def_suppress_invalid_meta_data(THD *thd, TABLE_SHARE *share,
2343                                                const dd::Table &table_def) {
2344   Open_table_error_handler error_handler;
2345   thd->push_internal_handler(&error_handler);
2346   bool error = open_table_def(thd, share, table_def);
2347   thd->pop_internal_handler();
2348   return error;
2349 }
2350 
2351 //////////////////////////////////////////////////////////////////////////
2352