1 /* Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/dd_table_share.h"
24
25 #include "my_config.h"
26
27 #include <string.h>
28 #include <algorithm>
29 #include <string>
30 #include <type_traits>
31
32 #include "lex_string.h"
33 #include "m_string.h"
34 #include "map_helpers.h"
35 #include "my_alloc.h"
36 #include "my_base.h"
37 #include "my_bitmap.h"
38 #include "my_compare.h"
39 #include "my_compiler.h"
40 #include "my_dbug.h"
41 #include "my_loglevel.h"
42 #include "my_macros.h"
43 #include "mysql/components/services/log_builtins.h"
44 #include "mysql/components/services/log_shared.h"
45 #include "mysql/plugin.h"
46 #include "mysql/psi/psi_base.h"
47 #include "mysql/udf_registration_types.h"
48 #include "mysql_com.h"
49 #include "mysqld_error.h"
50 #include "nullable.h"
51 #include "sql/dd/collection.h"
52 #include "sql/dd/dd_table.h" // dd::FIELD_NAME_SEPARATOR_CHAR
53 #include "sql/dd/dd_tablespace.h" // dd::get_tablespace_name
54 // TODO: Avoid exposing dd/impl headers in public files.
55 #include "sql/dd/impl/utils.h" // dd::eat_str
56 #include "sql/dd/properties.h" // dd::Properties
57 #include "sql/dd/string_type.h"
58 #include "sql/dd/types/check_constraint.h" // dd::Check_constraint
59 #include "sql/dd/types/column.h" // dd::enum_column_types
60 #include "sql/dd/types/column_type_element.h" // dd::Column_type_element
61 #include "sql/dd/types/foreign_key.h"
62 #include "sql/dd/types/foreign_key_element.h" // dd::Foreign_key_element
63 #include "sql/dd/types/index.h" // dd::Index
64 #include "sql/dd/types/index_element.h" // dd::Index_element
65 #include "sql/dd/types/partition.h" // dd::Partition
66 #include "sql/dd/types/partition_value.h" // dd::Partition_value
67 #include "sql/dd/types/table.h" // dd::Table
68 #include "sql/default_values.h" // prepare_default_value_buffer...
69 #include "sql/error_handler.h" // Internal_error_handler
70 #include "sql/field.h"
71 #include "sql/gis/srid.h"
72 #include "sql/handler.h"
73 #include "sql/key.h"
74 #include "sql/log.h"
75 #include "sql/partition_element.h" // partition_element
76 #include "sql/partition_info.h" // partition_info
77 #include "sql/sql_bitmap.h"
78 #include "sql/sql_check_constraint.h" // Sql_check_constraint_share_list
79 #include "sql/sql_class.h" // THD
80 #include "sql/sql_const.h"
81 #include "sql/sql_error.h"
82 #include "sql/sql_list.h"
83 #include "sql/sql_partition.h" // generate_partition_syntax
84 #include "sql/sql_plugin.h" // plugin_unlock
85 #include "sql/sql_plugin_ref.h"
86 #include "sql/sql_table.h" // primary_key_name
87 #include "sql/strfunc.h" // lex_cstring_handle
88 #include "sql/system_variables.h"
89 #include "sql/table.h"
90 #include "sql/thd_raii.h"
91 #include "typelib.h"
92
93 namespace histograms {
94 class Histogram;
95 } // namespace histograms
96
dd_get_old_field_type(dd::enum_column_types type)97 enum_field_types dd_get_old_field_type(dd::enum_column_types type) {
98 switch (type) {
99 case dd::enum_column_types::DECIMAL:
100 return MYSQL_TYPE_DECIMAL;
101
102 case dd::enum_column_types::TINY:
103 return MYSQL_TYPE_TINY;
104
105 case dd::enum_column_types::SHORT:
106 return MYSQL_TYPE_SHORT;
107
108 case dd::enum_column_types::LONG:
109 return MYSQL_TYPE_LONG;
110
111 case dd::enum_column_types::FLOAT:
112 return MYSQL_TYPE_FLOAT;
113
114 case dd::enum_column_types::DOUBLE:
115 return MYSQL_TYPE_DOUBLE;
116
117 case dd::enum_column_types::TYPE_NULL:
118 return MYSQL_TYPE_NULL;
119
120 case dd::enum_column_types::TIMESTAMP:
121 return MYSQL_TYPE_TIMESTAMP;
122
123 case dd::enum_column_types::LONGLONG:
124 return MYSQL_TYPE_LONGLONG;
125
126 case dd::enum_column_types::INT24:
127 return MYSQL_TYPE_INT24;
128
129 case dd::enum_column_types::DATE:
130 return MYSQL_TYPE_DATE;
131
132 case dd::enum_column_types::TIME:
133 return MYSQL_TYPE_TIME;
134
135 case dd::enum_column_types::DATETIME:
136 return MYSQL_TYPE_DATETIME;
137
138 case dd::enum_column_types::YEAR:
139 return MYSQL_TYPE_YEAR;
140
141 case dd::enum_column_types::NEWDATE:
142 return MYSQL_TYPE_NEWDATE;
143
144 case dd::enum_column_types::VARCHAR:
145 return MYSQL_TYPE_VARCHAR;
146
147 case dd::enum_column_types::BIT:
148 return MYSQL_TYPE_BIT;
149
150 case dd::enum_column_types::TIMESTAMP2:
151 return MYSQL_TYPE_TIMESTAMP2;
152
153 case dd::enum_column_types::DATETIME2:
154 return MYSQL_TYPE_DATETIME2;
155
156 case dd::enum_column_types::TIME2:
157 return MYSQL_TYPE_TIME2;
158
159 case dd::enum_column_types::NEWDECIMAL:
160 return MYSQL_TYPE_NEWDECIMAL;
161
162 case dd::enum_column_types::ENUM:
163 return MYSQL_TYPE_ENUM;
164
165 case dd::enum_column_types::SET:
166 return MYSQL_TYPE_SET;
167
168 case dd::enum_column_types::TINY_BLOB:
169 return MYSQL_TYPE_TINY_BLOB;
170
171 case dd::enum_column_types::MEDIUM_BLOB:
172 return MYSQL_TYPE_MEDIUM_BLOB;
173
174 case dd::enum_column_types::LONG_BLOB:
175 return MYSQL_TYPE_LONG_BLOB;
176
177 case dd::enum_column_types::BLOB:
178 return MYSQL_TYPE_BLOB;
179
180 case dd::enum_column_types::VAR_STRING:
181 return MYSQL_TYPE_VAR_STRING;
182
183 case dd::enum_column_types::STRING:
184 return MYSQL_TYPE_STRING;
185
186 case dd::enum_column_types::GEOMETRY:
187 return MYSQL_TYPE_GEOMETRY;
188
189 case dd::enum_column_types::JSON:
190 return MYSQL_TYPE_JSON;
191
192 default:
193 DBUG_ASSERT(!"Should not hit here"); /* purecov: deadcode */
194 }
195
196 return MYSQL_TYPE_LONG;
197 }
198
199 /** For enum in dd::Index */
dd_get_old_index_algorithm_type(dd::Index::enum_index_algorithm type)200 static enum ha_key_alg dd_get_old_index_algorithm_type(
201 dd::Index::enum_index_algorithm type) {
202 switch (type) {
203 case dd::Index::IA_SE_SPECIFIC:
204 return HA_KEY_ALG_SE_SPECIFIC;
205
206 case dd::Index::IA_BTREE:
207 return HA_KEY_ALG_BTREE;
208
209 case dd::Index::IA_RTREE:
210 return HA_KEY_ALG_RTREE;
211
212 case dd::Index::IA_HASH:
213 return HA_KEY_ALG_HASH;
214
215 case dd::Index::IA_FULLTEXT:
216 return HA_KEY_ALG_FULLTEXT;
217
218 default:
219 DBUG_ASSERT(!"Should not hit here"); /* purecov: deadcode */
220 }
221
222 return HA_KEY_ALG_SE_SPECIFIC;
223 }
224
225 /*
226 Check if the given key_part is suitable to be promoted as part of
227 primary key.
228 */
is_suitable_for_primary_key(KEY_PART_INFO * key_part,Field * table_field)229 bool is_suitable_for_primary_key(KEY_PART_INFO *key_part, Field *table_field) {
230 // Index on virtual generated columns is not allowed to be PK
231 // even when the conditions below are true, so this case must be
232 // rejected here.
233 if (table_field->is_virtual_gcol()) return false;
234
235 /*
236 If the key column is of NOT NULL BLOB type, then it
237 will definitly have key prefix. And if key part prefix size
238 is equal to the BLOB column max size, then we can promote
239 it to primary key.
240 */
241 if (!table_field->is_nullable() && table_field->type() == MYSQL_TYPE_BLOB &&
242 table_field->field_length == key_part->length)
243 return true;
244
245 if (table_field->is_nullable() ||
246 table_field->key_length() != key_part->length)
247 return false;
248
249 return true;
250 }
251
252 /**
253 Finalize preparation of TABLE_SHARE from dd::Table object by filling
254 in remaining info about columns and keys.
255
256 This code similar to code in open_binary_frm(). Can be re-written
257 independent to other efforts later.
258 */
259
prepare_share(THD * thd,TABLE_SHARE * share,const dd::Table * table_def)260 static bool prepare_share(THD *thd, TABLE_SHARE *share,
261 const dd::Table *table_def) {
262 my_bitmap_map *bitmaps;
263 bool use_hash;
264 handler *handler_file = nullptr;
265
266 // Mark 'system' tables (tables with one row) to help the Optimizer.
267 share->system =
268 ((share->max_rows == 1) && (share->min_rows == 1) && (share->keys == 0));
269
270 bool use_extended_sk = ha_check_storage_engine_flag(
271 share->db_type(), HTON_SUPPORTS_EXTENDED_KEYS);
272 // Setup name_hash for quick look-up
273 use_hash = share->fields >= MAX_FIELDS_BEFORE_HASH;
274 if (use_hash) {
275 Field **field_ptr = share->field;
276 share->name_hash = new collation_unordered_map<std::string, Field **>(
277 system_charset_info, PSI_INSTRUMENT_ME);
278 share->name_hash->reserve(share->fields);
279
280 for (uint i = 0; i < share->fields; i++, field_ptr++) {
281 share->name_hash->emplace((*field_ptr)->field_name, field_ptr);
282 }
283 }
284
285 share->m_histograms =
286 new malloc_unordered_map<uint, const histograms::Histogram *>(
287 PSI_INSTRUMENT_ME);
288
289 // Setup other fields =====================================================
290 /* Allocate handler */
291 if (!(handler_file = get_new_handler(share, (share->m_part_info != nullptr),
292 &share->mem_root, share->db_type()))) {
293 my_error(ER_INVALID_DD_OBJECT, MYF(0), share->path.str,
294 "Failed to initialize handler.");
295 return true;
296 }
297
298 if (handler_file->set_ha_share_ref(&share->ha_share)) {
299 my_error(ER_INVALID_DD_OBJECT, MYF(0), share->path.str, "");
300 return true;
301 }
302 share->db_low_byte_first = handler_file->low_byte_first();
303
304 /* Fix key->name and key_part->field */
305 if (share->keys) {
306 KEY *keyinfo;
307 KEY_PART_INFO *key_part;
308 uint primary_key = (uint)(
309 find_type(primary_key_name, &share->keynames, FIND_TYPE_NO_PREFIX) - 1);
310 longlong ha_option = handler_file->ha_table_flags();
311 keyinfo = share->key_info;
312 key_part = keyinfo->key_part;
313
314 dd::Table::Index_collection::const_iterator idx_it(
315 table_def->indexes().begin());
316
317 for (uint key = 0; key < share->keys; key++, keyinfo++) {
318 /*
319 Skip hidden dd::Index objects so idx_it is in sync with key index
320 and keyinfo pointer.
321 */
322 while ((*idx_it)->is_hidden()) {
323 ++idx_it;
324 continue;
325 }
326
327 uint usable_parts = 0;
328 keyinfo->name = share->keynames.type_names[key];
329
330 /* Check that fulltext and spatial keys have correct algorithm set. */
331 DBUG_ASSERT(!(share->key_info[key].flags & HA_FULLTEXT) ||
332 share->key_info[key].algorithm == HA_KEY_ALG_FULLTEXT);
333 DBUG_ASSERT(!(share->key_info[key].flags & HA_SPATIAL) ||
334 share->key_info[key].algorithm == HA_KEY_ALG_RTREE);
335
336 if (primary_key >= MAX_KEY && (keyinfo->flags & HA_NOSAME)) {
337 /*
338 If the UNIQUE key doesn't have NULL columns and is not a part key
339 declare this as a primary key.
340 */
341 primary_key = key;
342 for (uint i = 0; i < keyinfo->user_defined_key_parts; i++) {
343 Field *table_field = key_part[i].field;
344
345 if (is_suitable_for_primary_key(&key_part[i], table_field) == false) {
346 primary_key = MAX_KEY;
347 break;
348 }
349 }
350
351 /*
352 Check that dd::Index::is_candidate_key() used by SEs works in
353 the same way as above call to is_suitable_for_primary_key().
354 */
355 DBUG_ASSERT((primary_key == key) == (*idx_it)->is_candidate_key());
356 }
357
358 dd::Index::Index_elements::const_iterator idx_el_it(
359 (*idx_it)->elements().begin());
360
361 for (uint i = 0; i < keyinfo->user_defined_key_parts; key_part++, i++) {
362 /*
363 Skip hidden Index_element objects so idx_el_it is in sync with
364 i and key_part pointer.
365 */
366 while ((*idx_el_it)->is_hidden()) {
367 ++idx_el_it;
368 continue;
369 }
370
371 Field *field = key_part->field;
372
373 key_part->type = field->key_type();
374 if (field->is_nullable()) {
375 key_part->null_offset = field->null_offset(share->default_values);
376 key_part->null_bit = field->null_bit;
377 key_part->store_length += HA_KEY_NULL_LENGTH;
378 keyinfo->flags |= HA_NULL_PART_KEY;
379 keyinfo->key_length += HA_KEY_NULL_LENGTH;
380 }
381 if (field->type() == MYSQL_TYPE_BLOB ||
382 field->real_type() == MYSQL_TYPE_VARCHAR ||
383 field->type() == MYSQL_TYPE_GEOMETRY) {
384 key_part->store_length += HA_KEY_BLOB_LENGTH;
385 if (i + 1 <= keyinfo->user_defined_key_parts)
386 keyinfo->key_length += HA_KEY_BLOB_LENGTH;
387 }
388 key_part->init_flags();
389
390 if (field->is_virtual_gcol()) keyinfo->flags |= HA_VIRTUAL_GEN_KEY;
391
392 setup_key_part_field(share, handler_file, primary_key, keyinfo, key, i,
393 &usable_parts, true);
394
395 field->set_flag(PART_KEY_FLAG);
396 if (key == primary_key) {
397 field->set_flag(PRI_KEY_FLAG);
398 /*
399 If this field is part of the primary key and all keys contains
400 the primary key, then we can use any key to find this column
401 */
402 if (ha_option & HA_PRIMARY_KEY_IN_READ_INDEX) {
403 if (field->key_length() == key_part->length &&
404 !field->is_flag_set(BLOB_FLAG))
405 field->part_of_key = share->keys_in_use;
406 if (field->part_of_sortkey.is_set(key))
407 field->part_of_sortkey = share->keys_in_use;
408 }
409 }
410 if (field->key_length() != key_part->length) {
411 #ifndef TO_BE_DELETED_ON_PRODUCTION
412 if (field->type() == MYSQL_TYPE_NEWDECIMAL) {
413 /*
414 Fix a fatal error in decimal key handling that causes crashes
415 on Innodb. We fix it by reducing the key length so that
416 InnoDB never gets a too big key when searching.
417 This allows the end user to do an ALTER TABLE to fix the
418 error.
419 */
420 keyinfo->key_length -= (key_part->length - field->key_length());
421 key_part->store_length -=
422 (uint16)(key_part->length - field->key_length());
423 key_part->length = (uint16)field->key_length();
424 LogErr(ERROR_LEVEL, ER_TABLE_WRONG_KEY_DEFINITION,
425 share->table_name.str, share->table_name.str);
426 push_warning_printf(thd, Sql_condition::SL_WARNING,
427 ER_CRASHED_ON_USAGE,
428 "Found wrong key definition in %s; "
429 "Please do \"ALTER TABLE `%s` FORCE\" to fix "
430 "it!",
431 share->table_name.str, share->table_name.str);
432 share->crashed = true; // Marker for CHECK TABLE
433 continue;
434 }
435 #endif
436 key_part->key_part_flag |= HA_PART_KEY_SEG;
437 }
438
439 /*
440 Check that dd::Index_element::is_prefix() used by SEs works in
441 the same way as code which sets HA_PART_KEY_SEG flag.
442 */
443 DBUG_ASSERT(
444 (*idx_el_it)->is_prefix() ==
445 static_cast<bool>(key_part->key_part_flag & HA_PART_KEY_SEG));
446 ++idx_el_it;
447 }
448
449 /*
450 KEY::flags is fully set-up at this point so we can copy it to
451 KEY::actual_flags.
452 */
453 keyinfo->actual_flags = keyinfo->flags;
454
455 if (use_extended_sk && primary_key < MAX_KEY && key &&
456 !(keyinfo->flags & HA_NOSAME))
457 key_part +=
458 add_pk_parts_to_sk(keyinfo, key, share->key_info, primary_key,
459 share, handler_file, &usable_parts);
460
461 /* Skip unused key parts if they exist */
462 key_part += keyinfo->unused_key_parts;
463
464 keyinfo->usable_key_parts = usable_parts; // Filesort
465
466 share->max_key_length =
467 std::max(share->max_key_length,
468 keyinfo->key_length + keyinfo->user_defined_key_parts);
469 share->total_key_length += keyinfo->key_length;
470 /*
471 MERGE tables do not have unique indexes. But every key could be
472 an unique index on the underlying MyISAM table. (Bug #10400)
473 */
474 if ((keyinfo->flags & HA_NOSAME) ||
475 (ha_option & HA_ANY_INDEX_MAY_BE_UNIQUE))
476 share->max_unique_length =
477 std::max(share->max_unique_length, keyinfo->key_length);
478
479 ++idx_it;
480 }
481 if (primary_key < MAX_KEY && (share->keys_in_use.is_set(primary_key))) {
482 share->primary_key = primary_key;
483 /*
484 If we are using an integer as the primary key then allow the user to
485 refer to it as '_rowid'
486 */
487 if (share->key_info[primary_key].user_defined_key_parts == 1) {
488 Field *field = share->key_info[primary_key].key_part[0].field;
489 if (field && field->result_type() == INT_RESULT) {
490 /* note that fieldnr here (and rowid_field_offset) starts from 1 */
491 share->rowid_field_offset =
492 (share->key_info[primary_key].key_part[0].fieldnr);
493 }
494 }
495 } else
496 share->primary_key = MAX_KEY; // we do not have a primary key
497 } else
498 share->primary_key = MAX_KEY;
499 destroy(handler_file);
500
501 if (share->found_next_number_field) {
502 Field *reg_field = *share->found_next_number_field;
503 /* Check that the auto-increment column is the first column of some key. */
504 if ((int)(share->next_number_index = (uint)find_ref_key(
505 share->key_info, share->keys, share->default_values,
506 reg_field, &share->next_number_key_offset,
507 &share->next_number_keypart)) < 0) {
508 my_error(ER_INVALID_DD_OBJECT, MYF(0), share->path.str,
509 "Wrong field definition.");
510 return true;
511 } else
512 reg_field->set_flag(AUTO_INCREMENT_FLAG);
513 }
514
515 if (share->blob_fields) {
516 Field **ptr;
517 uint k, *save;
518
519 /* Store offsets to blob fields to find them fast */
520 if (!(share->blob_field = save = (uint *)share->mem_root.Alloc(
521 (uint)(share->blob_fields * sizeof(uint)))))
522 return true; // OOM error message already reported
523 for (k = 0, ptr = share->field; *ptr; ptr++, k++) {
524 if ((*ptr)->is_flag_set(BLOB_FLAG) || (*ptr)->is_array()) (*save++) = k;
525 }
526 }
527
528 share->column_bitmap_size = bitmap_buffer_size(share->fields);
529 if (!(bitmaps = (my_bitmap_map *)share->mem_root.Alloc(
530 share->column_bitmap_size))) {
531 // OOM error message already reported
532 return true; /* purecov: inspected */
533 }
534 bitmap_init(&share->all_set, bitmaps, share->fields);
535 bitmap_set_all(&share->all_set);
536
537 return false;
538 }
539
540 /** Fill tablespace name from dd::Tablespace. */
fill_tablespace_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)541 static bool fill_tablespace_from_dd(THD *thd, TABLE_SHARE *share,
542 const dd::Table *tab_obj) {
543 DBUG_TRACE;
544
545 return dd::get_tablespace_name<dd::Table>(thd, tab_obj, &share->tablespace,
546 &share->mem_root);
547 }
548
549 /**
550 Convert row format value used in DD to corresponding value in old
551 row_type enum.
552 */
553
dd_get_old_row_format(dd::Table::enum_row_format new_format)554 static row_type dd_get_old_row_format(dd::Table::enum_row_format new_format) {
555 switch (new_format) {
556 case dd::Table::RF_FIXED:
557 return ROW_TYPE_FIXED;
558 case dd::Table::RF_DYNAMIC:
559 return ROW_TYPE_DYNAMIC;
560 case dd::Table::RF_COMPRESSED:
561 return ROW_TYPE_COMPRESSED;
562 case dd::Table::RF_REDUNDANT:
563 return ROW_TYPE_REDUNDANT;
564 case dd::Table::RF_COMPACT:
565 return ROW_TYPE_COMPACT;
566 case dd::Table::RF_PAGED:
567 return ROW_TYPE_PAGED;
568 default:
569 DBUG_ASSERT(0);
570 break;
571 }
572 return ROW_TYPE_FIXED;
573 }
574
575 /** Fill TABLE_SHARE from dd::Table object */
fill_share_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)576 static bool fill_share_from_dd(THD *thd, TABLE_SHARE *share,
577 const dd::Table *tab_obj) {
578 const dd::Properties &table_options = tab_obj->options();
579
580 // Secondary storage engine.
581 if (table_options.exists("secondary_engine")) {
582 table_options.get("secondary_engine", &share->secondary_engine,
583 &share->mem_root);
584 } else {
585 // If no secondary storage engine is set, the share cannot
586 // represent a table in a secondary engine.
587 DBUG_ASSERT(!share->is_secondary_engine());
588 }
589
590 // Read table engine type
591 LEX_CSTRING engine_name = lex_cstring_handle(tab_obj->engine());
592 if (share->is_secondary_engine())
593 engine_name = {share->secondary_engine.str, share->secondary_engine.length};
594
595 plugin_ref tmp_plugin = ha_resolve_by_name_raw(thd, engine_name);
596 if (tmp_plugin) {
597 #ifndef DBUG_OFF
598 handlerton *hton = plugin_data<handlerton *>(tmp_plugin);
599 #endif
600
601 DBUG_ASSERT(hton && ha_storage_engine_is_enabled(hton));
602 DBUG_ASSERT(!ha_check_storage_engine_flag(hton, HTON_NOT_USER_SELECTABLE));
603
604 plugin_unlock(nullptr, share->db_plugin);
605 share->db_plugin = my_plugin_lock(nullptr, &tmp_plugin);
606 } else {
607 my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), engine_name.str);
608 return true;
609 }
610
611 // Set temporarily a good value for db_low_byte_first.
612 DBUG_ASSERT(ha_legacy_type(share->db_type()) != DB_TYPE_ISAM);
613 share->db_low_byte_first = true;
614
615 // Read other table options
616 uint64 option_value = 0;
617 bool bool_opt = false;
618
619 // Max rows
620 if (table_options.exists("max_rows"))
621 table_options.get("max_rows", &share->max_rows);
622
623 // Min rows
624 if (table_options.exists("min_rows"))
625 table_options.get("min_rows", &share->min_rows);
626
627 // Options from HA_CREATE_INFO::table_options/TABLE_SHARE::db_create_options.
628 share->db_create_options = 0;
629
630 table_options.get("pack_record", &bool_opt);
631 if (bool_opt) share->db_create_options |= HA_OPTION_PACK_RECORD;
632
633 if (table_options.exists("pack_keys")) {
634 table_options.get("pack_keys", &bool_opt);
635 share->db_create_options |=
636 bool_opt ? HA_OPTION_PACK_KEYS : HA_OPTION_NO_PACK_KEYS;
637 }
638
639 if (table_options.exists("checksum")) {
640 table_options.get("checksum", &bool_opt);
641 if (bool_opt) share->db_create_options |= HA_OPTION_CHECKSUM;
642 }
643
644 if (table_options.exists("delay_key_write")) {
645 table_options.get("delay_key_write", &bool_opt);
646 if (bool_opt) share->db_create_options |= HA_OPTION_DELAY_KEY_WRITE;
647 }
648
649 if (table_options.exists("stats_persistent")) {
650 table_options.get("stats_persistent", &bool_opt);
651 share->db_create_options |=
652 bool_opt ? HA_OPTION_STATS_PERSISTENT : HA_OPTION_NO_STATS_PERSISTENT;
653 }
654
655 share->db_options_in_use = share->db_create_options;
656
657 // Average row length
658
659 if (table_options.exists("avg_row_length")) {
660 table_options.get("avg_row_length", &option_value);
661 share->avg_row_length = static_cast<ulong>(option_value);
662 }
663
664 // Collation ID
665 share->table_charset = dd_get_mysql_charset(tab_obj->collation_id());
666 if (!share->table_charset) {
667 // Unknown collation
668 if (use_mb(default_charset_info)) {
669 /* Warn that we may be changing the size of character columns */
670 LogErr(WARNING_LEVEL, ER_INVALID_CHARSET_AND_DEFAULT_IS_MB,
671 share->path.str);
672 }
673 share->table_charset = default_charset_info;
674 }
675
676 // Row type. First one really used by the storage engine.
677 share->real_row_type = dd_get_old_row_format(tab_obj->row_format());
678
679 // Then one which was explicitly specified by user for this table.
680 if (table_options.exists("row_type")) {
681 table_options.get("row_type", &option_value);
682 share->row_type =
683 dd_get_old_row_format((dd::Table::enum_row_format)option_value);
684 } else
685 share->row_type = ROW_TYPE_DEFAULT;
686
687 // Stats_sample_pages
688 if (table_options.exists("stats_sample_pages"))
689 table_options.get("stats_sample_pages", &share->stats_sample_pages);
690
691 // Stats_auto_recalc
692 if (table_options.exists("stats_auto_recalc")) {
693 table_options.get("stats_auto_recalc", &option_value);
694 share->stats_auto_recalc = (enum_stats_auto_recalc)option_value;
695 }
696
697 // mysql version
698 share->mysql_version = tab_obj->mysql_version_id();
699
700 // TODO-POST-MERGE-TO-TRUNK: Initialize new field
701 // share->last_checked_for_upgrade? Or access tab_obj directly where
702 // needed?
703
704 // key block size
705 table_options.get("key_block_size", &share->key_block_size);
706
707 // Prepare the default_value buffer.
708 if (prepare_default_value_buffer_and_table_share(thd, *tab_obj, share))
709 return true;
710
711 // Storage media flags
712 if (table_options.exists("storage")) {
713 uint32 storage_option_value = 0;
714 table_options.get("storage", &storage_option_value);
715 share->default_storage_media =
716 static_cast<ha_storage_media>(storage_option_value);
717 } else
718 share->default_storage_media = HA_SM_DEFAULT;
719
720 // Read tablespace name
721 if (fill_tablespace_from_dd(thd, share, tab_obj)) return true;
722
723 // Read comment
724 dd::String_type comment = tab_obj->comment();
725 if (comment.length()) {
726 share->comment.str =
727 strmake_root(&share->mem_root, comment.c_str(), comment.length() + 1);
728 share->comment.length = comment.length();
729 }
730
731 // Copy SE attributes into share's memroot
732 share->engine_attribute = LexStringDupRootUnlessEmpty(
733 &share->mem_root, tab_obj->engine_attribute());
734 share->secondary_engine_attribute = LexStringDupRootUnlessEmpty(
735 &share->mem_root, tab_obj->secondary_engine_attribute());
736
737 // Read Connection strings
738 if (table_options.exists("connection_string"))
739 table_options.get("connection_string", &share->connect_string,
740 &share->mem_root);
741
742 // Read Compress string
743 if (table_options.exists("compress"))
744 table_options.get("compress", &share->compress, &share->mem_root);
745
746 // Read Encrypt string
747 if (table_options.exists("encrypt_type"))
748 table_options.get("encrypt_type", &share->encrypt_type, &share->mem_root);
749
750 return false;
751 }
752
753 /**
754 Calculate number of bits used for the column in the record preamble
755 (aka null bits number).
756 */
757
column_preamble_bits(const dd::Column * col_obj)758 static uint column_preamble_bits(const dd::Column *col_obj) {
759 uint result = 0;
760
761 if (col_obj->is_nullable()) result++;
762
763 if (col_obj->type() == dd::enum_column_types::BIT) {
764 bool treat_bit_as_char = false;
765 (void)col_obj->options().get("treat_bit_as_char", &treat_bit_as_char);
766
767 if (!treat_bit_as_char) result += col_obj->char_length() & 7;
768 }
769 return result;
770 }
771
get_auto_flags(const dd::Column & col_obj,uint & auto_flags)772 inline void get_auto_flags(const dd::Column &col_obj, uint &auto_flags) {
773 /*
774 For DEFAULT it is possible to have CURRENT_TIMESTAMP or a
775 generation expression.
776 */
777 if (!col_obj.default_option().empty()) {
778 // We're only matching the prefix because there may be parameters
779 // e.g. CURRENT_TIMESTAMP(6). Regular strings won't match as they
780 // are preceded by the charset and CURRENT_TIMESTAMP as a default
781 // expression gets converted to now().
782 if (col_obj.default_option().compare(0, 17, "CURRENT_TIMESTAMP") == 0) {
783 // The only allowed patterns are "CURRENT_TIMESTAMP" and
784 // "CURRENT_TIMESTAP(<integer>)". Stored functions with names
785 // starting with "CURRENT_TIMESTAMP" should be filtered out before
786 // we get here.
787 DBUG_ASSERT(
788 col_obj.default_option().size() == 17 ||
789 (col_obj.default_option().size() >= 20 &&
790 col_obj.default_option()[17] == '(' &&
791 col_obj.default_option()[col_obj.default_option().size() - 1] ==
792 ')'));
793
794 auto_flags |= Field::DEFAULT_NOW;
795 } else {
796 auto_flags |= Field::GENERATED_FROM_EXPRESSION;
797 }
798 }
799
800 /*
801 For ON UPDATE the only option which is supported
802 at this point is CURRENT_TIMESTAMP.
803 */
804 if (!col_obj.update_option().empty()) auto_flags |= Field::ON_UPDATE_NOW;
805
806 if (col_obj.is_auto_increment()) auto_flags |= Field::NEXT_NUMBER;
807
808 /*
809 Columns can't have AUTO_INCREMENT and DEFAULT/ON UPDATE CURRENT_TIMESTAMP at
810 the same time.
811 */
812 DBUG_ASSERT(!((auto_flags & (Field::DEFAULT_NOW | Field::ON_UPDATE_NOW |
813 Field::GENERATED_FROM_EXPRESSION)) != 0 &&
814 (auto_flags & Field::NEXT_NUMBER) != 0));
815 }
816
make_field(const dd::Column & col_obj,const CHARSET_INFO * charset,TABLE_SHARE * share,uchar * ptr,uchar * null_pos,size_t null_bit)817 static Field *make_field(const dd::Column &col_obj, const CHARSET_INFO *charset,
818 TABLE_SHARE *share, uchar *ptr, uchar *null_pos,
819 size_t null_bit) {
820 auto field_type = dd_get_old_field_type(col_obj.type());
821 auto field_length = col_obj.char_length();
822
823 const dd::Properties &column_options = col_obj.options();
824
825 // Reconstruct auto_flags
826 auto auto_flags = static_cast<uint>(Field::NONE);
827 get_auto_flags(col_obj, auto_flags);
828
829 // Read Interval TYPELIB
830 TYPELIB *interval = nullptr;
831
832 if (field_type == MYSQL_TYPE_ENUM || field_type == MYSQL_TYPE_SET) {
833 //
834 // Allocate space for interval (column elements)
835 //
836 size_t interval_parts = col_obj.elements_count();
837
838 interval = (TYPELIB *)share->mem_root.Alloc(sizeof(TYPELIB));
839 interval->type_names = (const char **)share->mem_root.Alloc(
840 sizeof(char *) * (interval_parts + 1));
841 interval->type_names[interval_parts] = nullptr;
842
843 interval->type_lengths =
844 (uint *)share->mem_root.Alloc(sizeof(uint) * interval_parts);
845 interval->count = interval_parts;
846 interval->name = nullptr;
847
848 //
849 // Iterate through all the column elements
850 //
851 for (const dd::Column_type_element *ce : col_obj.elements()) {
852 // Read the enum/set element name
853 dd::String_type element_name = ce->name();
854
855 uint pos = ce->index() - 1;
856 interval->type_lengths[pos] = static_cast<uint>(element_name.length());
857 interval->type_names[pos] = strmake_root(
858 &share->mem_root, element_name.c_str(), element_name.length());
859 }
860 }
861
862 // Column name
863 char *name = nullptr;
864 dd::String_type s = col_obj.name();
865 DBUG_ASSERT(!s.empty());
866 name = strmake_root(&share->mem_root, s.c_str(), s.length());
867 name[s.length()] = '\0';
868
869 uint decimals;
870 // Decimals
871 if (field_type == MYSQL_TYPE_DECIMAL || field_type == MYSQL_TYPE_NEWDECIMAL) {
872 DBUG_ASSERT(col_obj.is_numeric_scale_null() == false);
873 decimals = col_obj.numeric_scale();
874 } else if (field_type == MYSQL_TYPE_FLOAT ||
875 field_type == MYSQL_TYPE_DOUBLE) {
876 decimals = col_obj.is_numeric_scale_null() ? DECIMAL_NOT_SPECIFIED
877 : col_obj.numeric_scale();
878 } else
879 decimals = 0;
880
881 auto geom_type = Field::GEOM_GEOMETRY;
882 // Read geometry sub type
883 if (field_type == MYSQL_TYPE_GEOMETRY) {
884 uint32 sub_type = 0;
885 column_options.get("geom_type", &sub_type);
886 geom_type = static_cast<Field::geometry_type>(sub_type);
887 }
888
889 bool treat_bit_as_char = false;
890 if (field_type == MYSQL_TYPE_BIT) {
891 column_options.get("treat_bit_as_char", &treat_bit_as_char);
892 }
893
894 return make_field(*THR_MALLOC, share, ptr, field_length, null_pos, null_bit,
895 field_type, charset, geom_type, auto_flags, interval, name,
896 col_obj.is_nullable(), col_obj.is_zerofill(),
897 col_obj.is_unsigned(), decimals, treat_bit_as_char, 0,
898 col_obj.srs_id(), col_obj.is_array());
899 }
900
901 /**
902 Add Field constructed according to column metadata from dd::Column
903 object to TABLE_SHARE.
904 */
905
fill_column_from_dd(THD * thd,TABLE_SHARE * share,const dd::Column * col_obj,uchar * null_pos,uint null_bit_pos,uchar * rec_pos,uint field_nr)906 static bool fill_column_from_dd(THD *thd, TABLE_SHARE *share,
907 const dd::Column *col_obj, uchar *null_pos,
908 uint null_bit_pos, uchar *rec_pos,
909 uint field_nr) {
910 char *name = nullptr;
911 enum_field_types field_type;
912 const CHARSET_INFO *charset = nullptr;
913 Field *reg_field;
914 ha_storage_media field_storage;
915 column_format_type field_column_format;
916
917 //
918 // Read column details from dd table
919 //
920
921 // Column name
922 dd::String_type s = col_obj->name();
923 DBUG_ASSERT(!s.empty());
924 name = strmake_root(&share->mem_root, s.c_str(), s.length());
925 name[s.length()] = '\0';
926
927 const dd::Properties *column_options = &col_obj->options();
928
929 // Type
930 field_type = dd_get_old_field_type(col_obj->type());
931
932 // Reconstruct auto_flags
933 auto auto_flags = static_cast<uint>(Field::NONE);
934 get_auto_flags(*col_obj, auto_flags);
935
936 bool treat_bit_as_char = false;
937 if (field_type == MYSQL_TYPE_BIT)
938 column_options->get("treat_bit_as_char", &treat_bit_as_char);
939
940 // Collation ID
941 charset = dd_get_mysql_charset(col_obj->collation_id());
942 if (charset == nullptr) {
943 my_printf_error(ER_UNKNOWN_COLLATION,
944 "invalid collation id %llu for table %s, column %s", MYF(0),
945 col_obj->collation_id(), share->table_name.str, name);
946 if (thd->is_error()) return true;
947 charset = default_charset_info;
948 }
949
950 // Decimals
951 if (field_type == MYSQL_TYPE_DECIMAL || field_type == MYSQL_TYPE_NEWDECIMAL)
952 DBUG_ASSERT(col_obj->is_numeric_scale_null() == false);
953
954 // Read geometry sub type
955 if (field_type == MYSQL_TYPE_GEOMETRY) {
956 uint32 sub_type;
957 column_options->get("geom_type", &sub_type);
958 }
959
960 // Read values of storage media and column format options
961 if (column_options->exists("storage")) {
962 uint32 option_value = 0;
963 column_options->get("storage", &option_value);
964 field_storage = static_cast<ha_storage_media>(option_value);
965 } else
966 field_storage = HA_SM_DEFAULT;
967
968 if (column_options->exists("column_format")) {
969 uint32 option_value = 0;
970 column_options->get("column_format", &option_value);
971 field_column_format = static_cast<column_format_type>(option_value);
972 } else
973 field_column_format = COLUMN_FORMAT_TYPE_DEFAULT;
974
975 // Read Interval TYPELIB
976 TYPELIB *interval = nullptr;
977
978 if (field_type == MYSQL_TYPE_ENUM || field_type == MYSQL_TYPE_SET) {
979 //
980 // Allocate space for interval (column elements)
981 //
982 size_t interval_parts = col_obj->elements_count();
983
984 interval = (TYPELIB *)share->mem_root.Alloc(sizeof(TYPELIB));
985 interval->type_names = (const char **)share->mem_root.Alloc(
986 sizeof(char *) * (interval_parts + 1));
987 interval->type_names[interval_parts] = nullptr;
988
989 interval->type_lengths =
990 (uint *)share->mem_root.Alloc(sizeof(uint) * interval_parts);
991 interval->count = interval_parts;
992 interval->name = nullptr;
993
994 //
995 // Iterate through all the column elements
996 //
997 for (const dd::Column_type_element *ce : col_obj->elements()) {
998 // Read the enum/set element name
999 dd::String_type element_name = ce->name();
1000
1001 uint pos = ce->index() - 1;
1002 interval->type_lengths[pos] = static_cast<uint>(element_name.length());
1003 interval->type_names[pos] = strmake_root(
1004 &share->mem_root, element_name.c_str(), element_name.length());
1005 }
1006 }
1007
1008 //
1009 // Create FIELD
1010 //
1011 reg_field =
1012 make_field(*col_obj, charset, share, rec_pos, null_pos, null_bit_pos);
1013 reg_field->set_field_index(field_nr);
1014 reg_field->stored_in_db = true;
1015
1016 // Handle generated columns
1017 if (!col_obj->is_generation_expression_null()) {
1018 Value_generator *gcol_info = new (&share->mem_root) Value_generator();
1019
1020 // Set if GC is virtual or stored
1021 gcol_info->set_field_stored(!col_obj->is_virtual());
1022
1023 // Read generation expression.
1024 dd::String_type gc_expr = col_obj->generation_expression();
1025
1026 /*
1027 Place the expression's text into the TABLE_SHARE. Field objects of
1028 TABLE_SHARE only have that. They don't have a corresponding Item,
1029 which will be later created for the Field in TABLE, by
1030 fill_dd_columns_from_create_fields().
1031 */
1032 gcol_info->dup_expr_str(&share->mem_root, gc_expr.c_str(),
1033 gc_expr.length());
1034 share->vfields++;
1035 reg_field->gcol_info = gcol_info;
1036 reg_field->stored_in_db = gcol_info->get_field_stored();
1037 }
1038
1039 // Handle default values generated from expression
1040 if (auto_flags & Field::GENERATED_FROM_EXPRESSION) {
1041 Value_generator *default_val_expr =
1042 new (&share->mem_root) Value_generator();
1043
1044 // DEFAULT GENERATED is always stored
1045 default_val_expr->set_field_stored(true);
1046
1047 // Read generation expression.
1048 dd::String_type default_val_expr_str = col_obj->default_option();
1049
1050 // Copy the expression's text into reg_field which is stored on TABLE_SHARE.
1051 default_val_expr->dup_expr_str(&share->mem_root,
1052 default_val_expr_str.c_str(),
1053 default_val_expr_str.length());
1054 share->gen_def_field_count++;
1055 reg_field->m_default_val_expr = default_val_expr;
1056 }
1057
1058 if ((auto_flags & Field::NEXT_NUMBER) != 0)
1059 share->found_next_number_field = &share->field[field_nr];
1060
1061 // Set field flags
1062 if (col_obj->has_no_default()) reg_field->set_flag(NO_DEFAULT_VALUE_FLAG);
1063
1064 // Set default value or NULL. Reset required for e.g. CHAR.
1065 if (col_obj->is_default_value_null()) {
1066 reg_field->reset();
1067 reg_field->set_null();
1068 } else if (field_type == MYSQL_TYPE_BIT && !treat_bit_as_char &&
1069 (col_obj->char_length() & 7)) {
1070 // For bit fields with leftover bits, copy leftover bits into the preamble.
1071 Field_bit *bitfield = dynamic_cast<Field_bit *>(reg_field);
1072 const uchar leftover_bits =
1073 static_cast<uchar>(*col_obj->default_value()
1074 .substr(reg_field->pack_length() - 1, 1)
1075 .data());
1076 set_rec_bits(leftover_bits, bitfield->bit_ptr, bitfield->bit_ofs,
1077 bitfield->bit_len);
1078 // Copy the main part of the bit field data into the record body.
1079 memcpy(rec_pos, col_obj->default_value().data(),
1080 reg_field->pack_length() - 1);
1081 } else {
1082 // For any other field with default data, copy the data into the record.
1083 memcpy(rec_pos, col_obj->default_value().data(), reg_field->pack_length());
1084 }
1085
1086 reg_field->set_storage_type(field_storage);
1087 reg_field->set_column_format(field_column_format);
1088
1089 // Comments
1090 dd::String_type comment = col_obj->comment();
1091 reg_field->comment.length = comment.length();
1092 if (reg_field->comment.length) {
1093 reg_field->comment.str =
1094 strmake_root(&share->mem_root, comment.c_str(), comment.length());
1095 reg_field->comment.length = comment.length();
1096 }
1097
1098 // NOT SECONDARY column option.
1099 if (column_options->exists("not_secondary"))
1100 reg_field->set_flag(NOT_SECONDARY_FLAG);
1101
1102 reg_field->set_hidden(col_obj->hidden());
1103
1104 reg_field->m_engine_attribute = LexStringDupRootUnlessEmpty(
1105 &share->mem_root, col_obj->engine_attribute());
1106
1107 reg_field->m_secondary_engine_attribute = LexStringDupRootUnlessEmpty(
1108 &share->mem_root, col_obj->secondary_engine_attribute());
1109
1110 // Field is prepared. Store it in 'share'
1111 share->field[field_nr] = reg_field;
1112
1113 return (false);
1114 }
1115
1116 /**
1117 Populate TABLE_SHARE::field array according to column metadata
1118 from dd::Table object.
1119 */
1120
fill_columns_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)1121 static bool fill_columns_from_dd(THD *thd, TABLE_SHARE *share,
1122 const dd::Table *tab_obj) {
1123 // Allocate space for fields in TABLE_SHARE.
1124 uint fields_size = ((share->fields + 1) * sizeof(Field *));
1125 share->field = (Field **)share->mem_root.Alloc((uint)fields_size);
1126 memset(share->field, 0, fields_size);
1127 share->vfields = 0;
1128 share->gen_def_field_count = 0;
1129
1130 // Iterate through all the columns.
1131 uchar *null_flags MY_ATTRIBUTE((unused));
1132 uchar *null_pos, *rec_pos;
1133 null_flags = null_pos = share->default_values;
1134 rec_pos = share->default_values + share->null_bytes;
1135 uint null_bit_pos =
1136 (share->db_create_options & HA_OPTION_PACK_RECORD) ? 0 : 1;
1137 uint field_nr = 0;
1138 bool has_vgc = false;
1139 for (const dd::Column *col_obj : tab_obj->columns()) {
1140 // Skip hidden columns
1141 if (col_obj->is_se_hidden()) continue;
1142
1143 /*
1144 Fill details of each column.
1145
1146 Skip virtual generated columns at this point. They reside at the end of
1147 the record, so we need to do separate pass, to evaluate their offsets
1148 correctly.
1149 */
1150 if (!col_obj->is_virtual()) {
1151 if (fill_column_from_dd(thd, share, col_obj, null_pos, null_bit_pos,
1152 rec_pos, field_nr))
1153 return true;
1154
1155 rec_pos += share->field[field_nr]->pack_length_in_rec();
1156 } else
1157 has_vgc = true;
1158
1159 /*
1160 Virtual generated columns still need to be accounted in null bits and
1161 field_nr calculations, since they reside at the normal place in record
1162 preamble and TABLE_SHARE::field array.
1163 */
1164 if ((null_bit_pos += column_preamble_bits(col_obj)) > 7) {
1165 null_pos++;
1166 null_bit_pos -= 8;
1167 }
1168 field_nr++;
1169 }
1170
1171 if (has_vgc) {
1172 /*
1173 Additional pass to put virtual generated columns at the end of the
1174 record is required.
1175 */
1176 if (share->stored_rec_length >
1177 static_cast<ulong>(rec_pos - share->default_values))
1178 share->stored_rec_length = (rec_pos - share->default_values);
1179
1180 null_pos = share->default_values;
1181 null_bit_pos = (share->db_create_options & HA_OPTION_PACK_RECORD) ? 0 : 1;
1182 field_nr = 0;
1183
1184 for (const dd::Column *col_obj2 : tab_obj->columns()) {
1185 // Skip hidden columns
1186 if (col_obj2->is_se_hidden()) continue;
1187
1188 if (col_obj2->is_virtual()) {
1189 // Fill details of each column.
1190 if (fill_column_from_dd(thd, share, col_obj2, null_pos, null_bit_pos,
1191 rec_pos, field_nr))
1192 return true;
1193
1194 rec_pos += share->field[field_nr]->pack_length_in_rec();
1195 }
1196
1197 /*
1198 Account for all columns while evaluating null_pos/null_bit_pos and
1199 field_nr.
1200 */
1201 if ((null_bit_pos += column_preamble_bits(col_obj2)) > 7) {
1202 null_pos++;
1203 null_bit_pos -= 8;
1204 }
1205 field_nr++;
1206 }
1207 }
1208
1209 // Make sure the scan of the columns is consistent with other data.
1210 DBUG_ASSERT(share->null_bytes ==
1211 (null_pos - null_flags + (null_bit_pos + 7) / 8));
1212 DBUG_ASSERT(share->last_null_bit_pos == null_bit_pos);
1213 DBUG_ASSERT(share->fields == field_nr);
1214
1215 return (false);
1216 }
1217
1218 /** Fill KEY_INFO_PART from dd::Index_element object. */
fill_index_element_from_dd(TABLE_SHARE * share,const dd::Index_element * idx_elem_obj,KEY_PART_INFO * keypart)1219 static void fill_index_element_from_dd(TABLE_SHARE *share,
1220 const dd::Index_element *idx_elem_obj,
1221 KEY_PART_INFO *keypart) {
1222 //
1223 // Read index element details
1224 //
1225
1226 keypart->length = idx_elem_obj->length();
1227 keypart->store_length = keypart->length;
1228
1229 // fieldnr
1230 keypart->fieldnr = idx_elem_obj->column().ordinal_position();
1231
1232 // field
1233 DBUG_ASSERT(keypart->fieldnr > 0);
1234 Field *field = keypart->field = share->field[keypart->fieldnr - 1];
1235
1236 // offset
1237 keypart->offset = field->offset(share->default_values);
1238
1239 // key type
1240 keypart->bin_cmp = ((field->real_type() != MYSQL_TYPE_VARCHAR &&
1241 field->real_type() != MYSQL_TYPE_STRING) ||
1242 (field->charset()->state & MY_CS_BINSORT));
1243 //
1244 // Read index order
1245 //
1246
1247 // key part order
1248 if (idx_elem_obj->order() == dd::Index_element::ORDER_DESC)
1249 keypart->key_part_flag |= HA_REVERSE_SORT;
1250
1251 // key_part->field= (Field*) 0; // Will be fixed later
1252 }
1253
1254 /** Fill KEY::key_part array according to metadata from dd::Index object. */
fill_index_elements_from_dd(TABLE_SHARE * share,const dd::Index * idx_obj,int key_nr)1255 static void fill_index_elements_from_dd(TABLE_SHARE *share,
1256 const dd::Index *idx_obj, int key_nr) {
1257 //
1258 // Iterate through all index elements
1259 //
1260
1261 uint i = 0;
1262 KEY *keyinfo = share->key_info + key_nr;
1263 for (const dd::Index_element *idx_elem_obj : idx_obj->elements()) {
1264 // Skip hidden index elements
1265 if (idx_elem_obj->is_hidden()) continue;
1266
1267 //
1268 // Read index element details
1269 //
1270
1271 fill_index_element_from_dd(share, idx_elem_obj, keyinfo->key_part + i);
1272 if (keyinfo->key_part[i].field->is_array())
1273 keyinfo->flags |= HA_MULTI_VALUED_KEY;
1274 i++;
1275 }
1276 }
1277
1278 /**
1279 Add KEY constructed according to index metadata from dd::Index object to
1280 the TABLE_SHARE.
1281 */
1282
fill_index_from_dd(THD * thd,TABLE_SHARE * share,const dd::Index * idx_obj,uint key_nr)1283 static bool fill_index_from_dd(THD *thd, TABLE_SHARE *share,
1284 const dd::Index *idx_obj, uint key_nr) {
1285 //
1286 // Read index details
1287 //
1288
1289 // Get the keyinfo that we will prepare now
1290 KEY *keyinfo = share->key_info + key_nr;
1291
1292 // Read index name
1293 const dd::String_type &name = idx_obj->name();
1294 if (!name.empty()) {
1295 if (name.length()) {
1296 keyinfo->name =
1297 strmake_root(&share->mem_root, name.c_str(), name.length());
1298 share->keynames.type_names[key_nr] = keyinfo->name; // Post processing ??
1299 } else
1300 share->keynames.type_names[key_nr] = nullptr;
1301 // share->keynames.count= key_nr+1;
1302 }
1303
1304 // Index algorithm
1305 keyinfo->algorithm = dd_get_old_index_algorithm_type(idx_obj->algorithm());
1306 keyinfo->is_algorithm_explicit = idx_obj->is_algorithm_explicit();
1307
1308 // Visibility
1309 keyinfo->is_visible = idx_obj->is_visible();
1310
1311 // user defined key parts
1312 keyinfo->user_defined_key_parts = 0;
1313 for (const dd::Index_element *idx_ele : idx_obj->elements()) {
1314 // Skip hidden index elements
1315 if (!idx_ele->is_hidden()) keyinfo->user_defined_key_parts++;
1316 }
1317
1318 // flags
1319 switch (idx_obj->type()) {
1320 case dd::Index::IT_MULTIPLE:
1321 keyinfo->flags = 0;
1322 break;
1323 case dd::Index::IT_FULLTEXT:
1324 keyinfo->flags = HA_FULLTEXT;
1325 break;
1326 case dd::Index::IT_SPATIAL:
1327 keyinfo->flags = HA_SPATIAL;
1328 break;
1329 case dd::Index::IT_PRIMARY:
1330 case dd::Index::IT_UNIQUE:
1331 keyinfo->flags = HA_NOSAME;
1332 break;
1333 default:
1334 DBUG_ASSERT(0); /* purecov: deadcode */
1335 keyinfo->flags = 0;
1336 break;
1337 }
1338
1339 if (idx_obj->is_generated()) keyinfo->flags |= HA_GENERATED_KEY;
1340
1341 /*
1342 The remaining important SQL-layer flags are set later - either we directly
1343 store and read them from DD (HA_PACK_KEY, HA_BINARY_PACK_KEY), or calculate
1344 while handling other key options (HA_USES_COMMENT, HA_USES_PARSER,
1345 HA_USES_BLOCK_SIZE), or during post-processing step (HA_NULL_PART_KEY).
1346 */
1347
1348 // key length
1349 keyinfo->key_length = 0;
1350 for (const dd::Index_element *idx_elem : idx_obj->elements()) {
1351 // Skip hidden index elements
1352 if (!idx_elem->is_hidden()) keyinfo->key_length += idx_elem->length();
1353 }
1354
1355 //
1356 // Read index options
1357 //
1358
1359 const dd::Properties &idx_options = idx_obj->options();
1360
1361 /*
1362 Restore flags indicating that key packing optimization was suggested to SE.
1363 See fill_dd_indexes_for_keyinfo() for explanation why we store these flags
1364 explicitly.
1365 */
1366 uint32 stored_flags = 0;
1367 idx_options.get("flags", &stored_flags);
1368 DBUG_ASSERT((stored_flags & ~(HA_PACK_KEY | HA_BINARY_PACK_KEY)) == 0);
1369
1370 // Beginning in 8.0.12 HA_PACK_KEY and HA_BINARY_PACK_KEY are no longer set
1371 // if the SE does not support it. If the index was created prior to 8.0.12
1372 // these bits may be set in the flags option, and when being added to
1373 // key_info->flags, the ALTER algorithm analysis will be broken because the
1374 // intermediate table is created without these flags, and hence, the
1375 // analysis will incorrectly conclude that all indexes have changed.
1376 //
1377 // To workaround this issue, we remove the flags below, depending on the SE
1378 // capabilities, when preparing the table share. Thus, if we ALTER the table
1379 // at a later stage, indexes not being touched by the ALTER statement will
1380 // have the same flags both in the source table and the intermediate table,
1381 // and hence, the algorithm analysis will come to the right conclusion.
1382 if (ha_check_storage_engine_flag(share->db_type(),
1383 HTON_SUPPORTS_PACKED_KEYS) == 0) {
1384 // Given the assert above, we could just have set stored_flags to 0 here,
1385 // but keep it like this in case new flags are added.
1386 stored_flags &= ~(HA_PACK_KEY | HA_BINARY_PACK_KEY);
1387 }
1388 keyinfo->flags |= stored_flags;
1389
1390 // Block size
1391 if (idx_options.exists("block_size")) {
1392 idx_options.get("block_size", &keyinfo->block_size);
1393
1394 DBUG_ASSERT(keyinfo->block_size);
1395
1396 keyinfo->flags |= HA_USES_BLOCK_SIZE;
1397 }
1398
1399 // Read field parser
1400 if (idx_options.exists("parser_name")) {
1401 LEX_CSTRING parser_name;
1402 if (idx_options.get("parser_name", &parser_name, &share->mem_root))
1403 DBUG_ASSERT(false);
1404
1405 keyinfo->parser =
1406 my_plugin_lock_by_name(nullptr, parser_name, MYSQL_FTPARSER_PLUGIN);
1407 if (!keyinfo->parser) {
1408 my_error(ER_PLUGIN_IS_NOT_LOADED, MYF(0), parser_name.str);
1409 if (thd->is_error()) return true;
1410 }
1411
1412 keyinfo->flags |= HA_USES_PARSER;
1413 }
1414
1415 // Read comment
1416 dd::String_type comment = idx_obj->comment();
1417 keyinfo->comment.length = comment.length();
1418
1419 if (keyinfo->comment.length) {
1420 keyinfo->comment.str =
1421 strmake_root(&share->mem_root, comment.c_str(), comment.length());
1422 keyinfo->comment.length = comment.length();
1423
1424 keyinfo->flags |= HA_USES_COMMENT;
1425 }
1426 keyinfo->engine_attribute = LexStringDupRootUnlessEmpty(
1427 &share->mem_root, idx_obj->engine_attribute());
1428 if (keyinfo->engine_attribute.length > 0)
1429 keyinfo->flags |= HA_INDEX_USES_ENGINE_ATTRIBUTE;
1430
1431 keyinfo->secondary_engine_attribute = LexStringDupRootUnlessEmpty(
1432 &share->mem_root, idx_obj->secondary_engine_attribute());
1433 if (keyinfo->secondary_engine_attribute.length > 0)
1434 keyinfo->flags |= HA_INDEX_USES_SECONDARY_ENGINE_ATTRIBUTE;
1435 return (false);
1436 }
1437
1438 /**
1439 Check if this is a spatial index that can be used. That is, if there is
1440 a spatial index on a geometry column without the SRID specified, we will
1441 hide the index so that the optimizer won't consider the index during
1442 optimization/execution.
1443
1444 @param index The index to verify
1445
1446 @retval true if the index is an usable spatial index, or if it isn't a
1447 spatial index.
1448 @retval false if the index is a spatial index on a geometry column without
1449 an SRID specified.
1450 */
is_spatial_index_usable(const dd::Index & index)1451 static bool is_spatial_index_usable(const dd::Index &index) {
1452 if (index.type() == dd::Index::IT_SPATIAL) {
1453 /*
1454 We have already checked for hidden indexes before we get here. But we
1455 still play safe since the check is very cheap.
1456 */
1457 if (index.is_hidden()) return false; /* purecov: deadcode */
1458
1459 // Check that none of the parts references a column with SRID == NULL
1460 for (const auto element : index.elements()) {
1461 if (!element->is_hidden() && !element->column().srs_id().has_value())
1462 return false;
1463 }
1464 }
1465
1466 return true;
1467 }
1468
1469 /**
1470 Fill TABLE_SHARE::key_info array according to index metadata
1471 from dd::Table object.
1472 */
1473
fill_indexes_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)1474 static bool fill_indexes_from_dd(THD *thd, TABLE_SHARE *share,
1475 const dd::Table *tab_obj) {
1476 share->keys_for_keyread.init(0);
1477 share->keys_in_use.init();
1478 share->visible_indexes.init();
1479
1480 uint32 primary_key_parts = 0;
1481
1482 bool use_extended_sk = ha_check_storage_engine_flag(
1483 share->db_type(), HTON_SUPPORTS_EXTENDED_KEYS);
1484
1485 // Count number of keys and total number of key parts in the table.
1486
1487 DBUG_ASSERT(share->keys == 0 && share->key_parts == 0);
1488
1489 for (const dd::Index *idx_obj : tab_obj->indexes()) {
1490 // Skip hidden indexes
1491 if (idx_obj->is_hidden()) continue;
1492
1493 share->keys++;
1494 uint key_parts = 0;
1495 for (const dd::Index_element *idx_ele : idx_obj->elements()) {
1496 // Skip hidden index elements
1497 if (!idx_ele->is_hidden()) key_parts++;
1498 }
1499 share->key_parts += key_parts;
1500
1501 // Primary key (or candidate key replacing it) is always first if exists.
1502 // If such key doesn't exist (e.g. there are no unique keys in the table)
1503 // we will simply waste some memory.
1504 if (idx_obj->ordinal_position() == 1) primary_key_parts = key_parts;
1505 }
1506
1507 // Allocate and fill KEY objects.
1508 if (share->keys) {
1509 KEY_PART_INFO *key_part;
1510 ulong *rec_per_key;
1511 rec_per_key_t *rec_per_key_float;
1512 uint total_key_parts = share->key_parts;
1513
1514 if (use_extended_sk)
1515 total_key_parts += (primary_key_parts * (share->keys - 1));
1516
1517 //
1518 // Alloc rec_per_key buffer
1519 //
1520 if (!(rec_per_key =
1521 (ulong *)share->mem_root.Alloc(total_key_parts * sizeof(ulong))))
1522 return true; /* purecov: inspected */
1523
1524 //
1525 // Alloc rec_per_key_float buffer
1526 //
1527 if (!(rec_per_key_float = (rec_per_key_t *)share->mem_root.Alloc(
1528 total_key_parts * sizeof(rec_per_key_t))))
1529 return true; /* purecov: inspected */
1530
1531 //
1532 // Alloc buffer to hold keys and key_parts
1533 //
1534
1535 if (!(share->key_info = (KEY *)share->mem_root.Alloc(
1536 share->keys * sizeof(KEY) +
1537 total_key_parts * sizeof(KEY_PART_INFO))))
1538 return true; /* purecov: inspected */
1539
1540 memset(
1541 share->key_info, 0,
1542 (share->keys * sizeof(KEY) + total_key_parts * sizeof(KEY_PART_INFO)));
1543 key_part = (KEY_PART_INFO *)(share->key_info + share->keys);
1544
1545 //
1546 // Alloc buffer to hold keynames
1547 //
1548
1549 if (!(share->keynames.type_names = (const char **)share->mem_root.Alloc(
1550 (share->keys + 1) * sizeof(char *))))
1551 return true; /* purecov: inspected */
1552 memset(share->keynames.type_names, 0, ((share->keys + 1) * sizeof(char *)));
1553
1554 share->keynames.type_names[share->keys] = nullptr;
1555 share->keynames.count = share->keys;
1556
1557 // In first iteration get all the index_obj, so that we get all
1558 // user_defined_key_parts for each key. This is required to propertly
1559 // allocation key_part memory for keys.
1560 const dd::Index *index_at_pos[MAX_INDEXES];
1561 uint key_nr = 0;
1562 for (const dd::Index *idx_obj : tab_obj->indexes()) {
1563 // Skip hidden indexes
1564 if (idx_obj->is_hidden()) continue;
1565
1566 if (fill_index_from_dd(thd, share, idx_obj, key_nr)) return true;
1567
1568 index_at_pos[key_nr] = idx_obj;
1569
1570 share->keys_in_use.set_bit(key_nr);
1571
1572 if (idx_obj->is_visible() && is_spatial_index_usable(*idx_obj))
1573 share->visible_indexes.set_bit(key_nr);
1574
1575 key_nr++;
1576 }
1577
1578 // Update keyparts now
1579 key_nr = 0;
1580 do {
1581 // Assign the key_part_info buffer
1582 KEY *keyinfo = &share->key_info[key_nr];
1583 keyinfo->key_part = key_part;
1584 keyinfo->set_rec_per_key_array(rec_per_key, rec_per_key_float);
1585 keyinfo->set_in_memory_estimate(IN_MEMORY_ESTIMATE_UNKNOWN);
1586
1587 fill_index_elements_from_dd(share, index_at_pos[key_nr], key_nr);
1588
1589 key_part += keyinfo->user_defined_key_parts;
1590 rec_per_key += keyinfo->user_defined_key_parts;
1591 rec_per_key_float += keyinfo->user_defined_key_parts;
1592
1593 // Post processing code ?
1594 /*
1595 Add PK parts if engine supports PK extension for secondary keys.
1596 Atm it works for Innodb only. Here we add unique first key parts
1597 to the end of secondary key parts array and increase actual number
1598 of key parts. Note that primary key is always first if exists.
1599 Later if there is no PK in the table then number of actual keys parts
1600 is set to user defined key parts.
1601 KEY::actual_flags can't be set until we fully set-up KEY::flags.
1602 */
1603 keyinfo->actual_key_parts = keyinfo->user_defined_key_parts;
1604 if (use_extended_sk && key_nr && !(keyinfo->flags & HA_NOSAME)) {
1605 keyinfo->unused_key_parts = primary_key_parts;
1606 key_part += primary_key_parts;
1607 rec_per_key += primary_key_parts;
1608 rec_per_key_float += primary_key_parts;
1609 share->key_parts += primary_key_parts;
1610 }
1611
1612 // Initialize the rec per key arrays
1613 for (uint kp = 0; kp < keyinfo->actual_key_parts; ++kp) {
1614 keyinfo->rec_per_key[kp] = 0;
1615 keyinfo->set_records_per_key(kp, REC_PER_KEY_UNKNOWN);
1616 }
1617
1618 key_nr++;
1619 } while (key_nr < share->keys);
1620 }
1621
1622 return (false);
1623 }
1624
copy_option_string(MEM_ROOT * mem_root,const dd::Properties & options,const dd::String_type & key)1625 static char *copy_option_string(MEM_ROOT *mem_root,
1626 const dd::Properties &options,
1627 const dd::String_type &key) {
1628 dd::String_type tmp_str;
1629 if (options.exists(key) && !options.get(key, &tmp_str) && tmp_str.length()) {
1630 return strdup_root(mem_root, tmp_str.c_str());
1631 }
1632 return nullptr;
1633 }
1634
get_partition_options(MEM_ROOT * mem_root,partition_element * part_elem,const dd::Properties & part_options)1635 static void get_partition_options(MEM_ROOT *mem_root,
1636 partition_element *part_elem,
1637 const dd::Properties &part_options) {
1638 if (part_options.exists("max_rows"))
1639 part_options.get("max_rows", &part_elem->part_max_rows);
1640
1641 if (part_options.exists("min_rows"))
1642 part_options.get("min_rows", &part_elem->part_min_rows);
1643
1644 part_elem->data_file_name =
1645 copy_option_string(mem_root, part_options, "data_file_name");
1646 part_elem->index_file_name =
1647 copy_option_string(mem_root, part_options, "index_file_name");
1648
1649 uint32 nodegroup_id = UNDEF_NODEGROUP;
1650 if (part_options.exists("nodegroup_id"))
1651 part_options.get("nodegroup_id", &nodegroup_id);
1652
1653 DBUG_ASSERT(nodegroup_id <= 0xFFFF);
1654 part_elem->nodegroup_id = nodegroup_id;
1655 }
1656
get_part_column_values(MEM_ROOT * mem_root,partition_info * part_info,partition_element * part_elem,const dd::Partition * part_obj)1657 static bool get_part_column_values(MEM_ROOT *mem_root,
1658 partition_info *part_info,
1659 partition_element *part_elem,
1660 const dd::Partition *part_obj) {
1661 part_elem_value *p_elem_values, *p_val;
1662 part_column_list_val *col_val_array, *col_vals;
1663 uint list_index = 0, entries = 0;
1664 uint max_column_id = 0, max_list_index = 0;
1665
1666 for (const dd::Partition_value *part_value : part_obj->values()) {
1667 max_column_id = std::max(max_column_id, part_value->column_num());
1668 max_list_index = std::max(max_list_index, part_value->list_num());
1669 entries++;
1670 }
1671 if (entries != ((max_column_id + 1) * (max_list_index + 1))) {
1672 DBUG_ASSERT(0); /* purecov: deadcode */
1673 return true;
1674 }
1675
1676 part_info->num_columns = max_column_id + 1;
1677
1678 if (!multi_alloc_root(mem_root, &p_elem_values,
1679 sizeof(*p_elem_values) * (max_list_index + 1),
1680 &col_val_array,
1681 sizeof(*col_val_array) * part_info->num_columns *
1682 (max_list_index + 1),
1683 NULL)) {
1684 return true; /* purecov: inspected */
1685 }
1686 memset(p_elem_values, 0, sizeof(*p_elem_values) * (max_list_index + 1));
1687 memset(
1688 col_val_array, 0,
1689 sizeof(*col_val_array) * part_info->num_columns * (max_list_index + 1));
1690 for (list_index = 0; list_index <= max_list_index; list_index++) {
1691 p_val = &p_elem_values[list_index];
1692 p_val->added_items = 1;
1693 p_val->col_val_array = &col_val_array[list_index * part_info->num_columns];
1694 }
1695
1696 for (const dd::Partition_value *part_value : part_obj->values()) {
1697 p_val = &p_elem_values[part_value->list_num()];
1698 col_vals = p_val->col_val_array;
1699 if (part_value->is_value_null()) {
1700 col_vals[part_value->column_num()].null_value = true;
1701 } else if (part_value->max_value()) {
1702 col_vals[part_value->column_num()].max_value = true;
1703 } else {
1704 // TODO-PARTITION: Perhaps allocate on the heap instead and when the first
1705 // table instance is opened, free it and add the field image instead?
1706 // That way it can be reused for all other table instances.
1707 col_vals[part_value->column_num()].column_value.value_str =
1708 strmake_root(mem_root, part_value->value_utf8().c_str(),
1709 part_value->value_utf8().length());
1710 }
1711 }
1712
1713 for (list_index = 0; list_index <= max_list_index; list_index++) {
1714 p_val = &p_elem_values[list_index];
1715 #ifndef DBUG_OFF
1716 for (uint i = 0; i < part_info->num_columns; i++) {
1717 DBUG_ASSERT(p_val->col_val_array[i].null_value ||
1718 p_val->col_val_array[i].max_value ||
1719 p_val->col_val_array[i].column_value.value_str);
1720 }
1721 #endif
1722 if (part_elem->list_val_list.push_back(p_val, mem_root)) return true;
1723 }
1724
1725 return false;
1726 }
1727
setup_partition_from_dd(THD * thd,MEM_ROOT * mem_root,partition_info * part_info,partition_element * part_elem,const dd::Partition * part_obj,bool is_subpart)1728 static bool setup_partition_from_dd(THD *thd, MEM_ROOT *mem_root,
1729 partition_info *part_info,
1730 partition_element *part_elem,
1731 const dd::Partition *part_obj,
1732 bool is_subpart) {
1733 dd::String_type comment = part_obj->comment();
1734 if (comment.length()) {
1735 part_elem->part_comment = strdup_root(mem_root, comment.c_str());
1736 if (!part_elem->part_comment) return true;
1737 }
1738 part_elem->partition_name = strdup_root(mem_root, part_obj->name().c_str());
1739 if (!part_elem->partition_name) return true;
1740
1741 part_elem->engine_type = part_info->default_engine_type;
1742
1743 get_partition_options(mem_root, part_elem, part_obj->options());
1744
1745 // Read tablespace name.
1746 if (dd::get_tablespace_name<dd::Partition>(
1747 thd, part_obj, &part_elem->tablespace_name, mem_root))
1748 return true;
1749
1750 if (is_subpart) {
1751 /* Only HASH/KEY subpartitioning allowed, no values allowed, so return! */
1752 return false;
1753 }
1754 // Iterate over all possible values
1755 if (part_info->part_type == partition_type::RANGE) {
1756 if (part_info->column_list) {
1757 if (get_part_column_values(mem_root, part_info, part_elem, part_obj))
1758 return true;
1759 } else {
1760 DBUG_ASSERT(part_obj->values().size() == 1);
1761 const dd::Partition_value *part_value = *part_obj->values().begin();
1762 DBUG_ASSERT(part_value->list_num() == 0);
1763 DBUG_ASSERT(part_value->column_num() == 0);
1764 if (part_value->max_value()) {
1765 part_elem->max_value = true;
1766 } else {
1767 if (part_value->value_utf8()[0] == '-') {
1768 part_elem->signed_flag = true;
1769 if (dd::Properties::from_str(part_value->value_utf8(),
1770 &part_elem->range_value)) {
1771 return true;
1772 }
1773 } else {
1774 part_elem->signed_flag = false;
1775 if (dd::Properties::from_str(part_value->value_utf8(),
1776 (ulonglong *)&part_elem->range_value)) {
1777 return true;
1778 }
1779 }
1780 }
1781 }
1782 } else if (part_info->part_type == partition_type::LIST) {
1783 if (part_info->column_list) {
1784 if (get_part_column_values(mem_root, part_info, part_elem, part_obj))
1785 return true;
1786 } else {
1787 uint list_index = 0, max_index = 0, entries = 0, null_entry = 0;
1788 part_elem_value *list_val, *list_val_array = nullptr;
1789 for (const dd::Partition_value *part_value : part_obj->values()) {
1790 max_index = std::max(max_index, part_value->list_num());
1791 entries++;
1792 if (part_value->value_utf8().empty()) {
1793 DBUG_ASSERT(!part_elem->has_null_value);
1794 part_elem->has_null_value = true;
1795 null_entry = part_value->list_num();
1796 }
1797 }
1798 if (entries != (max_index + 1)) {
1799 DBUG_ASSERT(0); /* purecov: deadcode */
1800 return true;
1801 }
1802 /* If a list entry is NULL then it is only flagged on the part_elem. */
1803 if (part_elem->has_null_value) entries--;
1804
1805 if (entries) {
1806 list_val_array = (part_elem_value *)mem_root->Alloc(
1807 sizeof(*list_val_array) * entries);
1808 if (!list_val_array) return true;
1809 memset(list_val_array, 0, sizeof(*list_val_array) * entries);
1810 }
1811
1812 for (const dd::Partition_value *part_value : part_obj->values()) {
1813 DBUG_ASSERT(part_value->column_num() == 0);
1814 if (part_value->value_utf8().empty()) {
1815 DBUG_ASSERT(part_value->list_num() == null_entry);
1816 continue;
1817 }
1818 list_index = part_value->list_num();
1819 /*
1820 If there is a NULL value in the partition values in the DD it is
1821 marked directly on the partition_element and should not have an own
1822 list_val. So compact the list_index range by remove the list_index for
1823 the null_entry.
1824 */
1825 if (part_elem->has_null_value && list_index > null_entry) list_index--;
1826 list_val = &list_val_array[list_index];
1827 DBUG_ASSERT(!list_val->unsigned_flag && !list_val->value);
1828 if (part_value->value_utf8()[0] == '-') {
1829 list_val->unsigned_flag = false;
1830 if (dd::Properties::from_str(part_value->value_utf8(),
1831 &list_val->value))
1832 return true;
1833 } else {
1834 list_val->unsigned_flag = true;
1835 if (dd::Properties::from_str(part_value->value_utf8(),
1836 (ulonglong *)&list_val->value))
1837 return true;
1838 }
1839 }
1840 for (uint i = 0; i < entries; i++) {
1841 if (part_elem->list_val_list.push_back(&list_val_array[i], mem_root))
1842 return true;
1843 }
1844 }
1845 } else {
1846 #ifndef DBUG_OFF
1847 DBUG_ASSERT(part_info->part_type == partition_type::HASH);
1848 DBUG_ASSERT(part_obj->values().empty());
1849 #endif
1850 }
1851 return false;
1852 }
1853
1854 /**
1855 Set field_list
1856
1857 To append each field to the field_list it will parse the
1858 submitted partition_expression string.
1859
1860 Must be in sync with get_field_list_str!
1861
1862 @param[in] mem_root Where to allocate the memory for the list entries.
1863 @param[in] str String object containing the column names.
1864 @param[in,out] field_list List to add field names to.
1865
1866 @return false on success, else true.
1867 */
1868
set_field_list(MEM_ROOT * mem_root,dd::String_type & str,List<char> * field_list)1869 static bool set_field_list(MEM_ROOT *mem_root, dd::String_type &str,
1870 List<char> *field_list) {
1871 dd::String_type field_name;
1872 dd::String_type::const_iterator it(str.begin());
1873 dd::String_type::const_iterator end(str.end());
1874
1875 while (it != end) {
1876 if (dd::eat_str(field_name, it, end, dd::FIELD_NAME_SEPARATOR_CHAR))
1877 return true;
1878 size_t len = field_name.length();
1879 DBUG_ASSERT(len);
1880 char *name = static_cast<char *>(mem_root->Alloc(len + 1));
1881 if (!name) return true; /* purecov: inspected */
1882 memcpy(name, field_name.c_str(), len);
1883 name[len] = '\0';
1884
1885 if (field_list->push_back(name, mem_root)) return true;
1886 }
1887 return false;
1888 }
1889
1890 /**
1891 Fill TABLE_SHARE with partitioning details from dd::Partition.
1892
1893 @details
1894 Set up as much as possible to ease creating new TABLE instances
1895 by copying from the TABLE_SHARE.
1896
1897 Also to prevent future memory duplication partition definitions (names etc)
1898 are stored on the TABLE_SHARE and can be referenced from each TABLE instance.
1899
1900 Note that [sub]part_expr still needs to be parsed from
1901 [sub]part_func_string for each TABLE instance to use the correct
1902 mem_root etc. To be as compatible with the .frm way to open a table
1903 as possible we currently generate the full partitioning clause which
1904 will be parsed for each new TABLE instance.
1905 TODO-PARTITION:
1906 - Create a way to handle Item expressions to be shared/copied
1907 from the TABLE_SHARE.
1908 - On the open of the first TABLE instance, copy the field images
1909 to the TABLE_SHARE::partition_info for each partition value.
1910
1911 @param thd Thread context.
1912 @param share Share to be updated with partitioning details.
1913 @param tab_obj dd::Table object to get partition info from.
1914
1915 @return false if success, else true.
1916 */
1917
fill_partitioning_from_dd(THD * thd,TABLE_SHARE * share,const dd::Table * tab_obj)1918 static bool fill_partitioning_from_dd(THD *thd, TABLE_SHARE *share,
1919 const dd::Table *tab_obj) {
1920 if (tab_obj->partition_type() == dd::Table::PT_NONE) return false;
1921
1922 // The DD only has information about how the table is partitioned in
1923 // the primary storage engine, so don't use this information for
1924 // tables in a secondary storage engine.
1925 if (share->is_secondary_engine()) return false;
1926
1927 partition_info *part_info;
1928 part_info = new (&share->mem_root) partition_info;
1929
1930 handlerton *hton = plugin_data<handlerton *>(
1931 ha_resolve_by_name_raw(thd, lex_cstring_handle(tab_obj->engine())));
1932 DBUG_ASSERT(hton && ha_storage_engine_is_enabled(hton));
1933 part_info->default_engine_type = hton;
1934 if (!part_info->default_engine_type) return true;
1935
1936 // TODO-PARTITION: change partition_info::part_type to same enum as below :)
1937 switch (tab_obj->partition_type()) {
1938 case dd::Table::PT_RANGE_COLUMNS:
1939 part_info->column_list = true;
1940 part_info->list_of_part_fields = true;
1941 // Fall through.
1942 case dd::Table::PT_RANGE:
1943 part_info->part_type = partition_type::RANGE;
1944 break;
1945 case dd::Table::PT_LIST_COLUMNS:
1946 part_info->column_list = true;
1947 part_info->list_of_part_fields = true;
1948 // Fall through.
1949 case dd::Table::PT_LIST:
1950 part_info->part_type = partition_type::LIST;
1951 break;
1952 case dd::Table::PT_LINEAR_HASH:
1953 part_info->linear_hash_ind = true;
1954 // Fall through.
1955 case dd::Table::PT_HASH:
1956 part_info->part_type = partition_type::HASH;
1957 break;
1958 case dd::Table::PT_LINEAR_KEY_51:
1959 part_info->linear_hash_ind = true;
1960 // Fall through.
1961 case dd::Table::PT_KEY_51:
1962 part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_51;
1963 part_info->list_of_part_fields = true;
1964 part_info->part_type = partition_type::HASH;
1965 break;
1966 case dd::Table::PT_LINEAR_KEY_55:
1967 part_info->linear_hash_ind = true;
1968 // Fall through.
1969 case dd::Table::PT_KEY_55:
1970 part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_55;
1971 part_info->list_of_part_fields = true;
1972 part_info->part_type = partition_type::HASH;
1973 break;
1974 case dd::Table::PT_AUTO_LINEAR:
1975 part_info->linear_hash_ind = true;
1976 // Fall through.
1977 case dd::Table::PT_AUTO:
1978 part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_55;
1979 part_info->part_type = partition_type::HASH;
1980 part_info->list_of_part_fields = true;
1981 part_info->is_auto_partitioned = true;
1982 share->auto_partitioned = true;
1983 break;
1984 default:
1985 // Unknown partitioning type!
1986 DBUG_ASSERT(0); /* purecov: deadcode */
1987 return true;
1988 }
1989 switch (tab_obj->subpartition_type()) {
1990 case dd::Table::ST_NONE:
1991 part_info->subpart_type = partition_type::NONE;
1992 break;
1993 case dd::Table::ST_LINEAR_HASH:
1994 part_info->linear_hash_ind = true;
1995 // Fall through.
1996 case dd::Table::ST_HASH:
1997 part_info->subpart_type = partition_type::HASH;
1998 break;
1999 case dd::Table::ST_LINEAR_KEY_51:
2000 part_info->linear_hash_ind = true;
2001 // Fall through.
2002 case dd::Table::ST_KEY_51:
2003 part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_51;
2004 part_info->list_of_subpart_fields = true;
2005 part_info->subpart_type = partition_type::HASH;
2006 break;
2007 case dd::Table::ST_LINEAR_KEY_55:
2008 part_info->linear_hash_ind = true;
2009 // Fall through.
2010 case dd::Table::ST_KEY_55:
2011 part_info->key_algorithm = enum_key_algorithm::KEY_ALGORITHM_55;
2012 part_info->list_of_subpart_fields = true;
2013 part_info->subpart_type = partition_type::HASH;
2014 break;
2015 default:
2016 // Unknown sub partitioning type!
2017 DBUG_ASSERT(0); /* purecov: deadcode */
2018 return true;
2019 }
2020
2021 dd::String_type part_expr = tab_obj->partition_expression();
2022 if (part_info->list_of_part_fields) {
2023 if (set_field_list(&share->mem_root, part_expr,
2024 &part_info->part_field_list)) {
2025 return true;
2026 }
2027 part_info->part_func_string = nullptr;
2028 part_info->part_func_len = 0;
2029 } else {
2030 part_info->part_func_string =
2031 strdup_root(&share->mem_root, part_expr.c_str());
2032 part_info->part_func_len = part_expr.length();
2033 }
2034 dd::String_type subpart_expr = tab_obj->subpartition_expression();
2035 part_info->subpart_func_len = subpart_expr.length();
2036 if (part_info->subpart_func_len) {
2037 if (part_info->list_of_subpart_fields) {
2038 if (set_field_list(&share->mem_root, subpart_expr,
2039 &part_info->subpart_field_list)) {
2040 return true;
2041 }
2042 part_info->subpart_func_string = nullptr;
2043 part_info->subpart_func_len = 0;
2044 } else {
2045 part_info->subpart_func_string =
2046 strdup_root(&share->mem_root, subpart_expr.c_str());
2047 }
2048 }
2049
2050 //
2051 // Iterate through all the partitions
2052 //
2053
2054 partition_element *curr_part_elem;
2055 List_iterator<partition_element> part_elem_it;
2056
2057 /* Partitions are sorted first on level and then on number. */
2058
2059 #ifndef DBUG_OFF
2060 uint number = 0;
2061 #endif
2062 for (const dd::Partition *part_obj : tab_obj->partitions()) {
2063 #ifndef DBUG_OFF
2064 /* Must be in sorted order (sorted by level first and then on number). */
2065 DBUG_ASSERT(part_obj->number() >= number);
2066 number = part_obj->number();
2067 #endif
2068
2069 DBUG_ASSERT(part_obj->parent_partition_id() == dd::INVALID_OBJECT_ID);
2070
2071 curr_part_elem = new (&share->mem_root) partition_element;
2072 if (!curr_part_elem) {
2073 return true;
2074 }
2075
2076 if (setup_partition_from_dd(thd, &share->mem_root, part_info,
2077 curr_part_elem, part_obj, false)) {
2078 return true;
2079 }
2080
2081 if (part_info->partitions.push_back(curr_part_elem, &share->mem_root))
2082 return true;
2083
2084 for (const dd::Partition *sub_part_obj : part_obj->subpartitions()) {
2085 partition_element *curr_sub_part_elem =
2086 new (&share->mem_root) partition_element;
2087 if (!curr_sub_part_elem) {
2088 return true;
2089 }
2090
2091 if (setup_partition_from_dd(thd, &share->mem_root, part_info,
2092 curr_sub_part_elem, sub_part_obj, true)) {
2093 return true;
2094 }
2095
2096 if (curr_part_elem->subpartitions.push_back(curr_sub_part_elem,
2097 &share->mem_root))
2098 return true;
2099 }
2100 }
2101
2102 // Get partition and sub_partition count.
2103 part_info->num_parts = part_info->partitions.elements;
2104 part_info->num_subparts = part_info->partitions[0]->subpartitions.elements;
2105
2106 switch (tab_obj->default_partitioning()) {
2107 case dd::Table::DP_NO:
2108 part_info->use_default_partitions = false;
2109 part_info->use_default_num_partitions = false;
2110 break;
2111 case dd::Table::DP_YES:
2112 part_info->use_default_partitions = true;
2113 part_info->use_default_num_partitions = true;
2114 break;
2115 case dd::Table::DP_NUMBER:
2116 part_info->use_default_partitions = true;
2117 part_info->use_default_num_partitions = false;
2118 break;
2119 case dd::Table::DP_NONE:
2120 default:
2121 DBUG_ASSERT(0); /* purecov: deadcode */
2122 }
2123 switch (tab_obj->default_subpartitioning()) {
2124 case dd::Table::DP_NO:
2125 part_info->use_default_subpartitions = false;
2126 part_info->use_default_num_subpartitions = false;
2127 break;
2128 case dd::Table::DP_YES:
2129 part_info->use_default_subpartitions = true;
2130 part_info->use_default_num_subpartitions = true;
2131 break;
2132 case dd::Table::DP_NUMBER:
2133 part_info->use_default_subpartitions = true;
2134 part_info->use_default_num_subpartitions = false;
2135 break;
2136 case dd::Table::DP_NONE:
2137 DBUG_ASSERT(!part_info->is_sub_partitioned());
2138 break;
2139 default:
2140 DBUG_ASSERT(0); /* purecov: deadcode */
2141 }
2142
2143 char *buf;
2144 uint buf_len;
2145
2146 // Turn off ANSI_QUOTES and other SQL modes which affect printing of
2147 // generated partitioning clause.
2148 Sql_mode_parse_guard parse_guard(thd);
2149
2150 buf = generate_partition_syntax(part_info, &buf_len, true, true, false,
2151 nullptr);
2152
2153 if (!buf) return true;
2154
2155 share->partition_info_str = strmake_root(&share->mem_root, buf, buf_len);
2156 if (!share->partition_info_str) return true;
2157
2158 share->partition_info_str_len = buf_len;
2159 share->m_part_info = part_info;
2160 return (false);
2161 }
2162
2163 /**
2164 Fill TABLE_SHARE with information about foreign keys from dd::Table.
2165 */
2166
fill_foreign_keys_from_dd(TABLE_SHARE * share,const dd::Table * tab_obj)2167 static bool fill_foreign_keys_from_dd(TABLE_SHARE *share,
2168 const dd::Table *tab_obj) {
2169 DBUG_ASSERT(share->foreign_keys == 0 && share->foreign_key_parents == 0);
2170
2171 share->foreign_keys = tab_obj->foreign_keys().size();
2172 share->foreign_key_parents = tab_obj->foreign_key_parents().size();
2173
2174 if (share->foreign_keys) {
2175 if (!(share->foreign_key =
2176 (TABLE_SHARE_FOREIGN_KEY_INFO *)share->mem_root.Alloc(
2177 share->foreign_keys * sizeof(TABLE_SHARE_FOREIGN_KEY_INFO))))
2178 return true;
2179
2180 uint i = 0;
2181
2182 for (const dd::Foreign_key *fk : tab_obj->foreign_keys()) {
2183 if (lex_string_strmake(&share->mem_root,
2184 &share->foreign_key[i].referenced_table_db,
2185 fk->referenced_table_schema_name().c_str(),
2186 fk->referenced_table_schema_name().length()))
2187 return true;
2188 if (lex_string_strmake(&share->mem_root,
2189 &share->foreign_key[i].referenced_table_name,
2190 fk->referenced_table_name().c_str(),
2191 fk->referenced_table_name().length()))
2192 return true;
2193 if (lex_string_strmake(&share->mem_root,
2194 &share->foreign_key[i].unique_constraint_name,
2195 fk->unique_constraint_name().c_str(),
2196 fk->unique_constraint_name().length()))
2197 return true;
2198
2199 share->foreign_key[i].update_rule = fk->update_rule();
2200 share->foreign_key[i].delete_rule = fk->delete_rule();
2201
2202 share->foreign_key[i].columns = fk->elements().size();
2203 if (!(share->foreign_key[i].column_name =
2204 (LEX_CSTRING *)share->mem_root.Alloc(
2205 share->foreign_key[i].columns * sizeof(LEX_CSTRING))))
2206 return true;
2207
2208 uint j = 0;
2209
2210 for (const dd::Foreign_key_element *fk_el : fk->elements()) {
2211 if (lex_string_strmake(&share->mem_root,
2212 &share->foreign_key[i].column_name[j],
2213 fk_el->column().name().c_str(),
2214 fk_el->column().name().length()))
2215 return true;
2216
2217 ++j;
2218 }
2219
2220 ++i;
2221 }
2222 }
2223
2224 if (share->foreign_key_parents) {
2225 if (!(share->foreign_key_parent =
2226 (TABLE_SHARE_FOREIGN_KEY_PARENT_INFO *)share->mem_root.Alloc(
2227 share->foreign_key_parents *
2228 sizeof(TABLE_SHARE_FOREIGN_KEY_PARENT_INFO))))
2229 return true;
2230
2231 uint i = 0;
2232
2233 for (const dd::Foreign_key_parent *fk_p : tab_obj->foreign_key_parents()) {
2234 if (lex_string_strmake(&share->mem_root,
2235 &share->foreign_key_parent[i].referencing_table_db,
2236 fk_p->child_schema_name().c_str(),
2237 fk_p->child_schema_name().length()))
2238 return true;
2239 if (lex_string_strmake(
2240 &share->mem_root,
2241 &share->foreign_key_parent[i].referencing_table_name,
2242 fk_p->child_table_name().c_str(),
2243 fk_p->child_table_name().length()))
2244 return true;
2245 share->foreign_key_parent[i].update_rule = fk_p->update_rule();
2246 share->foreign_key_parent[i].delete_rule = fk_p->delete_rule();
2247 ++i;
2248 }
2249 }
2250 return false;
2251 }
2252
2253 /**
2254 Fill check constraints from dd::Table object to the TABLE_SHARE.
2255
2256 @param[in,out] share TABLE_SHARE instance.
2257 @param[in] tab_obj Table instance.
2258
2259 @retval false On Success.
2260 @retval true On failure.
2261 */
fill_check_constraints_from_dd(TABLE_SHARE * share,const dd::Table * tab_obj)2262 static bool fill_check_constraints_from_dd(TABLE_SHARE *share,
2263 const dd::Table *tab_obj) {
2264 DBUG_ASSERT(share->check_constraint_share_list == nullptr);
2265
2266 if (tab_obj->check_constraints().size() > 0) {
2267 share->check_constraint_share_list = new (&share->mem_root)
2268 Sql_check_constraint_share_list(&share->mem_root);
2269 if (share->check_constraint_share_list == nullptr) return true; // OOM
2270
2271 if (share->check_constraint_share_list->reserve(
2272 tab_obj->check_constraints().size()))
2273 return true; // OOM
2274
2275 for (auto &cc : tab_obj->check_constraints()) {
2276 // Check constraint name.
2277 LEX_CSTRING name;
2278 if (lex_string_strmake(&share->mem_root, &name, cc->name().c_str(),
2279 cc->name().length()))
2280 return true; // OOM
2281
2282 // Check constraint expression (clause).
2283 LEX_CSTRING check_clause;
2284 if (lex_string_strmake(&share->mem_root, &check_clause,
2285 cc->check_clause().c_str(),
2286 cc->check_clause().length()))
2287 return true; // OOM
2288
2289 // Check constraint state.
2290 bool is_cc_enforced =
2291 (cc->constraint_state() == dd::Check_constraint::CS_ENFORCED);
2292
2293 share->check_constraint_share_list->push_back(
2294 Sql_check_constraint_share(name, check_clause, is_cc_enforced));
2295 }
2296 }
2297
2298 return false;
2299 }
2300
open_table_def(THD * thd,TABLE_SHARE * share,const dd::Table & table_def)2301 bool open_table_def(THD *thd, TABLE_SHARE *share, const dd::Table &table_def) {
2302 DBUG_TRACE;
2303
2304 MEM_ROOT *old_root = thd->mem_root;
2305 thd->mem_root = &share->mem_root; // Needed for make_field()++
2306 share->blob_fields = 0; // HACK
2307
2308 // Fill the TABLE_SHARE with details.
2309 bool error = (fill_share_from_dd(thd, share, &table_def) ||
2310 fill_columns_from_dd(thd, share, &table_def) ||
2311 fill_indexes_from_dd(thd, share, &table_def) ||
2312 fill_partitioning_from_dd(thd, share, &table_def) ||
2313 fill_foreign_keys_from_dd(share, &table_def) ||
2314 fill_check_constraints_from_dd(share, &table_def));
2315
2316 thd->mem_root = old_root;
2317
2318 if (!error) error = prepare_share(thd, share, &table_def);
2319
2320 if (!error) {
2321 share->table_category = get_table_category(share->db, share->table_name);
2322 thd->status_var.opened_shares++;
2323 return false;
2324 }
2325 return true;
2326 }
2327
2328 /*
2329 Ignore errors related to invalid collation and missing parser during
2330 open_table_def().
2331 */
2332 class Open_table_error_handler : public Internal_error_handler {
2333 public:
handle_condition(THD *,uint sql_errno,const char *,Sql_condition::enum_severity_level *,const char *)2334 virtual bool handle_condition(THD *, uint sql_errno, const char *,
2335 Sql_condition::enum_severity_level *,
2336 const char *) {
2337 return (sql_errno == ER_UNKNOWN_COLLATION ||
2338 sql_errno == ER_PLUGIN_IS_NOT_LOADED);
2339 }
2340 };
2341
open_table_def_suppress_invalid_meta_data(THD * thd,TABLE_SHARE * share,const dd::Table & table_def)2342 bool open_table_def_suppress_invalid_meta_data(THD *thd, TABLE_SHARE *share,
2343 const dd::Table &table_def) {
2344 Open_table_error_handler error_handler;
2345 thd->push_internal_handler(&error_handler);
2346 bool error = open_table_def(thd, share, table_def);
2347 thd->pop_internal_handler();
2348 return error;
2349 }
2350
2351 //////////////////////////////////////////////////////////////////////////
2352