1 /* 2 Copyright (c) 2012,2013 Monty Program Ab 3 4 This program is free software; you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation; version 2 of the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ 16 17 #ifdef USE_PRAGMA_IMPLEMENTATION 18 #pragma implementation // gcc: Class implementation 19 #endif 20 21 /* For use of 'PRIu64': */ 22 #define __STDC_FORMAT_MACROS 23 24 #include <my_global.h> 25 /* This C++ file's header file */ 26 #include "./rdb_datadic.h" 27 28 #include <inttypes.h> 29 /* C++ standard header files */ 30 #include <algorithm> 31 #include <array> 32 #include <limits> 33 #include <map> 34 #include <set> 35 #include <string> 36 #include <utility> 37 #include <vector> 38 39 /* MySQL header files */ 40 #include "./field.h" 41 #include "./key.h" 42 #include "./m_ctype.h" 43 #include "./my_bit.h" 44 #include "./my_bitmap.h" 45 #include "./sql_table.h" 46 47 /* MyRocks header files */ 48 #include "./ha_rocksdb.h" 49 #include "./ha_rocksdb_proto.h" 50 #include "./my_stacktrace.h" 51 #include "./rdb_cf_manager.h" 52 #include "./rdb_psi.h" 53 #include "./rdb_utils.h" 54 55 namespace myrocks { 56 57 void get_mem_comparable_space(const CHARSET_INFO *cs, 58 const std::vector<uchar> **xfrm, size_t *xfrm_len, 59 size_t *mb_len); 60 61 /* 62 MariaDB's replacement for FB/MySQL Field::check_field_name_match : 63 */ 64 inline bool field_check_field_name_match(Field *field, const char *name) 65 { 66 return (0 == my_strcasecmp(system_charset_info, 67 field->field_name.str, 68 name)); 69 } 70 71 72 /* 73 Decode current key field 74 @param fpi IN data structure contains field metadata 75 @param field IN current field 76 @param reader IN key slice reader 77 @param unp_reader IN unpack information reader 78 @return 79 HA_EXIT_SUCCESS OK 80 other HA_ERR error code 81 */ 82 int Rdb_convert_to_record_key_decoder::decode_field( 83 Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader, 84 const uchar *const default_value, Rdb_string_reader *unpack_reader) { 85 if (fpi->m_maybe_null) { 86 const char *nullp; 87 if (!(nullp = reader->read(1))) { 88 return HA_EXIT_FAILURE; 89 } 90 91 if (*nullp == 0) { 92 /* Set the NULL-bit of this field */ 93 field->set_null(); 94 /* Also set the field to its default value */ 95 memcpy(field->ptr, default_value, field->pack_length()); 96 return HA_EXIT_SUCCESS; 97 } else if (*nullp == 1) { 98 field->set_notnull(); 99 } else { 100 return HA_EXIT_FAILURE; 101 } 102 } 103 104 return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader); 105 } 106 107 /* 108 Decode current key field 109 110 @param buf OUT the buf starting address 111 @param offset OUT the bytes offset when data is written 112 @param fpi IN data structure contains field metadata 113 @param table IN current table 114 @param field IN current field 115 @param has_unpack_inf IN whether contains unpack inf 116 @param reader IN key slice reader 117 @param unp_reader IN unpack information reader 118 @return 119 HA_EXIT_SUCCESS OK 120 other HA_ERR error code 121 */ 122 int Rdb_convert_to_record_key_decoder::decode( 123 uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table, 124 Field *field, bool has_unpack_info, Rdb_string_reader *reader, 125 Rdb_string_reader *unpack_reader) { 126 DBUG_ASSERT(buf != nullptr); 127 DBUG_ASSERT(offset != nullptr); 128 129 uint field_offset = field->ptr - table->record[0]; 130 *offset = field_offset; 131 uint null_offset = field->null_offset(); 132 bool maybe_null = field->real_maybe_null(); 133 134 field->move_field(buf + field_offset, 135 maybe_null ? buf + null_offset : nullptr, field->null_bit); 136 137 // If we need unpack info, but there is none, tell the unpack function 138 // this by passing unp_reader as nullptr. If we never read unpack_info 139 // during unpacking anyway, then there won't an error. 140 bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info(); 141 142 int res = 143 decode_field(fpi, field, reader, table->s->default_values + field_offset, 144 maybe_missing_unpack ? nullptr : unpack_reader); 145 146 // Restore field->ptr and field->null_ptr 147 field->move_field(table->record[0] + field_offset, 148 maybe_null ? table->record[0] + null_offset : nullptr, 149 field->null_bit); 150 if (res != UNPACK_SUCCESS) { 151 return HA_ERR_ROCKSDB_CORRUPT_DATA; 152 } 153 return HA_EXIT_SUCCESS; 154 } 155 156 /* 157 Skip current key field 158 159 @param fpi IN data structure contains field metadata 160 @param field IN current field 161 @param reader IN key slice reader 162 @param unp_reader IN unpack information reader 163 @return 164 HA_EXIT_SUCCESS OK 165 other HA_ERR error code 166 */ 167 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi, 168 const Field *field, 169 Rdb_string_reader *reader, 170 Rdb_string_reader *unp_reader) { 171 /* It is impossible to unpack the column. Skip it. */ 172 if (fpi->m_maybe_null) { 173 const char *nullp; 174 if (!(nullp = reader->read(1))) { 175 return HA_ERR_ROCKSDB_CORRUPT_DATA; 176 } 177 if (*nullp == 0) { 178 /* This is a NULL value */ 179 return HA_EXIT_SUCCESS; 180 } 181 /* If NULL marker is not '0', it can be only '1' */ 182 if (*nullp != 1) { 183 return HA_ERR_ROCKSDB_CORRUPT_DATA; 184 } 185 } 186 if ((fpi->m_skip_func)(fpi, field, reader)) { 187 return HA_ERR_ROCKSDB_CORRUPT_DATA; 188 } 189 // If this is a space padded varchar, we need to skip the indicator 190 // bytes for trailing bytes. They're useless since we can't restore the 191 // field anyway. 192 // 193 // There is a special case for prefixed varchars where we do not 194 // generate unpack info, because we know prefixed varchars cannot be 195 // unpacked. In this case, it is not necessary to skip. 196 if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad && 197 !fpi->m_unpack_info_stores_value) { 198 unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1); 199 } 200 return HA_EXIT_SUCCESS; 201 } 202 203 Rdb_key_field_iterator::Rdb_key_field_iterator( 204 const Rdb_key_def *key_def, Rdb_field_packing *pack_info, 205 Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table, 206 bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) { 207 m_key_def = key_def; 208 m_pack_info = pack_info; 209 m_iter_index = 0; 210 m_iter_end = key_def->get_key_parts(); 211 m_reader = reader; 212 m_unp_reader = unp_reader; 213 m_table = table; 214 m_has_unpack_info = has_unpack_info; 215 m_covered_bitmap = covered_bitmap; 216 m_buf = buf; 217 m_secondary_key = 218 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY); 219 m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table); 220 m_is_hidden_pk = 221 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY); 222 m_curr_bitmap_pos = 0; 223 m_offset = 0; 224 } 225 226 void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; } 227 228 int Rdb_key_field_iterator::get_field_index() const { 229 DBUG_ASSERT(m_field != nullptr); 230 return m_field->field_index; 231 } 232 233 bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; } 234 Field *Rdb_key_field_iterator::get_field() const { 235 DBUG_ASSERT(m_field != nullptr); 236 return m_field; 237 } 238 239 bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; } 240 241 /** 242 Iterate each field in the key and decode/skip one by one 243 */ 244 int Rdb_key_field_iterator::next() { 245 int status = HA_EXIT_SUCCESS; 246 while (m_iter_index < m_iter_end) { 247 int curr_index = m_iter_index++; 248 249 m_fpi = &m_pack_info[curr_index]; 250 /* 251 Hidden pk field is packed at the end of the secondary keys, but the SQL 252 layer does not know about it. Skip retrieving field if hidden pk. 253 */ 254 if ((m_secondary_key && m_hidden_pk_exists && 255 curr_index + 1 == m_iter_end) || 256 m_is_hidden_pk) { 257 DBUG_ASSERT(m_fpi->m_unpack_func); 258 if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) { 259 return HA_ERR_ROCKSDB_CORRUPT_DATA; 260 } 261 return HA_EXIT_SUCCESS; 262 } 263 264 m_field = m_fpi->get_field_in_table(m_table); 265 266 bool covered_column = true; 267 if (m_covered_bitmap != nullptr && 268 m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) { 269 uint tmp= m_curr_bitmap_pos++; 270 covered_column = m_curr_bitmap_pos < MAX_REF_PARTS && 271 bitmap_is_set(m_covered_bitmap, tmp); 272 } 273 274 if (m_fpi->m_unpack_func && covered_column) { 275 /* It is possible to unpack this column. Do it. */ 276 status = Rdb_convert_to_record_key_decoder::decode( 277 m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info, 278 m_reader, m_unp_reader); 279 if (status) { 280 return status; 281 } 282 break; 283 } else { 284 status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader, 285 m_unp_reader); 286 if (status) { 287 return status; 288 } 289 } 290 } 291 return HA_EXIT_SUCCESS; 292 } 293 294 /* 295 Rdb_key_def class implementation 296 */ 297 Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg, 298 rocksdb::ColumnFamilyHandle *cf_handle_arg, 299 uint16_t index_dict_version_arg, uchar index_type_arg, 300 uint16_t kv_format_version_arg, bool is_reverse_cf_arg, 301 bool is_per_partition_cf_arg, const char *_name, 302 Rdb_index_stats _stats, uint32 index_flags_bitmap, 303 uint32 ttl_rec_offset, uint64 ttl_duration) 304 : m_index_number(indexnr_arg), 305 m_cf_handle(cf_handle_arg), 306 m_index_dict_version(index_dict_version_arg), 307 m_index_type(index_type_arg), 308 m_kv_format_version(kv_format_version_arg), 309 m_is_reverse_cf(is_reverse_cf_arg), 310 m_is_per_partition_cf(is_per_partition_cf_arg), 311 m_name(_name), 312 m_stats(_stats), 313 m_index_flags_bitmap(index_flags_bitmap), 314 m_ttl_rec_offset(ttl_rec_offset), 315 m_ttl_duration(ttl_duration), 316 m_ttl_column(""), 317 m_pk_part_no(nullptr), 318 m_pack_info(nullptr), 319 m_keyno(keyno_arg), 320 m_key_parts(0), 321 m_ttl_pk_key_part_offset(UINT_MAX), 322 m_ttl_field_index(UINT_MAX), 323 m_prefix_extractor(nullptr), 324 m_maxlength(0) // means 'not intialized' 325 { 326 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST); 327 rdb_netbuf_store_index(m_index_number_storage_form, m_index_number); 328 m_total_index_flags_length = 329 calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG); 330 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY && 331 m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2, 332 m_total_index_flags_length == 0); 333 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY && 334 m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2, 335 m_total_index_flags_length == 0); 336 DBUG_ASSERT(m_cf_handle != nullptr); 337 } 338 339 Rdb_key_def::Rdb_key_def(const Rdb_key_def &k) 340 : m_index_number(k.m_index_number), 341 m_cf_handle(k.m_cf_handle), 342 m_is_reverse_cf(k.m_is_reverse_cf), 343 m_is_per_partition_cf(k.m_is_per_partition_cf), 344 m_name(k.m_name), 345 m_stats(k.m_stats), 346 m_index_flags_bitmap(k.m_index_flags_bitmap), 347 m_ttl_rec_offset(k.m_ttl_rec_offset), 348 m_ttl_duration(k.m_ttl_duration), 349 m_ttl_column(k.m_ttl_column), 350 m_pk_part_no(k.m_pk_part_no), 351 m_pack_info(k.m_pack_info), 352 m_keyno(k.m_keyno), 353 m_key_parts(k.m_key_parts), 354 m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset), 355 m_ttl_field_index(UINT_MAX), 356 m_prefix_extractor(k.m_prefix_extractor), 357 m_maxlength(k.m_maxlength) { 358 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST); 359 rdb_netbuf_store_index(m_index_number_storage_form, m_index_number); 360 m_total_index_flags_length = 361 calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG); 362 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY && 363 m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2, 364 m_total_index_flags_length == 0); 365 DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY && 366 m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2, 367 m_total_index_flags_length == 0); 368 if (k.m_pack_info) { 369 const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts; 370 void *pack_info= my_malloc(size, MYF(0)); 371 memcpy(pack_info, k.m_pack_info, size); 372 m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info); 373 } 374 375 if (k.m_pk_part_no) { 376 const size_t size = sizeof(uint) * m_key_parts; 377 m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0))); 378 memcpy(m_pk_part_no, k.m_pk_part_no, size); 379 } 380 } 381 382 Rdb_key_def::~Rdb_key_def() { 383 mysql_mutex_destroy(&m_mutex); 384 385 my_free(m_pk_part_no); 386 m_pk_part_no = nullptr; 387 388 my_free(m_pack_info); 389 m_pack_info = nullptr; 390 } 391 392 void Rdb_key_def::setup(const TABLE *const tbl, 393 const Rdb_tbl_def *const tbl_def) { 394 DBUG_ASSERT(tbl != nullptr); 395 DBUG_ASSERT(tbl_def != nullptr); 396 397 /* 398 Set max_length based on the table. This can be called concurrently from 399 multiple threads, so there is a mutex to protect this code. 400 */ 401 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY); 402 const bool hidden_pk_exists = table_has_hidden_pk(tbl); 403 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY); 404 if (!m_maxlength) { 405 RDB_MUTEX_LOCK_CHECK(m_mutex); 406 if (m_maxlength != 0) { 407 RDB_MUTEX_UNLOCK_CHECK(m_mutex); 408 return; 409 } 410 411 KEY *key_info = nullptr; 412 KEY *pk_info = nullptr; 413 if (!is_hidden_pk) { 414 key_info = &tbl->key_info[m_keyno]; 415 if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key]; 416 m_name = std::string(key_info->name.str); 417 } else { 418 m_name = HIDDEN_PK_NAME; 419 } 420 421 if (secondary_key) { 422 m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts; 423 } else { 424 pk_info = nullptr; 425 m_pk_key_parts = 0; 426 } 427 428 // "unique" secondary keys support: 429 m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts; 430 431 if (secondary_key) { 432 /* 433 In most cases, SQL layer puts PK columns as invisible suffix at the 434 end of secondary key. There are cases where this doesn't happen: 435 - unique secondary indexes. 436 - partitioned tables. 437 438 Internally, we always need PK columns as suffix (and InnoDB does, 439 too, if you were wondering). 440 441 The loop below will attempt to put all PK columns at the end of key 442 definition. Columns that are already included in the index (either 443 by the user or by "extended keys" feature) are not included for the 444 second time. 445 */ 446 m_key_parts += m_pk_key_parts; 447 } 448 449 if (secondary_key) { 450 m_pk_part_no = reinterpret_cast<uint *>( 451 my_malloc(sizeof(uint) * m_key_parts, MYF(0))); 452 } else { 453 m_pk_part_no = nullptr; 454 } 455 456 const size_t size = sizeof(Rdb_field_packing) * m_key_parts; 457 m_pack_info = 458 reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0))); 459 460 /* 461 Guaranteed not to error here as checks have been made already during 462 table creation. 463 */ 464 Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column, 465 &m_ttl_field_index, true); 466 467 size_t max_len = INDEX_NUMBER_SIZE; 468 int unpack_len = 0; 469 int max_part_len = 0; 470 bool simulating_extkey = false; 471 uint dst_i = 0; 472 473 uint keyno_to_set = m_keyno; 474 uint keypart_to_set = 0; 475 476 if (is_hidden_pk) { 477 Field *field = nullptr; 478 m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0); 479 m_pack_info[dst_i].m_unpack_data_offset = unpack_len; 480 max_len += m_pack_info[dst_i].m_max_image_len; 481 max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len); 482 dst_i++; 483 } else { 484 KEY_PART_INFO *key_part = key_info->key_part; 485 486 /* this loop also loops over the 'extended key' tail */ 487 for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) { 488 Field *const field = key_part ? key_part->field : nullptr; 489 490 if (simulating_extkey && !hidden_pk_exists) { 491 DBUG_ASSERT(secondary_key); 492 /* Check if this field is already present in the key definition */ 493 bool found = false; 494 for (uint j= 0; j < key_info->ext_key_parts; j++) { 495 if (field->field_index == 496 key_info->key_part[j].field->field_index && 497 key_part->length == key_info->key_part[j].length) { 498 found = true; 499 break; 500 } 501 } 502 503 if (found) { 504 key_part++; 505 continue; 506 } 507 } 508 509 if (field && field->real_maybe_null()) max_len += 1; // NULL-byte 510 511 m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set, 512 key_part ? key_part->length : 0); 513 m_pack_info[dst_i].m_unpack_data_offset = unpack_len; 514 515 if (pk_info) { 516 m_pk_part_no[dst_i] = -1; 517 for (uint j = 0; j < m_pk_key_parts; j++) { 518 if (field->field_index == pk_info->key_part[j].field->field_index) { 519 m_pk_part_no[dst_i] = j; 520 break; 521 } 522 } 523 } else if (secondary_key && hidden_pk_exists) { 524 /* 525 The hidden pk can never be part of the sk. So it is always 526 appended to the end of the sk. 527 */ 528 m_pk_part_no[dst_i] = -1; 529 if (simulating_extkey) m_pk_part_no[dst_i] = 0; 530 } 531 532 max_len += m_pack_info[dst_i].m_max_image_len; 533 534 max_part_len = 535 std::max(max_part_len, m_pack_info[dst_i].m_max_image_len); 536 537 /* 538 Check key part name here, if it matches the TTL column then we store 539 the offset of the TTL key part here. 540 */ 541 if (!m_ttl_column.empty() && 542 field_check_field_name_match(field, m_ttl_column.c_str())) { 543 DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG); 544 DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG); 545 DBUG_ASSERT(!field->real_maybe_null()); 546 m_ttl_pk_key_part_offset = dst_i; 547 } 548 549 key_part++; 550 /* 551 For "unique" secondary indexes, pretend they have 552 "index extensions". 553 554 MariaDB also has this property: if an index has a partially-covered 555 column like KEY(varchar_col(N)), then the SQL layer will think it is 556 not "extended" with PK columns. The code below handles this case, 557 also. 558 */ 559 if (secondary_key && src_i+1 == key_info->ext_key_parts) { 560 simulating_extkey = true; 561 if (!hidden_pk_exists) { 562 keyno_to_set = tbl->s->primary_key; 563 key_part = pk_info->key_part; 564 keypart_to_set = (uint)-1; 565 } else { 566 keyno_to_set = tbl_def->m_key_count - 1; 567 key_part = nullptr; 568 keypart_to_set = 0; 569 } 570 } 571 572 dst_i++; 573 } 574 } 575 576 m_key_parts = dst_i; 577 578 /* Initialize the memory needed by the stats structure */ 579 m_stats.m_distinct_keys_per_prefix.resize(get_key_parts()); 580 581 /* Cache prefix extractor for bloom filter usage later */ 582 rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf()); 583 m_prefix_extractor = opt.prefix_extractor; 584 585 /* 586 This should be the last member variable set before releasing the mutex 587 so that other threads can't see the object partially set up. 588 */ 589 m_maxlength = max_len; 590 591 RDB_MUTEX_UNLOCK_CHECK(m_mutex); 592 } 593 } 594 595 /* 596 Determine if the table has TTL enabled by parsing the table comment. 597 598 @param[IN] table_arg 599 @param[IN] tbl_def_arg 600 @param[OUT] ttl_duration Default TTL value parsed from table comment 601 */ 602 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg, 603 const Rdb_tbl_def *const tbl_def_arg, 604 uint64 *ttl_duration) { 605 DBUG_ASSERT(table_arg != nullptr); 606 DBUG_ASSERT(tbl_def_arg != nullptr); 607 DBUG_ASSERT(ttl_duration != nullptr); 608 std::string table_comment(table_arg->s->comment.str, 609 table_arg->s->comment.length); 610 611 bool ttl_duration_per_part_match_found = false; 612 std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier( 613 table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found, 614 RDB_TTL_DURATION_QUALIFIER); 615 616 /* If we don't have a ttl duration, nothing to do here. */ 617 if (ttl_duration_str.empty()) { 618 return HA_EXIT_SUCCESS; 619 } 620 621 /* 622 Catch errors where a non-integral value was used as ttl duration, strtoull 623 will return 0. 624 */ 625 *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0); 626 if (!*ttl_duration) { 627 my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str()); 628 return HA_EXIT_FAILURE; 629 } 630 631 return HA_EXIT_SUCCESS; 632 } 633 634 /* 635 Determine if the table has TTL enabled by parsing the table comment. 636 637 @param[IN] table_arg 638 @param[IN] tbl_def_arg 639 @param[OUT] ttl_column TTL column in the table 640 @param[IN] skip_checks Skip validation checks (when called in 641 setup()) 642 */ 643 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg, 644 const Rdb_tbl_def *const tbl_def_arg, 645 std::string *ttl_column, 646 uint *ttl_field_index, bool skip_checks) { 647 std::string table_comment(table_arg->s->comment.str, 648 table_arg->s->comment.length); 649 /* 650 Check if there is a TTL column specified. Note that this is not required 651 and if omitted, an 8-byte ttl field will be prepended to each record 652 implicitly. 653 */ 654 bool ttl_col_per_part_match_found = false; 655 std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier( 656 table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found, 657 RDB_TTL_COL_QUALIFIER); 658 659 if (skip_checks) { 660 for (uint i = 0; i < table_arg->s->fields; i++) { 661 Field *const field = table_arg->field[i]; 662 if (field_check_field_name_match(field, ttl_col_str.c_str())) { 663 *ttl_column = ttl_col_str; 664 *ttl_field_index = i; 665 } 666 } 667 return HA_EXIT_SUCCESS; 668 } 669 670 /* Check if TTL column exists in table */ 671 if (!ttl_col_str.empty()) { 672 bool found = false; 673 for (uint i = 0; i < table_arg->s->fields; i++) { 674 Field *const field = table_arg->field[i]; 675 if (field_check_field_name_match(field, ttl_col_str.c_str()) && 676 field->real_type() == MYSQL_TYPE_LONGLONG && 677 field->key_type() == HA_KEYTYPE_ULONGLONG && 678 !field->real_maybe_null()) { 679 *ttl_column = ttl_col_str; 680 *ttl_field_index = i; 681 found = true; 682 break; 683 } 684 } 685 686 if (!found) { 687 my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str()); 688 return HA_EXIT_FAILURE; 689 } 690 } 691 692 return HA_EXIT_SUCCESS; 693 } 694 695 const std::string Rdb_key_def::gen_qualifier_for_table( 696 const char *const qualifier, const std::string &partition_name) { 697 bool has_partition = !partition_name.empty(); 698 std::string qualifier_str = ""; 699 700 if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) { 701 return has_partition ? gen_cf_name_qualifier_for_partition(partition_name) 702 : qualifier_str + RDB_CF_NAME_QUALIFIER + 703 RDB_QUALIFIER_VALUE_SEP; 704 } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) { 705 return has_partition 706 ? gen_ttl_duration_qualifier_for_partition(partition_name) 707 : qualifier_str + RDB_TTL_DURATION_QUALIFIER + 708 RDB_QUALIFIER_VALUE_SEP; 709 } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) { 710 return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name) 711 : qualifier_str + RDB_TTL_COL_QUALIFIER + 712 RDB_QUALIFIER_VALUE_SEP; 713 } else { 714 DBUG_ASSERT(0); 715 } 716 717 return qualifier_str; 718 } 719 720 /* 721 Formats the string and returns the column family name assignment part for a 722 specific partition. 723 */ 724 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition( 725 const std::string &prefix) { 726 DBUG_ASSERT(!prefix.empty()); 727 728 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER + 729 RDB_QUALIFIER_VALUE_SEP; 730 } 731 732 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition( 733 const std::string &prefix) { 734 DBUG_ASSERT(!prefix.empty()); 735 736 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + 737 RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP; 738 } 739 740 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition( 741 const std::string &prefix) { 742 DBUG_ASSERT(!prefix.empty()); 743 744 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER + 745 RDB_QUALIFIER_VALUE_SEP; 746 } 747 748 const std::string Rdb_key_def::parse_comment_for_qualifier( 749 const std::string &comment, const TABLE *const table_arg, 750 const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found, 751 const char *const qualifier) { 752 DBUG_ASSERT(table_arg != nullptr); 753 DBUG_ASSERT(tbl_def_arg != nullptr); 754 DBUG_ASSERT(per_part_match_found != nullptr); 755 DBUG_ASSERT(qualifier != nullptr); 756 757 std::string empty_result; 758 759 // Flag which marks if partition specific options were found. 760 *per_part_match_found = false; 761 762 if (comment.empty()) { 763 return empty_result; 764 } 765 766 // Let's fetch the comment for a index and check if there's a custom key 767 // name specified for a partition we are handling. 768 std::vector<std::string> v = 769 myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP); 770 771 std::string search_str = gen_qualifier_for_table(qualifier); 772 773 // If table has partitions then we need to check if user has requested 774 // qualifiers on a per partition basis. 775 // 776 // NOTE: this means if you specify a qualifier for a specific partition it 777 // will take precedence the 'table level' qualifier if one exists. 778 std::string search_str_part; 779 if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) { 780 std::string partition_name = tbl_def_arg->base_partition(); 781 DBUG_ASSERT(!partition_name.empty()); 782 search_str_part = gen_qualifier_for_table(qualifier, partition_name); 783 } 784 785 DBUG_ASSERT(!search_str.empty()); 786 787 // Basic O(N) search for a matching assignment. At most we expect maybe 788 // ten or so elements here. 789 if (!search_str_part.empty()) { 790 for (const auto &it : v) { 791 if (it.substr(0, search_str_part.length()) == search_str_part) { 792 // We found a prefix match. Try to parse it as an assignment. 793 std::vector<std::string> tokens = 794 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP); 795 796 // We found a custom qualifier, it was in the form we expected it to be. 797 // Return that instead of whatever we initially wanted to return. In 798 // a case below the `foo` part will be returned to the caller. 799 // 800 // p3_cfname=foo 801 // 802 // If no value was specified then we'll return an empty string which 803 // later gets translated into using a default CF. 804 if (tokens.size() == 2) { 805 *per_part_match_found = true; 806 return tokens[1]; 807 } else { 808 return empty_result; 809 } 810 } 811 } 812 } 813 814 // Do this loop again, this time searching for 'table level' qualifiers if we 815 // didn't find any partition level qualifiers above. 816 for (const auto &it : v) { 817 if (it.substr(0, search_str.length()) == search_str) { 818 std::vector<std::string> tokens = 819 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP); 820 if (tokens.size() == 2) { 821 return tokens[1]; 822 } else { 823 return empty_result; 824 } 825 } 826 } 827 828 // If we didn't find any partitioned/non-partitioned qualifiers, return an 829 // empty string. 830 return empty_result; 831 } 832 833 /** 834 Read a memcmp key part from a slice using the passed in reader. 835 836 Returns -1 if field was null, 1 if error, 0 otherwise. 837 */ 838 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg, 839 Rdb_string_reader *reader, 840 const uint part_num) const { 841 /* It is impossible to unpack the column. Skip it. */ 842 if (m_pack_info[part_num].m_maybe_null) { 843 const char *nullp; 844 if (!(nullp = reader->read(1))) return 1; 845 if (*nullp == 0) { 846 /* This is a NULL value */ 847 return -1; 848 } else { 849 /* If NULL marker is not '0', it can be only '1' */ 850 if (*nullp != 1) return 1; 851 } 852 } 853 854 Rdb_field_packing *fpi = &m_pack_info[part_num]; 855 DBUG_ASSERT(table_arg->s != nullptr); 856 857 bool is_hidden_pk_part = (part_num + 1 == m_key_parts) && 858 (table_arg->s->primary_key == MAX_INDEXES); 859 Field *field = nullptr; 860 if (!is_hidden_pk_part) { 861 field = fpi->get_field_in_table(table_arg); 862 } 863 if ((fpi->m_skip_func)(fpi, field, reader)) { 864 return 1; 865 } 866 return 0; 867 } 868 869 /** 870 Get a mem-comparable form of Primary Key from mem-comparable form of this key 871 872 @param 873 pk_descr Primary Key descriptor 874 key Index tuple from this key in mem-comparable form 875 pk_buffer OUT Put here mem-comparable form of the Primary Key. 876 877 @note 878 It may or may not be possible to restore primary key columns to their 879 mem-comparable form. To handle all cases, this function copies mem- 880 comparable forms directly. 881 882 RocksDB SE supports "Extended keys". This means that PK columns are present 883 at the end of every key. If the key already includes PK columns, then 884 these columns are not present at the end of the key. 885 886 Because of the above, we copy each primary key column. 887 888 @todo 889 If we checked crc32 checksums in this function, we would catch some CRC 890 violations that we currently don't. On the other hand, there is a broader 891 set of queries for which we would check the checksum twice. 892 */ 893 894 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table, 895 const Rdb_key_def &pk_descr, 896 const rocksdb::Slice *const key, 897 uchar *const pk_buffer) const { 898 DBUG_ASSERT(table != nullptr); 899 DBUG_ASSERT(key != nullptr); 900 DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY); 901 DBUG_ASSERT(pk_buffer); 902 903 uint size = 0; 904 uchar *buf = pk_buffer; 905 DBUG_ASSERT(m_pk_key_parts); 906 907 /* Put the PK number */ 908 rdb_netbuf_store_index(buf, pk_descr.m_index_number); 909 buf += INDEX_NUMBER_SIZE; 910 size += INDEX_NUMBER_SIZE; 911 912 const char *start_offs[MAX_REF_PARTS]; 913 const char *end_offs[MAX_REF_PARTS]; 914 int pk_key_part; 915 uint i; 916 Rdb_string_reader reader(key); 917 918 // Skip the index number 919 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN; 920 921 for (i = 0; i < m_key_parts; i++) { 922 if ((pk_key_part = m_pk_part_no[i]) != -1) { 923 start_offs[pk_key_part] = reader.get_current_ptr(); 924 } 925 926 if (read_memcmp_key_part(table, &reader, i) > 0) { 927 return RDB_INVALID_KEY_LEN; 928 } 929 930 if (pk_key_part != -1) { 931 end_offs[pk_key_part] = reader.get_current_ptr(); 932 } 933 } 934 935 for (i = 0; i < m_pk_key_parts; i++) { 936 const uint part_size = end_offs[i] - start_offs[i]; 937 memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]); 938 buf += part_size; 939 size += part_size; 940 } 941 942 return size; 943 } 944 945 /** 946 Get a mem-comparable form of Secondary Key from mem-comparable form of this 947 key, without the extended primary key tail. 948 949 @param 950 key Index tuple from this key in mem-comparable form 951 sk_buffer OUT Put here mem-comparable form of the Secondary Key. 952 n_null_fields OUT Put number of null fields contained within sk entry 953 */ 954 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table, 955 const rocksdb::Slice &key, 956 uchar *sk_buffer, 957 uint *n_null_fields) const { 958 DBUG_ASSERT(table != nullptr); 959 DBUG_ASSERT(sk_buffer != nullptr); 960 DBUG_ASSERT(n_null_fields != nullptr); 961 DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table)); 962 963 uchar *buf = sk_buffer; 964 965 int res; 966 Rdb_string_reader reader(&key); 967 const char *start = reader.get_current_ptr(); 968 969 // Skip the index number 970 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN; 971 972 for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) { 973 if ((res = read_memcmp_key_part(table, &reader, i)) > 0) { 974 return RDB_INVALID_KEY_LEN; 975 } else if (res == -1) { 976 (*n_null_fields)++; 977 } 978 } 979 980 uint sk_memcmp_len = reader.get_current_ptr() - start; 981 memcpy(buf, start, sk_memcmp_len); 982 return sk_memcmp_len; 983 } 984 985 /** 986 Convert index tuple into storage (i.e. mem-comparable) format 987 988 @detail 989 Currently this is done by unpacking into table->record[0] and then 990 packing index columns into storage format. 991 992 @param pack_buffer Temporary area for packing varchar columns. Its 993 size is at least max_storage_fmt_length() bytes. 994 */ 995 996 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer, 997 uchar *const packed_tuple, 998 const uchar *const key_tuple, 999 const key_part_map &keypart_map) const { 1000 DBUG_ASSERT(tbl != nullptr); 1001 DBUG_ASSERT(pack_buffer != nullptr); 1002 DBUG_ASSERT(packed_tuple != nullptr); 1003 DBUG_ASSERT(key_tuple != nullptr); 1004 1005 /* We were given a record in KeyTupleFormat. First, save it to record */ 1006 const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map); 1007 key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len); 1008 1009 uint n_used_parts = my_count_bits(keypart_map); 1010 if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0; // Full key is used 1011 1012 /* Then, convert the record into a mem-comparable form */ 1013 return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr, 1014 false, 0, n_used_parts); 1015 } 1016 1017 /** 1018 @brief 1019 Check if "unpack info" data includes checksum. 1020 1021 @detail 1022 This is used only by CHECK TABLE to count the number of rows that have 1023 checksums. 1024 */ 1025 1026 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) { 1027 size_t size = unpack_info.size(); 1028 if (size == 0) { 1029 return false; 1030 } 1031 const uchar *ptr = (const uchar *)unpack_info.data(); 1032 1033 // Skip unpack info if present. 1034 if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) { 1035 const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1); 1036 SHIP_ASSERT(size >= skip_len); 1037 1038 size -= skip_len; 1039 ptr += skip_len; 1040 } 1041 1042 return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG); 1043 } 1044 1045 /* 1046 @return Number of bytes that were changed 1047 */ 1048 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) { 1049 DBUG_ASSERT(packed_tuple != nullptr); 1050 1051 int changed = 0; 1052 uchar *p = packed_tuple + len - 1; 1053 for (; p > packed_tuple; p--) { 1054 changed++; 1055 if (*p != uchar(0xFF)) { 1056 *p = *p + 1; 1057 break; 1058 } 1059 *p = '\0'; 1060 } 1061 return changed; 1062 } 1063 1064 /* 1065 @return Number of bytes that were changed 1066 */ 1067 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) { 1068 DBUG_ASSERT(packed_tuple != nullptr); 1069 1070 int changed = 0; 1071 uchar *p = packed_tuple + len - 1; 1072 for (; p > packed_tuple; p--) { 1073 changed++; 1074 if (*p != uchar(0x00)) { 1075 *p = *p - 1; 1076 break; 1077 } 1078 *p = 0xFF; 1079 } 1080 return changed; 1081 } 1082 1083 static const std::map<char, size_t> UNPACK_HEADER_SIZES = { 1084 {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE}, 1085 {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}}; 1086 1087 /* 1088 @return The length in bytes of the header specified by the given tag 1089 */ 1090 size_t Rdb_key_def::get_unpack_header_size(char tag) { 1091 DBUG_ASSERT(is_unpack_data_tag(tag)); 1092 return UNPACK_HEADER_SIZES.at(tag); 1093 } 1094 1095 /* 1096 Get a bitmap indicating which varchar columns must be covered for this 1097 lookup to be covered. If the bitmap is a subset of the covered bitmap, then 1098 the lookup is covered. If it can already be determined that the lookup is 1099 not covered, map->bitmap will be set to null. 1100 */ 1101 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const { 1102 DBUG_ASSERT(map->bitmap == nullptr); 1103 bitmap_init(map, nullptr, MAX_REF_PARTS, false); 1104 uint curr_bitmap_pos = 0; 1105 1106 // Indicates which columns in the read set might be covered. 1107 MY_BITMAP maybe_covered_bitmap; 1108 bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false); 1109 1110 for (uint i = 0; i < m_key_parts; i++) { 1111 if (table_has_hidden_pk(table) && i + 1 == m_key_parts) { 1112 continue; 1113 } 1114 1115 Field *const field = m_pack_info[i].get_field_in_table(table); 1116 1117 // Columns which are always covered are not stored in the covered bitmap so 1118 // we can ignore them here too. 1119 if (m_pack_info[i].m_covered && 1120 bitmap_is_set(table->read_set, field->field_index)) { 1121 bitmap_set_bit(&maybe_covered_bitmap, field->field_index); 1122 continue; 1123 } 1124 1125 switch (field->real_type()) { 1126 // This type may be covered depending on the record. If it was requested, 1127 // we require the covered bitmap to have this bit set. 1128 case MYSQL_TYPE_VARCHAR: 1129 if (curr_bitmap_pos < MAX_REF_PARTS) { 1130 if (bitmap_is_set(table->read_set, field->field_index)) { 1131 bitmap_set_bit(map, curr_bitmap_pos); 1132 bitmap_set_bit(&maybe_covered_bitmap, field->field_index); 1133 } 1134 curr_bitmap_pos++; 1135 } else { 1136 bitmap_free(&maybe_covered_bitmap); 1137 bitmap_free(map); 1138 return; 1139 } 1140 break; 1141 // This column is a type which is never covered. If it was requested, we 1142 // know this lookup will never be covered. 1143 default: 1144 if (bitmap_is_set(table->read_set, field->field_index)) { 1145 bitmap_free(&maybe_covered_bitmap); 1146 bitmap_free(map); 1147 return; 1148 } 1149 break; 1150 } 1151 } 1152 1153 // If there are columns which are not covered in the read set, the lookup 1154 // can't be covered. 1155 if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) { 1156 bitmap_free(map); 1157 } 1158 bitmap_free(&maybe_covered_bitmap); 1159 } 1160 1161 /* 1162 Return true if for this secondary index 1163 - All of the requested columns are in the index 1164 - All values for columns that are prefix-only indexes are shorter or equal 1165 in length to the prefix 1166 */ 1167 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info, 1168 const MY_BITMAP *const lookup_bitmap) const { 1169 DBUG_ASSERT(lookup_bitmap != nullptr); 1170 if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) { 1171 return false; 1172 } 1173 1174 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info); 1175 1176 // Check if this unpack_info has a covered_bitmap 1177 const char *unpack_header = unp_reader.get_current_ptr(); 1178 const bool has_covered_unpack_info = 1179 unp_reader.remaining_bytes() && 1180 unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG; 1181 if (!has_covered_unpack_info || 1182 !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) { 1183 return false; 1184 } 1185 1186 MY_BITMAP covered_bitmap; 1187 my_bitmap_map covered_bits; 1188 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false); 1189 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header + 1190 sizeof(RDB_UNPACK_COVERED_DATA_TAG) + 1191 RDB_UNPACK_COVERED_DATA_LEN_SIZE); 1192 1193 return bitmap_is_subset(lookup_bitmap, &covered_bitmap); 1194 } 1195 1196 /* Indicates that all key parts can be unpacked to cover a secondary lookup */ 1197 bool Rdb_key_def::can_cover_lookup() const { 1198 for (uint i = 0; i < m_key_parts; i++) { 1199 if (!m_pack_info[i].m_covered) return false; 1200 } 1201 return true; 1202 } 1203 1204 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info, 1205 uchar *tuple, uchar *const packed_tuple, 1206 uchar *const pack_buffer, 1207 Rdb_string_writer *const unpack_info, 1208 uint *const n_null_fields) const { 1209 if (field->real_maybe_null()) { 1210 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1)); 1211 if (field->is_real_null()) { 1212 /* NULL value. store '\0' so that it sorts before non-NULL values */ 1213 *tuple++ = 0; 1214 /* That's it, don't store anything else */ 1215 if (n_null_fields) (*n_null_fields)++; 1216 return tuple; 1217 } else { 1218 /* Not a NULL value. Store '1' */ 1219 *tuple++ = 1; 1220 } 1221 } 1222 1223 const bool create_unpack_info = 1224 (unpack_info && // we were requested to generate unpack_info 1225 pack_info->uses_unpack_info()); // and this keypart uses it 1226 Rdb_pack_field_context pack_ctx(unpack_info); 1227 1228 // Set the offset for methods which do not take an offset as an argument 1229 DBUG_ASSERT( 1230 is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len)); 1231 1232 (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx); 1233 1234 /* Make "unpack info" to be stored in the value */ 1235 if (create_unpack_info) { 1236 (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field, 1237 &pack_ctx); 1238 } 1239 1240 return tuple; 1241 } 1242 1243 /** 1244 Get index columns from the record and pack them into mem-comparable form. 1245 1246 @param 1247 tbl Table we're working on 1248 record IN Record buffer with fields in table->record format 1249 pack_buffer IN Temporary area for packing varchars. The size is 1250 at least max_storage_fmt_length() bytes. 1251 packed_tuple OUT Key in the mem-comparable form 1252 unpack_info OUT Unpack data 1253 unpack_info_len OUT Unpack data length 1254 n_key_parts Number of keyparts to process. 0 means all of them. 1255 n_null_fields OUT Number of key fields with NULL value. 1256 ttl_bytes IN Previous ttl bytes from old record for update case or 1257 current ttl bytes from just packed primary key/value 1258 @detail 1259 Some callers do not need the unpack information, they can pass 1260 unpack_info=nullptr, unpack_info_len=nullptr. 1261 1262 @return 1263 Length of the packed tuple 1264 */ 1265 1266 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer, 1267 const uchar *const record, 1268 uchar *const packed_tuple, 1269 Rdb_string_writer *const unpack_info, 1270 const bool should_store_row_debug_checksums, 1271 const longlong hidden_pk_id, uint n_key_parts, 1272 uint *const n_null_fields, 1273 const char *const ttl_bytes) const { 1274 DBUG_ASSERT(tbl != nullptr); 1275 DBUG_ASSERT(pack_buffer != nullptr); 1276 DBUG_ASSERT(record != nullptr); 1277 DBUG_ASSERT(packed_tuple != nullptr); 1278 // Checksums for PKs are made when record is packed. 1279 // We should never attempt to make checksum just from PK values 1280 DBUG_ASSERT_IMP(should_store_row_debug_checksums, 1281 (m_index_type == INDEX_TYPE_SECONDARY)); 1282 1283 uchar *tuple = packed_tuple; 1284 size_t unpack_start_pos = size_t(-1); 1285 size_t unpack_len_pos = size_t(-1); 1286 size_t covered_bitmap_pos = size_t(-1); 1287 const bool hidden_pk_exists = table_has_hidden_pk(tbl); 1288 1289 rdb_netbuf_store_index(tuple, m_index_number); 1290 tuple += INDEX_NUMBER_SIZE; 1291 1292 // If n_key_parts is 0, it means all columns. 1293 // The following includes the 'extended key' tail. 1294 // The 'extended key' includes primary key. This is done to 'uniqify' 1295 // non-unique indexes 1296 const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS; 1297 1298 // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the 1299 // hidden key part. So we skip it (its always 1 part). 1300 if (hidden_pk_exists && !hidden_pk_id && use_all_columns) { 1301 n_key_parts = m_key_parts - 1; 1302 } else if (use_all_columns) { 1303 n_key_parts = m_key_parts; 1304 } 1305 1306 if (n_null_fields) *n_null_fields = 0; 1307 1308 // Check if we need a covered bitmap. If it is certain that all key parts are 1309 // covering, we don't need one. 1310 bool store_covered_bitmap = false; 1311 if (unpack_info && use_covered_bitmap_format()) { 1312 for (uint i = 0; i < n_key_parts; i++) { 1313 if (!m_pack_info[i].m_covered) { 1314 store_covered_bitmap = true; 1315 break; 1316 } 1317 } 1318 } 1319 1320 const char tag = 1321 store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG; 1322 1323 if (unpack_info) { 1324 unpack_info->clear(); 1325 1326 if (m_index_type == INDEX_TYPE_SECONDARY && 1327 m_total_index_flags_length > 0) { 1328 // Reserve space for index flag fields 1329 unpack_info->allocate(m_total_index_flags_length); 1330 1331 // Insert TTL timestamp 1332 if (has_ttl() && ttl_bytes) { 1333 write_index_flag_field(unpack_info, 1334 reinterpret_cast<const uchar *>(ttl_bytes), 1335 Rdb_key_def::TTL_FLAG); 1336 } 1337 } 1338 1339 unpack_start_pos = unpack_info->get_current_pos(); 1340 unpack_info->write_uint8(tag); 1341 unpack_len_pos = unpack_info->get_current_pos(); 1342 // we don't know the total length yet, so write a zero 1343 unpack_info->write_uint16(0); 1344 1345 if (store_covered_bitmap) { 1346 // Reserve two bytes for the covered bitmap. This will store, for key 1347 // parts which are not always covering, whether or not it is covering 1348 // for this record. 1349 covered_bitmap_pos = unpack_info->get_current_pos(); 1350 unpack_info->write_uint16(0); 1351 } 1352 } 1353 1354 MY_BITMAP covered_bitmap; 1355 my_bitmap_map covered_bits; 1356 uint curr_bitmap_pos = 0; 1357 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false); 1358 1359 for (uint i = 0; i < n_key_parts; i++) { 1360 // Fill hidden pk id into the last key part for secondary keys for tables 1361 // with no pk 1362 if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) { 1363 m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id); 1364 break; 1365 } 1366 1367 Field *const field = m_pack_info[i].get_field_in_table(tbl); 1368 DBUG_ASSERT(field != nullptr); 1369 1370 uint field_offset = field->ptr - tbl->record[0]; 1371 uint null_offset = field->null_offset(tbl->record[0]); 1372 bool maybe_null = field->real_maybe_null(); 1373 1374 field->move_field( 1375 const_cast<uchar *>(record) + field_offset, 1376 maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr, 1377 field->null_bit); 1378 // WARNING! Don't return without restoring field->ptr and field->null_ptr 1379 1380 tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer, 1381 unpack_info, n_null_fields); 1382 1383 // If this key part is a prefix of a VARCHAR field, check if it's covered. 1384 if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR && 1385 !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) { 1386 size_t data_length = field->data_length(); 1387 uint16 key_length; 1388 if (m_pk_part_no[i] == (uint)-1) { 1389 key_length = tbl->key_info[get_keyno()].key_part[i].length; 1390 } else { 1391 key_length = 1392 tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length; 1393 } 1394 1395 if (m_pack_info[i].m_unpack_func != nullptr && 1396 data_length <= key_length) { 1397 bitmap_set_bit(&covered_bitmap, curr_bitmap_pos); 1398 } 1399 curr_bitmap_pos++; 1400 } 1401 1402 // Restore field->ptr and field->null_ptr 1403 field->move_field(tbl->record[0] + field_offset, 1404 maybe_null ? tbl->record[0] + null_offset : nullptr, 1405 field->null_bit); 1406 } 1407 1408 if (unpack_info) { 1409 const size_t len = unpack_info->get_current_pos() - unpack_start_pos; 1410 DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max()); 1411 1412 // Don't store the unpack_info if it has only the header (that is, there's 1413 // no meaningful content). 1414 // Primary Keys are special: for them, store the unpack_info even if it's 1415 // empty (provided m_maybe_unpack_info==true, see 1416 // ha_rocksdb::convert_record_to_storage_format) 1417 if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) { 1418 if (len == get_unpack_header_size(tag) && !covered_bits) { 1419 unpack_info->truncate(unpack_start_pos); 1420 } else if (store_covered_bitmap) { 1421 unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits); 1422 } 1423 } else { 1424 unpack_info->write_uint16_at(unpack_len_pos, len); 1425 } 1426 1427 // 1428 // Secondary keys have key and value checksums in the value part 1429 // Primary key is a special case (the value part has non-indexed columns), 1430 // so the checksums are computed and stored by 1431 // ha_rocksdb::convert_record_to_storage_format 1432 // 1433 if (should_store_row_debug_checksums) { 1434 const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple); 1435 const uint32_t val_crc32 = 1436 crc32(0, unpack_info->ptr(), unpack_info->get_current_pos()); 1437 1438 unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG); 1439 unpack_info->write_uint32(key_crc32); 1440 unpack_info->write_uint32(val_crc32); 1441 } 1442 } 1443 1444 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0)); 1445 1446 return tuple - packed_tuple; 1447 } 1448 1449 /** 1450 Pack the hidden primary key into mem-comparable form. 1451 1452 @param 1453 tbl Table we're working on 1454 hidden_pk_id IN New value to be packed into key 1455 packed_tuple OUT Key in the mem-comparable form 1456 1457 @return 1458 Length of the packed tuple 1459 */ 1460 1461 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id, 1462 uchar *const packed_tuple) const { 1463 DBUG_ASSERT(packed_tuple != nullptr); 1464 1465 uchar *tuple = packed_tuple; 1466 rdb_netbuf_store_index(tuple, m_index_number); 1467 tuple += INDEX_NUMBER_SIZE; 1468 DBUG_ASSERT(m_key_parts == 1); 1469 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1470 m_pack_info[0].m_max_image_len)); 1471 1472 m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id); 1473 1474 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0)); 1475 return tuple - packed_tuple; 1476 } 1477 1478 /* 1479 Function of type rdb_index_field_pack_t 1480 */ 1481 1482 void Rdb_key_def::pack_with_make_sort_key( 1483 Rdb_field_packing *const fpi, Field *const field, 1484 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst, 1485 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) { 1486 DBUG_ASSERT(fpi != nullptr); 1487 DBUG_ASSERT(field != nullptr); 1488 DBUG_ASSERT(dst != nullptr); 1489 DBUG_ASSERT(*dst != nullptr); 1490 1491 const int max_len = fpi->m_max_image_len; 1492 MY_BITMAP*old_map; 1493 1494 old_map= dbug_tmp_use_all_columns(field->table, 1495 &field->table->read_set); 1496 field->sort_string(*dst, max_len); 1497 dbug_tmp_restore_column_map(&field->table->read_set, old_map); 1498 *dst += max_len; 1499 } 1500 1501 /* 1502 Compares two keys without unpacking 1503 1504 @detail 1505 @return 1506 0 - Ok. column_index is the index of the first column which is different. 1507 -1 if two kes are equal 1508 1 - Data format error. 1509 */ 1510 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1, 1511 const rocksdb::Slice *key2, 1512 std::size_t *const column_index) const { 1513 DBUG_ASSERT(key1 != nullptr); 1514 DBUG_ASSERT(key2 != nullptr); 1515 DBUG_ASSERT(column_index != nullptr); 1516 1517 // the caller should check the return value and 1518 // not rely on column_index being valid 1519 *column_index = 0xbadf00d; 1520 1521 Rdb_string_reader reader1(key1); 1522 Rdb_string_reader reader2(key2); 1523 1524 // Skip the index number 1525 if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE; 1526 1527 if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE; 1528 1529 for (uint i = 0; i < m_key_parts; i++) { 1530 const Rdb_field_packing *const fpi = &m_pack_info[i]; 1531 if (fpi->m_maybe_null) { 1532 const auto nullp1 = reader1.read(1); 1533 const auto nullp2 = reader2.read(1); 1534 1535 if (nullp1 == nullptr || nullp2 == nullptr) { 1536 return HA_EXIT_FAILURE; 1537 } 1538 1539 if (*nullp1 != *nullp2) { 1540 *column_index = i; 1541 return HA_EXIT_SUCCESS; 1542 } 1543 1544 if (*nullp1 == 0) { 1545 /* This is a NULL value */ 1546 continue; 1547 } 1548 } 1549 1550 const auto before_skip1 = reader1.get_current_ptr(); 1551 const auto before_skip2 = reader2.get_current_ptr(); 1552 DBUG_ASSERT(fpi->m_skip_func); 1553 if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) { 1554 return HA_EXIT_FAILURE; 1555 } 1556 if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) { 1557 return HA_EXIT_FAILURE; 1558 } 1559 const auto size1 = reader1.get_current_ptr() - before_skip1; 1560 const auto size2 = reader2.get_current_ptr() - before_skip2; 1561 if (size1 != size2) { 1562 *column_index = i; 1563 return HA_EXIT_SUCCESS; 1564 } 1565 1566 if (memcmp(before_skip1, before_skip2, size1) != 0) { 1567 *column_index = i; 1568 return HA_EXIT_SUCCESS; 1569 } 1570 } 1571 1572 *column_index = m_key_parts; 1573 return HA_EXIT_SUCCESS; 1574 } 1575 1576 /* 1577 @brief 1578 Given a zero-padded key, determine its real key length 1579 1580 @detail 1581 Fixed-size skip functions just read. 1582 */ 1583 1584 size_t Rdb_key_def::key_length(const TABLE *const table, 1585 const rocksdb::Slice &key) const { 1586 DBUG_ASSERT(table != nullptr); 1587 1588 Rdb_string_reader reader(&key); 1589 1590 if ((!reader.read(INDEX_NUMBER_SIZE))) { 1591 return size_t(-1); 1592 } 1593 for (uint i = 0; i < m_key_parts; i++) { 1594 const Rdb_field_packing *fpi = &m_pack_info[i]; 1595 const Field *field = nullptr; 1596 if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) { 1597 field = fpi->get_field_in_table(table); 1598 } 1599 if ((fpi->m_skip_func)(fpi, field, &reader)) { 1600 return size_t(-1); 1601 } 1602 } 1603 return key.size() - reader.remaining_bytes(); 1604 } 1605 1606 /* 1607 Take mem-comparable form and unpack_info and unpack it to Table->record 1608 1609 @detail 1610 not all indexes support this 1611 1612 @return 1613 HA_EXIT_SUCCESS OK 1614 other HA_ERR error code 1615 */ 1616 1617 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf, 1618 const rocksdb::Slice *const packed_key, 1619 const rocksdb::Slice *const unpack_info, 1620 const bool verify_row_debug_checksums) const { 1621 Rdb_string_reader reader(packed_key); 1622 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info); 1623 1624 // There is no checksuming data after unpack_info for primary keys, because 1625 // the layout there is different. The checksum is verified in 1626 // ha_rocksdb::convert_record_from_storage_format instead. 1627 DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY), 1628 !verify_row_debug_checksums); 1629 1630 // Skip the index number 1631 if ((!reader.read(INDEX_NUMBER_SIZE))) { 1632 return HA_ERR_ROCKSDB_CORRUPT_DATA; 1633 } 1634 1635 // For secondary keys, we expect the value field to contain index flags, 1636 // unpack data, and checksum data in that order. One or all can be missing, 1637 // but they cannot be reordered. 1638 if (unp_reader.remaining_bytes()) { 1639 if (m_index_type == INDEX_TYPE_SECONDARY && 1640 m_total_index_flags_length > 0 && 1641 !unp_reader.read(m_total_index_flags_length)) { 1642 return HA_ERR_ROCKSDB_CORRUPT_DATA; 1643 } 1644 } 1645 1646 const char *unpack_header = unp_reader.get_current_ptr(); 1647 bool has_unpack_info = 1648 unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]); 1649 if (has_unpack_info) { 1650 if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) { 1651 return HA_ERR_ROCKSDB_CORRUPT_DATA; 1652 } 1653 } 1654 1655 // Read the covered bitmap 1656 MY_BITMAP covered_bitmap; 1657 my_bitmap_map covered_bits; 1658 bool has_covered_bitmap = 1659 has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG); 1660 if (has_covered_bitmap) { 1661 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false); 1662 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header + 1663 sizeof(RDB_UNPACK_COVERED_DATA_TAG) + 1664 RDB_UNPACK_COVERED_DATA_LEN_SIZE); 1665 } 1666 1667 int err = HA_EXIT_SUCCESS; 1668 1669 1670 Rdb_key_field_iterator iter( 1671 this, m_pack_info, &reader, &unp_reader, table, has_unpack_info, 1672 has_covered_bitmap ? &covered_bitmap : nullptr, buf); 1673 while (iter.has_next()) { 1674 err = iter.next(); 1675 if (err) { 1676 return err; 1677 } 1678 } 1679 1680 /* 1681 Check checksum values if present 1682 */ 1683 const char *ptr; 1684 if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) { 1685 if (verify_row_debug_checksums) { 1686 uint32_t stored_key_chksum = rdb_netbuf_to_uint32( 1687 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE)); 1688 const uint32_t stored_val_chksum = rdb_netbuf_to_uint32( 1689 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE)); 1690 1691 const uint32_t computed_key_chksum = 1692 crc32(0, (const uchar *)packed_key->data(), packed_key->size()); 1693 const uint32_t computed_val_chksum = 1694 crc32(0, (const uchar *)unpack_info->data(), 1695 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE); 1696 1697 DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1", 1698 stored_key_chksum++;); 1699 1700 if (stored_key_chksum != computed_key_chksum) { 1701 report_checksum_mismatch(true, packed_key->data(), packed_key->size()); 1702 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH; 1703 } 1704 1705 if (stored_val_chksum != computed_val_chksum) { 1706 report_checksum_mismatch(false, unpack_info->data(), 1707 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE); 1708 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH; 1709 } 1710 } else { 1711 /* The checksums are present but we are not checking checksums */ 1712 } 1713 } 1714 1715 if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA; 1716 1717 return HA_EXIT_SUCCESS; 1718 } 1719 1720 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) { 1721 return table->s->primary_key == MAX_INDEXES; 1722 } 1723 1724 void Rdb_key_def::report_checksum_mismatch(const bool is_key, 1725 const char *const data, 1726 const size_t data_size) const { 1727 // NO_LINT_DEBUG 1728 sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x", 1729 is_key ? "key" : "value", get_index_number()); 1730 1731 const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN); 1732 // NO_LINT_DEBUG 1733 sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s", 1734 (uint64_t)data_size, buf.c_str()); 1735 1736 my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch"); 1737 } 1738 1739 bool Rdb_key_def::index_format_min_check(const int pk_min, 1740 const int sk_min) const { 1741 switch (m_index_type) { 1742 case INDEX_TYPE_PRIMARY: 1743 case INDEX_TYPE_HIDDEN_PRIMARY: 1744 return (m_kv_format_version >= pk_min); 1745 case INDEX_TYPE_SECONDARY: 1746 return (m_kv_format_version >= sk_min); 1747 default: 1748 DBUG_ASSERT(0); 1749 return false; 1750 } 1751 } 1752 1753 /////////////////////////////////////////////////////////////////////////////////////////// 1754 // Rdb_field_packing 1755 /////////////////////////////////////////////////////////////////////////////////////////// 1756 1757 /* 1758 Function of type rdb_index_field_skip_t 1759 */ 1760 1761 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi, 1762 const Field *const field 1763 MY_ATTRIBUTE((__unused__)), 1764 Rdb_string_reader *const reader) { 1765 if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE; 1766 return HA_EXIT_SUCCESS; 1767 } 1768 1769 /* 1770 (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not 1771 split in the middle of an UTF-8 character. See the implementation of 1772 unpack_binary_or_utf8_varchar. 1773 */ 1774 #define RDB_ESCAPE_LENGTH 9 1775 #define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH 1776 static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0, 1777 "RDB_ESCAPE_LENGTH-1 must be even."); 1778 1779 #define RDB_ENCODED_SIZE(len) \ 1780 ((len + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \ 1781 RDB_ESCAPE_LENGTH 1782 1783 #define RDB_LEGACY_ENCODED_SIZE(len) \ 1784 ((len + (RDB_LEGACY_ESCAPE_LENGTH - 1)) / (RDB_LEGACY_ESCAPE_LENGTH - 1)) * \ 1785 RDB_LEGACY_ESCAPE_LENGTH 1786 1787 /* 1788 Function of type rdb_index_field_skip_t 1789 */ 1790 1791 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi, 1792 const Field *const field, 1793 Rdb_string_reader *const reader) { 1794 const uchar *ptr; 1795 bool finished = false; 1796 1797 size_t dst_len; /* How much data can be there */ 1798 if (field) { 1799 const Field_varstring *const field_var = 1800 static_cast<const Field_varstring *>(field); 1801 dst_len = field_var->pack_length() - field_var->length_bytes; 1802 } else { 1803 dst_len = UINT_MAX; 1804 } 1805 1806 bool use_legacy_format = fpi->m_use_legacy_varbinary_format; 1807 1808 /* Decode the length-emitted encoding here */ 1809 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) { 1810 uint used_bytes; 1811 1812 /* See pack_with_varchar_encoding. */ 1813 if (use_legacy_format) { 1814 used_bytes = calc_unpack_legacy_variable_format( 1815 ptr[RDB_ESCAPE_LENGTH - 1], &finished); 1816 } else { 1817 used_bytes = 1818 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished); 1819 } 1820 1821 if (used_bytes == (uint)-1 || dst_len < used_bytes) { 1822 return HA_EXIT_FAILURE; // Corruption in the data 1823 } 1824 1825 if (finished) { 1826 break; 1827 } 1828 1829 dst_len -= used_bytes; 1830 } 1831 1832 if (!finished) { 1833 return HA_EXIT_FAILURE; 1834 } 1835 1836 return HA_EXIT_SUCCESS; 1837 } 1838 1839 const int VARCHAR_CMP_LESS_THAN_SPACES = 1; 1840 const int VARCHAR_CMP_EQUAL_TO_SPACES = 2; 1841 const int VARCHAR_CMP_GREATER_THAN_SPACES = 3; 1842 1843 /* 1844 Skip a keypart that uses Variable-Length Space-Padded encoding 1845 */ 1846 1847 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi, 1848 const Field *const field, 1849 Rdb_string_reader *const reader) { 1850 const uchar *ptr; 1851 bool finished = false; 1852 1853 size_t dst_len = UINT_MAX; /* How much data can be there */ 1854 1855 if (field) { 1856 const Field_varstring *const field_var = 1857 static_cast<const Field_varstring *>(field); 1858 dst_len = field_var->pack_length() - field_var->length_bytes; 1859 } 1860 1861 /* Decode the length-emitted encoding here */ 1862 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) { 1863 // See pack_with_varchar_space_pad 1864 const uchar c = ptr[fpi->m_segment_size - 1]; 1865 if (c == VARCHAR_CMP_EQUAL_TO_SPACES) { 1866 // This is the last segment 1867 finished = true; 1868 break; 1869 } else if (c == VARCHAR_CMP_LESS_THAN_SPACES || 1870 c == VARCHAR_CMP_GREATER_THAN_SPACES) { 1871 // This is not the last segment 1872 if ((fpi->m_segment_size - 1) > dst_len) { 1873 // The segment is full of data but the table field can't hold that 1874 // much! This must be data corruption. 1875 return HA_EXIT_FAILURE; 1876 } 1877 dst_len -= (fpi->m_segment_size - 1); 1878 } else { 1879 // Encountered a value that's none of the VARCHAR_CMP* constants 1880 // It's data corruption. 1881 return HA_EXIT_FAILURE; 1882 } 1883 } 1884 return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE; 1885 } 1886 1887 /* 1888 Function of type rdb_index_field_unpack_t 1889 */ 1890 1891 int Rdb_key_def::unpack_integer( 1892 Rdb_field_packing *const fpi, Field *const field, uchar *const to, 1893 Rdb_string_reader *const reader, 1894 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 1895 const int length = fpi->m_max_image_len; 1896 1897 const uchar *from; 1898 if (!(from = (const uchar *)reader->read(length))) { 1899 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */ 1900 } 1901 1902 #ifdef WORDS_BIGENDIAN 1903 { 1904 if (static_cast<Field_num *>(field)->unsigned_flag) { 1905 to[0] = from[0]; 1906 } else { 1907 to[0] = static_cast<char>(from[0] ^ 128); // Reverse the sign bit. 1908 } 1909 memcpy(to + 1, from + 1, length - 1); 1910 } 1911 #else 1912 { 1913 const int sign_byte = from[0]; 1914 if (static_cast<Field_num *>(field)->unsigned_flag) { 1915 to[length - 1] = sign_byte; 1916 } else { 1917 to[length - 1] = 1918 static_cast<char>(sign_byte ^ 128); // Reverse the sign bit. 1919 } 1920 for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j]; 1921 } 1922 #endif 1923 return UNPACK_SUCCESS; 1924 } 1925 1926 #if !defined(WORDS_BIGENDIAN) 1927 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) { 1928 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN) 1929 // A few systems store the most-significant _word_ first on little-endian 1930 dst[0] = src[3]; 1931 dst[1] = src[2]; 1932 dst[2] = src[1]; 1933 dst[3] = src[0]; 1934 dst[4] = src[7]; 1935 dst[5] = src[6]; 1936 dst[6] = src[5]; 1937 dst[7] = src[4]; 1938 #else 1939 dst[0] = src[7]; 1940 dst[1] = src[6]; 1941 dst[2] = src[5]; 1942 dst[3] = src[4]; 1943 dst[4] = src[3]; 1944 dst[5] = src[2]; 1945 dst[6] = src[1]; 1946 dst[7] = src[0]; 1947 #endif 1948 } 1949 1950 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) { 1951 dst[0] = src[3]; 1952 dst[1] = src[2]; 1953 dst[2] = src[1]; 1954 dst[3] = src[0]; 1955 } 1956 #else 1957 #define rdb_swap_double_bytes nullptr 1958 #define rdb_swap_float_bytes nullptr 1959 #endif 1960 1961 int Rdb_key_def::unpack_floating_point( 1962 uchar *const dst, Rdb_string_reader *const reader, const size_t size, 1963 const int exp_digit, const uchar *const zero_pattern, 1964 const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) { 1965 const uchar *const from = (const uchar *)reader->read(size); 1966 if (from == nullptr) { 1967 /* Mem-comparable image doesn't have enough bytes */ 1968 return UNPACK_FAILURE; 1969 } 1970 1971 /* Check to see if the value is zero */ 1972 if (memcmp(from, zero_pattern, size) == 0) { 1973 memcpy(dst, zero_val, size); 1974 return UNPACK_SUCCESS; 1975 } 1976 1977 #if defined(WORDS_BIGENDIAN) 1978 // On big-endian, output can go directly into result 1979 uchar *const tmp = dst; 1980 #else 1981 // Otherwise use a temporary buffer to make byte-swapping easier later 1982 uchar tmp[8]; 1983 #endif 1984 1985 memcpy(tmp, from, size); 1986 1987 if (tmp[0] & 0x80) { 1988 // If the high bit is set the original value was positive so 1989 // remove the high bit and subtract one from the exponent. 1990 ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1]; 1991 exp_part &= 0x7FFF; // clear high bit; 1992 exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent 1993 tmp[0] = (uchar)(exp_part >> 8); 1994 tmp[1] = (uchar)exp_part; 1995 } else { 1996 // Otherwise the original value was negative and all bytes have been 1997 // negated. 1998 for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF; 1999 } 2000 2001 #if !defined(WORDS_BIGENDIAN) 2002 // On little-endian, swap the bytes around 2003 swap_func(dst, tmp); 2004 #else 2005 DBUG_ASSERT(swap_func == nullptr); 2006 #endif 2007 2008 return UNPACK_SUCCESS; 2009 } 2010 2011 #if !defined(DBL_EXP_DIG) 2012 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG) 2013 #endif 2014 2015 /* 2016 Function of type rdb_index_field_unpack_t 2017 2018 Unpack a double by doing the reverse action of change_double_for_sort 2019 (sql/filesort.cc). Note that this only works on IEEE values. 2020 Note also that this code assumes that NaN and +/-Infinity are never 2021 allowed in the database. 2022 */ 2023 int Rdb_key_def::unpack_double( 2024 Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)), 2025 Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr, 2026 Rdb_string_reader *const reader, 2027 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 2028 static double zero_val = 0.0; 2029 static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0}; 2030 2031 return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG, 2032 zero_pattern, (const uchar *)&zero_val, 2033 rdb_swap_double_bytes); 2034 } 2035 2036 #if !defined(FLT_EXP_DIG) 2037 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG) 2038 #endif 2039 2040 /* 2041 Function of type rdb_index_field_unpack_t 2042 2043 Unpack a float by doing the reverse action of Field_float::make_sort_key 2044 (sql/field.cc). Note that this only works on IEEE values. 2045 Note also that this code assumes that NaN and +/-Infinity are never 2046 allowed in the database. 2047 */ 2048 int Rdb_key_def::unpack_float( 2049 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)), 2050 uchar *const field_ptr, Rdb_string_reader *const reader, 2051 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 2052 static float zero_val = 0.0; 2053 static const uchar zero_pattern[4] = {128, 0, 0, 0}; 2054 2055 return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG, 2056 zero_pattern, (const uchar *)&zero_val, 2057 rdb_swap_float_bytes); 2058 } 2059 2060 /* 2061 Function of type rdb_index_field_unpack_t used to 2062 Unpack by doing the reverse action to Field_newdate::make_sort_key. 2063 */ 2064 2065 int Rdb_key_def::unpack_newdate( 2066 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)), 2067 uchar *const field_ptr, Rdb_string_reader *const reader, 2068 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 2069 const char *from; 2070 DBUG_ASSERT(fpi->m_max_image_len == 3); 2071 2072 if (!(from = reader->read(3))) { 2073 /* Mem-comparable image doesn't have enough bytes */ 2074 return UNPACK_FAILURE; 2075 } 2076 2077 field_ptr[0] = from[2]; 2078 field_ptr[1] = from[1]; 2079 field_ptr[2] = from[0]; 2080 return UNPACK_SUCCESS; 2081 } 2082 2083 /* 2084 Function of type rdb_index_field_unpack_t, used to 2085 Unpack the string by copying it over. 2086 This is for BINARY(n) where the value occupies the whole length. 2087 */ 2088 2089 int Rdb_key_def::unpack_binary_str( 2090 Rdb_field_packing *const fpi, Field *const field, uchar *const to, 2091 Rdb_string_reader *const reader, 2092 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 2093 const char *from; 2094 if (!(from = reader->read(fpi->m_max_image_len))) { 2095 /* Mem-comparable image doesn't have enough bytes */ 2096 return UNPACK_FAILURE; 2097 } 2098 2099 memcpy(to, from, fpi->m_max_image_len); 2100 return UNPACK_SUCCESS; 2101 } 2102 2103 /* 2104 Function of type rdb_index_field_unpack_t. 2105 For UTF-8, we need to convert 2-byte wide-character entities back into 2106 UTF8 sequences. 2107 */ 2108 2109 int Rdb_key_def::unpack_utf8_str( 2110 Rdb_field_packing *const fpi, Field *const field, uchar *dst, 2111 Rdb_string_reader *const reader, 2112 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 2113 my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset(); 2114 const uchar *src; 2115 if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) { 2116 /* Mem-comparable image doesn't have enough bytes */ 2117 return UNPACK_FAILURE; 2118 } 2119 2120 const uchar *const src_end = src + fpi->m_max_image_len; 2121 uchar *const dst_end = dst + field->pack_length(); 2122 2123 while (src < src_end) { 2124 my_wc_t wc = (src[0] << 8) | src[1]; 2125 src += 2; 2126 int res = cset->cset->wc_mb(cset, wc, dst, dst_end); 2127 DBUG_ASSERT(res > 0 && res <= 3); 2128 if (res < 0) return UNPACK_FAILURE; 2129 dst += res; 2130 } 2131 2132 cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst, 2133 cset->pad_char); 2134 return UNPACK_SUCCESS; 2135 } 2136 2137 /* 2138 This is the original algorithm to encode a variable binary field. It 2139 sets a flag byte every Nth byte. The flag value is (255 - #pad) where 2140 #pad is the number of padding bytes that were needed (0 if all N-1 2141 bytes were used). 2142 2143 If N=8 and the field is: 2144 * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251 2145 * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252 2146 And the 4 byte string compares as greater than the 3 byte string 2147 2148 Unfortunately the algorithm has a flaw. If the input is exactly a 2149 multiple of N-1, an extra N bytes are written. Since we usually use 2150 N=9, an 8 byte input will generate 18 bytes of output instead of the 2151 9 bytes of output that is optimal. 2152 2153 See pack_variable_format for the newer algorithm. 2154 */ 2155 void Rdb_key_def::pack_legacy_variable_format( 2156 const uchar *src, // The data to encode 2157 size_t src_len, // The length of the data to encode 2158 uchar **dst) // The location to encode the data 2159 { 2160 size_t copy_len; 2161 size_t padding_bytes; 2162 uchar *ptr = *dst; 2163 2164 do { 2165 copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len); 2166 padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len; 2167 memcpy(ptr, src, copy_len); 2168 ptr += copy_len; 2169 src += copy_len; 2170 // pad with zeros if necessary 2171 if (padding_bytes > 0) { 2172 memset(ptr, 0, padding_bytes); 2173 ptr += padding_bytes; 2174 } 2175 2176 *(ptr++) = 255 - padding_bytes; 2177 2178 src_len -= copy_len; 2179 } while (padding_bytes == 0); 2180 2181 *dst = ptr; 2182 } 2183 2184 /* 2185 This is the new algorithm. Similarly to the legacy format the input 2186 is split up into N-1 bytes and a flag byte is used as the Nth byte 2187 in the output. 2188 2189 - If the previous segment needed any padding the flag is set to the 2190 number of bytes used (0..N-2). 0 is possible in the first segment 2191 if the input is 0 bytes long. 2192 - If no padding was used and there is no more data left in the input 2193 the flag is set to N-1 2194 - If no padding was used and there is still data left in the input the 2195 flag is set to N. 2196 2197 For N=9, the following input values encode to the specified 2198 outout (where 'X' indicates a byte of the original input): 2199 - 0 bytes is encoded as 0 0 0 0 0 0 0 0 0 2200 - 1 byte is encoded as X 0 0 0 0 0 0 0 1 2201 - 2 bytes is encoded as X X 0 0 0 0 0 0 2 2202 - 7 bytes is encoded as X X X X X X X 0 7 2203 - 8 bytes is encoded as X X X X X X X X 8 2204 - 9 bytes is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1 2205 - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2 2206 */ 2207 void Rdb_key_def::pack_variable_format( 2208 const uchar *src, // The data to encode 2209 size_t src_len, // The length of the data to encode 2210 uchar **dst) // The location to encode the data 2211 { 2212 uchar *ptr = *dst; 2213 2214 for (;;) { 2215 // Figure out how many bytes to copy, copy them and adjust pointers 2216 const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len); 2217 memcpy(ptr, src, copy_len); 2218 ptr += copy_len; 2219 src += copy_len; 2220 src_len -= copy_len; 2221 2222 // Are we at the end of the input? 2223 if (src_len == 0) { 2224 // pad with zeros if necessary; 2225 const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len; 2226 if (padding_bytes > 0) { 2227 memset(ptr, 0, padding_bytes); 2228 ptr += padding_bytes; 2229 } 2230 2231 // Put the flag byte (0 - N-1) in the output 2232 *(ptr++) = (uchar)copy_len; 2233 break; 2234 } 2235 2236 // We have more data - put the flag byte (N) in and continue 2237 *(ptr++) = RDB_ESCAPE_LENGTH; 2238 } 2239 2240 *dst = ptr; 2241 } 2242 2243 /* 2244 Function of type rdb_index_field_pack_t 2245 */ 2246 2247 void Rdb_key_def::pack_with_varchar_encoding( 2248 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst, 2249 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) { 2250 const CHARSET_INFO *const charset = field->charset(); 2251 Field_varstring *const field_var = (Field_varstring *)field; 2252 2253 const size_t value_length = (field_var->length_bytes == 1) 2254 ? (uint)*field->ptr 2255 : uint2korr(field->ptr); 2256 size_t xfrm_len = charset->coll->strnxfrm( 2257 charset, buf, fpi->m_max_image_len, field_var->char_length(), 2258 field_var->ptr + field_var->length_bytes, value_length, 0); 2259 2260 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */ 2261 if (fpi->m_use_legacy_varbinary_format) { 2262 pack_legacy_variable_format(buf, xfrm_len, dst); 2263 } else { 2264 pack_variable_format(buf, xfrm_len, dst); 2265 } 2266 } 2267 2268 /* 2269 Compare the string in [buf..buf_end) with a string that is an infinite 2270 sequence of strings in space_xfrm 2271 */ 2272 2273 static int rdb_compare_string_with_spaces( 2274 const uchar *buf, const uchar *const buf_end, 2275 const std::vector<uchar> *const space_xfrm) { 2276 int cmp = 0; 2277 while (buf < buf_end) { 2278 size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size()); 2279 if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break; 2280 buf += bytes; 2281 } 2282 return cmp; 2283 } 2284 2285 static const int RDB_TRIMMED_CHARS_OFFSET = 8; 2286 /* 2287 Pack the data with Variable-Length Space-Padded Encoding. 2288 2289 The encoding is there to meet two goals: 2290 2291 Goal#1. Comparison. The SQL standard says 2292 2293 " If the collation for the comparison has the PAD SPACE characteristic, 2294 for the purposes of the comparison, the shorter value is effectively 2295 extended to the length of the longer by concatenation of <space>s on the 2296 right. 2297 2298 At the moment, all MySQL collations except one have the PAD SPACE 2299 characteristic. The exception is the "binary" collation that is used by 2300 [VAR]BINARY columns. (Note that binary collations for specific charsets, 2301 like utf8_bin or latin1_bin are not the same as "binary" collation, they have 2302 the PAD SPACE characteristic). 2303 2304 Goal#2 is to preserve the number of trailing spaces in the original value. 2305 2306 This is achieved by using the following encoding: 2307 The key part: 2308 - Stores mem-comparable image of the column 2309 - It is stored in chunks of fpi->m_segment_size bytes (*) 2310 = If the remainder of the chunk is not occupied, it is padded with mem- 2311 comparable image of the space character (cs->pad_char to be precise). 2312 - The last byte of the chunk shows how the rest of column's mem-comparable 2313 image would compare to mem-comparable image of the column extended with 2314 spaces. There are three possible values. 2315 - VARCHAR_CMP_LESS_THAN_SPACES, 2316 - VARCHAR_CMP_EQUAL_TO_SPACES 2317 - VARCHAR_CMP_GREATER_THAN_SPACES 2318 2319 VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest 2320 is spaces, or something that sorts as spaces, so there is no reason to store 2321 it). 2322 2323 Example: if fpi->m_segment_size=5, and the collation is latin1_bin: 2324 2325 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ] 2326 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>] 2327 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>] 2328 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>] 2329 2330 As mentioned above, the last chunk is padded with mem-comparable images of 2331 cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc. 2332 2333 fpi->m_segment_size depends on the used collation. It is chosen to be such 2334 that no mem-comparable image of space will ever stretch across the segments 2335 (see get_segment_size_from_collation). 2336 2337 == The value part (aka unpack_info) == 2338 The value part stores the number of space characters that one needs to add 2339 when unpacking the string. 2340 - If the number is positive, it means add this many spaces at the end 2341 - If the number is negative, it means padding has added extra spaces which 2342 must be removed. 2343 2344 Storage considerations 2345 - depending on column's max size, the number may occupy 1 or 2 bytes 2346 - the number of spaces that need to be removed is not more than 2347 RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and 2348 then store it as unsigned. 2349 2350 @seealso 2351 unpack_binary_or_utf8_varchar_space_pad 2352 unpack_simple_varchar_space_pad 2353 dummy_make_unpack_info 2354 skip_variable_space_pad 2355 */ 2356 2357 void Rdb_key_def::pack_with_varchar_space_pad( 2358 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst, 2359 Rdb_pack_field_context *const pack_ctx) { 2360 Rdb_string_writer *const unpack_info = pack_ctx->writer; 2361 const CHARSET_INFO *const charset = field->charset(); 2362 const auto field_var = static_cast<Field_varstring *>(field); 2363 2364 const size_t value_length = (field_var->length_bytes == 1) 2365 ? (uint)*field->ptr 2366 : uint2korr(field->ptr); 2367 2368 const size_t trimmed_len = charset->cset->lengthsp( 2369 charset, (const char *)field_var->ptr + field_var->length_bytes, 2370 value_length); 2371 const size_t xfrm_len = charset->coll->strnxfrm( 2372 charset, buf, fpi->m_max_image_len, field_var->char_length(), 2373 field_var->ptr + field_var->length_bytes, trimmed_len, 0); 2374 2375 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */ 2376 uchar *const buf_end = buf + xfrm_len; 2377 2378 size_t encoded_size = 0; 2379 uchar *ptr = *dst; 2380 size_t padding_bytes; 2381 while (true) { 2382 const size_t copy_len = 2383 std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf); 2384 padding_bytes = fpi->m_segment_size - 1 - copy_len; 2385 memcpy(ptr, buf, copy_len); 2386 ptr += copy_len; 2387 buf += copy_len; 2388 2389 if (padding_bytes) { 2390 memcpy(ptr, fpi->space_xfrm->data(), padding_bytes); 2391 ptr += padding_bytes; 2392 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment 2393 } else { 2394 // Compare the string suffix with a hypothetical infinite string of 2395 // spaces. It could be that the first difference is beyond the end of 2396 // current chunk. 2397 const int cmp = 2398 rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm); 2399 2400 if (cmp < 0) { 2401 *ptr = VARCHAR_CMP_LESS_THAN_SPACES; 2402 } else if (cmp > 0) { 2403 *ptr = VARCHAR_CMP_GREATER_THAN_SPACES; 2404 } else { 2405 // It turns out all the rest are spaces. 2406 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; 2407 } 2408 } 2409 encoded_size += fpi->m_segment_size; 2410 2411 if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break; 2412 } 2413 2414 // m_unpack_info_stores_value means unpack_info stores the whole original 2415 // value. There is no need to store the number of trimmed/padded endspaces 2416 // in that case. 2417 if (unpack_info && !fpi->m_unpack_info_stores_value) { 2418 // (value_length - trimmed_len) is the number of trimmed space *characters* 2419 // then, padding_bytes is the number of *bytes* added as padding 2420 // then, we add 8, because we don't store negative values. 2421 DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0); 2422 DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0); 2423 const size_t removed_chars = 2424 RDB_TRIMMED_CHARS_OFFSET + 2425 (value_length - trimmed_len) / fpi->space_mb_len - 2426 padding_bytes / fpi->space_xfrm_len; 2427 2428 if (fpi->m_unpack_info_uses_two_bytes) { 2429 unpack_info->write_uint16(removed_chars); 2430 } else { 2431 DBUG_ASSERT(removed_chars < 0x100); 2432 unpack_info->write_uint8(removed_chars); 2433 } 2434 } 2435 2436 *dst += encoded_size; 2437 } 2438 2439 /* 2440 Calculate the number of used bytes in the chunk and whether this is the 2441 last chunk in the input. This is based on the old legacy format - see 2442 pack_legacy_variable_format. 2443 */ 2444 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) { 2445 uint pad = 255 - flag; 2446 uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad; 2447 if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) { 2448 return (uint)-1; 2449 } 2450 2451 *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1; 2452 return used_bytes; 2453 } 2454 2455 /* 2456 Calculate the number of used bytes in the chunk and whether this is the 2457 last chunk in the input. This is based on the new format - see 2458 pack_variable_format. 2459 */ 2460 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) { 2461 // Check for invalid flag values 2462 if (flag > RDB_ESCAPE_LENGTH) { 2463 return (uint)-1; 2464 } 2465 2466 // Values from 1 to N-1 indicate this is the last chunk and that is how 2467 // many bytes were used 2468 if (flag < RDB_ESCAPE_LENGTH) { 2469 *done = true; 2470 return flag; 2471 } 2472 2473 // A value of N means we used N-1 bytes and had more to go 2474 *done = false; 2475 return RDB_ESCAPE_LENGTH - 1; 2476 } 2477 2478 /* 2479 Unpack data that has charset information. Each two bytes of the input is 2480 treated as a wide-character and converted to its multibyte equivalent in 2481 the output. 2482 */ 2483 static int unpack_charset( 2484 const CHARSET_INFO *cset, // character set information 2485 const uchar *src, // source data to unpack 2486 uint src_len, // length of source data 2487 uchar *dst, // destination of unpacked data 2488 uint dst_len, // length of destination data 2489 uint *used_bytes) // output number of bytes used 2490 { 2491 if (src_len & 1) { 2492 /* 2493 UTF-8 characters are encoded into two-byte entities. There is no way 2494 we can have an odd number of bytes after encoding. 2495 */ 2496 return UNPACK_FAILURE; 2497 } 2498 2499 uchar *dst_end = dst + dst_len; 2500 uint used = 0; 2501 2502 for (uint ii = 0; ii < src_len; ii += 2) { 2503 my_wc_t wc = (src[ii] << 8) | src[ii + 1]; 2504 int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end); 2505 DBUG_ASSERT(res > 0 && res <= 3); 2506 if (res < 0) { 2507 return UNPACK_FAILURE; 2508 } 2509 2510 used += res; 2511 } 2512 2513 *used_bytes = used; 2514 return UNPACK_SUCCESS; 2515 } 2516 2517 /* 2518 Function of type rdb_index_field_unpack_t 2519 */ 2520 2521 int Rdb_key_def::unpack_binary_or_utf8_varchar( 2522 Rdb_field_packing *const fpi, Field *const field, uchar *dst, 2523 Rdb_string_reader *const reader, 2524 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) { 2525 const uchar *ptr; 2526 size_t len = 0; 2527 bool finished = false; 2528 uchar *d0 = dst; 2529 Field_varstring *const field_var = (Field_varstring *)field; 2530 dst += field_var->length_bytes; 2531 // How much we can unpack 2532 size_t dst_len = field_var->pack_length() - field_var->length_bytes; 2533 2534 bool use_legacy_format = fpi->m_use_legacy_varbinary_format; 2535 2536 /* Decode the length-emitted encoding here */ 2537 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) { 2538 uint used_bytes; 2539 2540 /* See pack_with_varchar_encoding. */ 2541 if (use_legacy_format) { 2542 used_bytes = calc_unpack_legacy_variable_format( 2543 ptr[RDB_ESCAPE_LENGTH - 1], &finished); 2544 } else { 2545 used_bytes = 2546 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished); 2547 } 2548 2549 if (used_bytes == (uint)-1 || dst_len < used_bytes) { 2550 return UNPACK_FAILURE; // Corruption in the data 2551 } 2552 2553 /* 2554 Now, we need to decode used_bytes of data and append them to the value. 2555 */ 2556 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) { 2557 int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst, 2558 dst_len, &used_bytes); 2559 if (err != UNPACK_SUCCESS) { 2560 return err; 2561 } 2562 } else { 2563 memcpy(dst, ptr, used_bytes); 2564 } 2565 2566 dst += used_bytes; 2567 dst_len -= used_bytes; 2568 len += used_bytes; 2569 2570 if (finished) { 2571 break; 2572 } 2573 } 2574 2575 if (!finished) { 2576 return UNPACK_FAILURE; 2577 } 2578 2579 /* Save the length */ 2580 if (field_var->length_bytes == 1) { 2581 d0[0] = (uchar)len; 2582 } else { 2583 DBUG_ASSERT(field_var->length_bytes == 2); 2584 int2store(d0, len); 2585 } 2586 return UNPACK_SUCCESS; 2587 } 2588 2589 /* 2590 @seealso 2591 pack_with_varchar_space_pad - packing function 2592 unpack_simple_varchar_space_pad - unpacking function for 'simple' 2593 charsets. 2594 skip_variable_space_pad - skip function 2595 */ 2596 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad( 2597 Rdb_field_packing *const fpi, Field *const field, uchar *dst, 2598 Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) { 2599 const uchar *ptr; 2600 size_t len = 0; 2601 bool finished = false; 2602 Field_varstring *const field_var = static_cast<Field_varstring *>(field); 2603 uchar *d0 = dst; 2604 uchar *dst_end = dst + field_var->pack_length(); 2605 dst += field_var->length_bytes; 2606 2607 uint space_padding_bytes = 0; 2608 uint extra_spaces; 2609 if ((fpi->m_unpack_info_uses_two_bytes 2610 ? unp_reader->read_uint16(&extra_spaces) 2611 : unp_reader->read_uint8(&extra_spaces))) { 2612 return UNPACK_FAILURE; 2613 } 2614 2615 if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) { 2616 space_padding_bytes = 2617 -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET); 2618 extra_spaces = 0; 2619 } else { 2620 extra_spaces -= RDB_TRIMMED_CHARS_OFFSET; 2621 } 2622 2623 space_padding_bytes *= fpi->space_xfrm_len; 2624 2625 /* Decode the length-emitted encoding here */ 2626 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) { 2627 const char last_byte = ptr[fpi->m_segment_size - 1]; 2628 size_t used_bytes; 2629 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment 2630 { 2631 if (space_padding_bytes > (fpi->m_segment_size - 1)) { 2632 return UNPACK_FAILURE; // Cannot happen, corrupted data 2633 } 2634 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes; 2635 finished = true; 2636 } else { 2637 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES && 2638 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) { 2639 return UNPACK_FAILURE; // Invalid value 2640 } 2641 used_bytes = fpi->m_segment_size - 1; 2642 } 2643 2644 // Now, need to decode used_bytes of data and append them to the value. 2645 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) { 2646 if (used_bytes & 1) { 2647 /* 2648 UTF-8 characters are encoded into two-byte entities. There is no way 2649 we can have an odd number of bytes after encoding. 2650 */ 2651 return UNPACK_FAILURE; 2652 } 2653 2654 const uchar *src = ptr; 2655 const uchar *const src_end = ptr + used_bytes; 2656 while (src < src_end) { 2657 my_wc_t wc = (src[0] << 8) | src[1]; 2658 src += 2; 2659 const CHARSET_INFO *cset = fpi->m_varchar_charset; 2660 int res = cset->cset->wc_mb(cset, wc, dst, dst_end); 2661 DBUG_ASSERT(res <= 3); 2662 if (res <= 0) return UNPACK_FAILURE; 2663 dst += res; 2664 len += res; 2665 } 2666 } else { 2667 if (dst + used_bytes > dst_end) return UNPACK_FAILURE; 2668 memcpy(dst, ptr, used_bytes); 2669 dst += used_bytes; 2670 len += used_bytes; 2671 } 2672 2673 if (finished) { 2674 if (extra_spaces) { 2675 // Both binary and UTF-8 charset store space as ' ', 2676 // so the following is ok: 2677 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE; 2678 memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces); 2679 len += extra_spaces; 2680 } 2681 break; 2682 } 2683 } 2684 2685 if (!finished) return UNPACK_FAILURE; 2686 2687 /* Save the length */ 2688 if (field_var->length_bytes == 1) { 2689 d0[0] = (uchar)len; 2690 } else { 2691 DBUG_ASSERT(field_var->length_bytes == 2); 2692 int2store(d0, len); 2693 } 2694 return UNPACK_SUCCESS; 2695 } 2696 2697 ///////////////////////////////////////////////////////////////////////// 2698 2699 /* 2700 Function of type rdb_make_unpack_info_t 2701 */ 2702 2703 void Rdb_key_def::make_unpack_unknown( 2704 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)), 2705 const Field *const field, Rdb_pack_field_context *const pack_ctx) { 2706 pack_ctx->writer->write(field->ptr, field->pack_length()); 2707 } 2708 2709 /* 2710 This point of this function is only to indicate that unpack_info is 2711 available. 2712 2713 The actual unpack_info data is produced by the function that packs the key, 2714 that is, pack_with_varchar_space_pad. 2715 */ 2716 2717 void Rdb_key_def::dummy_make_unpack_info( 2718 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)), 2719 const Field *field MY_ATTRIBUTE((__unused__)), 2720 Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) { 2721 // Do nothing 2722 } 2723 2724 /* 2725 Function of type rdb_index_field_unpack_t 2726 */ 2727 2728 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi, 2729 Field *const field, uchar *const dst, 2730 Rdb_string_reader *const reader, 2731 Rdb_string_reader *const unp_reader) { 2732 const uchar *ptr; 2733 const uint len = fpi->m_unpack_data_len; 2734 // We don't use anything from the key, so skip over it. 2735 if (skip_max_length(fpi, field, reader)) { 2736 return UNPACK_FAILURE; 2737 } 2738 2739 DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr); 2740 2741 if ((ptr = (const uchar *)unp_reader->read(len))) { 2742 memcpy(dst, ptr, len); 2743 return UNPACK_SUCCESS; 2744 } 2745 return UNPACK_FAILURE; 2746 } 2747 2748 /* 2749 Function of type rdb_make_unpack_info_t 2750 */ 2751 2752 void Rdb_key_def::make_unpack_unknown_varchar( 2753 const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)), 2754 const Field *const field, Rdb_pack_field_context *const pack_ctx) { 2755 const auto f = static_cast<const Field_varstring *>(field); 2756 uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr); 2757 len += f->length_bytes; 2758 pack_ctx->writer->write(field->ptr, len); 2759 } 2760 2761 /* 2762 Function of type rdb_index_field_unpack_t 2763 2764 @detail 2765 Unpack a key part in an "unknown" collation from its 2766 (mem_comparable_form, unpack_info) form. 2767 2768 "Unknown" means we have no clue about how mem_comparable_form is made from 2769 the original string, so we keep the whole original string in the unpack_info. 2770 2771 @seealso 2772 make_unpack_unknown, unpack_unknown 2773 */ 2774 2775 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi, 2776 Field *const field, uchar *dst, 2777 Rdb_string_reader *const reader, 2778 Rdb_string_reader *const unp_reader) { 2779 const uchar *ptr; 2780 uchar *const d0 = dst; 2781 const auto f = static_cast<Field_varstring *>(field); 2782 dst += f->length_bytes; 2783 const uint len_bytes = f->length_bytes; 2784 // We don't use anything from the key, so skip over it. 2785 if ((fpi->m_skip_func)(fpi, field, reader)) { 2786 return UNPACK_FAILURE; 2787 } 2788 2789 DBUG_ASSERT(len_bytes > 0); 2790 DBUG_ASSERT(unp_reader != nullptr); 2791 2792 if ((ptr = (const uchar *)unp_reader->read(len_bytes))) { 2793 memcpy(d0, ptr, len_bytes); 2794 const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr); 2795 if ((ptr = (const uchar *)unp_reader->read(len))) { 2796 memcpy(dst, ptr, len); 2797 return UNPACK_SUCCESS; 2798 } 2799 } 2800 return UNPACK_FAILURE; 2801 } 2802 2803 /* 2804 Write unpack_data for a "simple" collation 2805 */ 2806 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer, 2807 const Rdb_collation_codec *const codec, 2808 const uchar *const src, 2809 const size_t src_len) { 2810 for (uint i = 0; i < src_len; i++) { 2811 writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]); 2812 } 2813 } 2814 2815 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader, 2816 const Rdb_collation_codec *const codec, 2817 const uchar *const src, const size_t src_len, 2818 uchar *const dst) { 2819 for (uint i = 0; i < src_len; i++) { 2820 if (codec->m_dec_size[src[i]] > 0) { 2821 uint *ret; 2822 DBUG_ASSERT(reader != nullptr); 2823 2824 if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) { 2825 return UNPACK_FAILURE; 2826 } 2827 dst[i] = codec->m_dec_idx[*ret][src[i]]; 2828 } else { 2829 dst[i] = codec->m_dec_idx[0][src[i]]; 2830 } 2831 } 2832 2833 return UNPACK_SUCCESS; 2834 } 2835 2836 /* 2837 Function of type rdb_make_unpack_info_t 2838 2839 @detail 2840 Make unpack_data for VARCHAR(n) in a "simple" charset. 2841 */ 2842 2843 void Rdb_key_def::make_unpack_simple_varchar( 2844 const Rdb_collation_codec *const codec, const Field *const field, 2845 Rdb_pack_field_context *const pack_ctx) { 2846 const auto f = static_cast<const Field_varstring *>(field); 2847 uchar *const src = f->ptr + f->length_bytes; 2848 const size_t src_len = 2849 f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr); 2850 Rdb_bit_writer bit_writer(pack_ctx->writer); 2851 // The std::min compares characters with bytes, but for simple collations, 2852 // mbmaxlen = 1. 2853 rdb_write_unpack_simple(&bit_writer, codec, src, 2854 std::min((size_t)f->char_length(), src_len)); 2855 } 2856 2857 /* 2858 Function of type rdb_index_field_unpack_t 2859 2860 @seealso 2861 pack_with_varchar_space_pad - packing function 2862 unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function 2863 */ 2864 2865 int Rdb_key_def::unpack_simple_varchar_space_pad( 2866 Rdb_field_packing *const fpi, Field *const field, uchar *dst, 2867 Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) { 2868 const uchar *ptr; 2869 size_t len = 0; 2870 bool finished = false; 2871 uchar *d0 = dst; 2872 const Field_varstring *const field_var = 2873 static_cast<Field_varstring *>(field); 2874 // For simple collations, char_length is also number of bytes. 2875 DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length()); 2876 uchar *dst_end = dst + field_var->pack_length(); 2877 dst += field_var->length_bytes; 2878 Rdb_bit_reader bit_reader(unp_reader); 2879 2880 uint space_padding_bytes = 0; 2881 uint extra_spaces; 2882 DBUG_ASSERT(unp_reader != nullptr); 2883 2884 if ((fpi->m_unpack_info_uses_two_bytes 2885 ? unp_reader->read_uint16(&extra_spaces) 2886 : unp_reader->read_uint8(&extra_spaces))) { 2887 return UNPACK_FAILURE; 2888 } 2889 2890 if (extra_spaces <= 8) { 2891 space_padding_bytes = -(static_cast<int>(extra_spaces) - 8); 2892 extra_spaces = 0; 2893 } else { 2894 extra_spaces -= 8; 2895 } 2896 2897 space_padding_bytes *= fpi->space_xfrm_len; 2898 2899 /* Decode the length-emitted encoding here */ 2900 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) { 2901 const char last_byte = 2902 ptr[fpi->m_segment_size - 1]; // number of padding bytes 2903 size_t used_bytes; 2904 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) { 2905 // this is the last one 2906 if (space_padding_bytes > (fpi->m_segment_size - 1)) { 2907 return UNPACK_FAILURE; // Cannot happen, corrupted data 2908 } 2909 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes; 2910 finished = true; 2911 } else { 2912 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES && 2913 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) { 2914 return UNPACK_FAILURE; 2915 } 2916 used_bytes = fpi->m_segment_size - 1; 2917 } 2918 2919 if (dst + used_bytes > dst_end) { 2920 // The value on disk is longer than the field definition allows? 2921 return UNPACK_FAILURE; 2922 } 2923 2924 uint ret; 2925 if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr, 2926 used_bytes, dst)) != UNPACK_SUCCESS) { 2927 return ret; 2928 } 2929 2930 dst += used_bytes; 2931 len += used_bytes; 2932 2933 if (finished) { 2934 if (extra_spaces) { 2935 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE; 2936 // pad_char has a 1-byte form in all charsets that 2937 // are handled by rdb_init_collation_mapping. 2938 memset(dst, field_var->charset()->pad_char, extra_spaces); 2939 len += extra_spaces; 2940 } 2941 break; 2942 } 2943 } 2944 2945 if (!finished) return UNPACK_FAILURE; 2946 2947 /* Save the length */ 2948 if (field_var->length_bytes == 1) { 2949 d0[0] = (uchar)len; 2950 } else { 2951 DBUG_ASSERT(field_var->length_bytes == 2); 2952 int2store(d0, len); 2953 } 2954 return UNPACK_SUCCESS; 2955 } 2956 2957 /* 2958 Function of type rdb_make_unpack_info_t 2959 2960 @detail 2961 Make unpack_data for CHAR(n) value in a "simple" charset. 2962 It is CHAR(N), so SQL layer has padded the value with spaces up to N chars. 2963 2964 @seealso 2965 The VARCHAR variant is in make_unpack_simple_varchar 2966 */ 2967 2968 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec, 2969 const Field *const field, 2970 Rdb_pack_field_context *const pack_ctx) { 2971 const uchar *const src = field->ptr; 2972 Rdb_bit_writer bit_writer(pack_ctx->writer); 2973 rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length()); 2974 } 2975 2976 /* 2977 Function of type rdb_index_field_unpack_t 2978 */ 2979 2980 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi, 2981 Field *const field MY_ATTRIBUTE((__unused__)), 2982 uchar *const dst, 2983 Rdb_string_reader *const reader, 2984 Rdb_string_reader *const unp_reader) { 2985 const uchar *ptr; 2986 const uint len = fpi->m_max_image_len; 2987 Rdb_bit_reader bit_reader(unp_reader); 2988 2989 if (!(ptr = (const uchar *)reader->read(len))) { 2990 return UNPACK_FAILURE; 2991 } 2992 2993 return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr, 2994 fpi->m_charset_codec, ptr, len, dst); 2995 } 2996 2997 // See Rdb_charset_space_info::spaces_xfrm 2998 const int RDB_SPACE_XFRM_SIZE = 32; 2999 3000 // A class holding information about how space character is represented in a 3001 // charset. 3002 class Rdb_charset_space_info { 3003 public: 3004 Rdb_charset_space_info(const Rdb_charset_space_info &) = delete; 3005 Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete; 3006 Rdb_charset_space_info() = default; 3007 3008 // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes 3009 std::vector<uchar> spaces_xfrm; 3010 3011 // length(strxfrm(' ')) 3012 size_t space_xfrm_len; 3013 3014 // length of the space character itself 3015 // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20 3016 // (length=2) 3017 size_t space_mb_len; 3018 }; 3019 3020 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE> 3021 rdb_mem_comparable_space; 3022 3023 /* 3024 @brief 3025 For a given charset, get 3026 - strxfrm(' '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long. 3027 - length of strxfrm(charset, ' ') 3028 - length of the space character in the charset 3029 3030 @param cs IN Charset to get the space for 3031 @param ptr OUT A few space characters 3032 @param len OUT Return length of the space (in bytes) 3033 3034 @detail 3035 It is tempting to pre-generate mem-comparable form of space character for 3036 every charset on server startup. 3037 One can't do that: some charsets are not initialized until somebody 3038 attempts to use them (e.g. create or open a table that has a field that 3039 uses the charset). 3040 */ 3041 3042 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs, 3043 const std::vector<uchar> **xfrm, 3044 size_t *const xfrm_len, 3045 size_t *const mb_len) { 3046 DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE); 3047 if (!rdb_mem_comparable_space[cs->number].get()) { 3048 RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex); 3049 if (!rdb_mem_comparable_space[cs->number].get()) { 3050 // Upper bound of how many bytes can be occupied by multi-byte form of a 3051 // character in any charset. 3052 const int MAX_MULTI_BYTE_CHAR_SIZE = 4; 3053 DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE); 3054 3055 // multi-byte form of the ' ' (space) character 3056 uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE]; 3057 3058 const size_t space_mb_len = cs->cset->wc_mb( 3059 cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb)); 3060 3061 // mem-comparable image of the space character 3062 std::array<uchar, 20> space; 3063 3064 const size_t space_len = cs->coll->strnxfrm( 3065 cs, space.data(), sizeof(space), 1, space_mb, space_mb_len, 0); 3066 Rdb_charset_space_info *const info = new Rdb_charset_space_info; 3067 info->space_xfrm_len = space_len; 3068 info->space_mb_len = space_mb_len; 3069 while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) { 3070 info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(), 3071 space.data() + space_len); 3072 } 3073 rdb_mem_comparable_space[cs->number].reset(info); 3074 } 3075 RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex); 3076 } 3077 3078 *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm; 3079 *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len; 3080 *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len; 3081 } 3082 3083 mysql_mutex_t rdb_mem_cmp_space_mutex; 3084 3085 std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE> 3086 rdb_collation_data; 3087 mysql_mutex_t rdb_collation_data_mutex; 3088 3089 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) { 3090 return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 && 3091 !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD)); 3092 } 3093 3094 static const Rdb_collation_codec *rdb_init_collation_mapping( 3095 const my_core::CHARSET_INFO *const cs) { 3096 DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE); 3097 const Rdb_collation_codec *codec = rdb_collation_data[cs->number]; 3098 3099 if (codec == nullptr && rdb_is_collation_supported(cs)) { 3100 RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex); 3101 3102 codec = rdb_collation_data[cs->number]; 3103 if (codec == nullptr) { 3104 Rdb_collation_codec *cur = nullptr; 3105 3106 // Compute reverse mapping for simple collations. 3107 if (rdb_is_collation_supported(cs)) { 3108 cur = new Rdb_collation_codec; 3109 std::map<uchar, std::vector<uchar>> rev_map; 3110 size_t max_conflict_size = 0; 3111 for (int src = 0; src < 256; src++) { 3112 uchar dst = cs->sort_order[src]; 3113 rev_map[dst].push_back(src); 3114 max_conflict_size = std::max(max_conflict_size, rev_map[dst].size()); 3115 } 3116 cur->m_dec_idx.resize(max_conflict_size); 3117 3118 for (auto const &p : rev_map) { 3119 uchar dst = p.first; 3120 for (uint idx = 0; idx < p.second.size(); idx++) { 3121 uchar src = p.second[idx]; 3122 uchar bits = 3123 my_bit_log2(my_round_up_to_next_power(p.second.size())); 3124 cur->m_enc_idx[src] = idx; 3125 cur->m_enc_size[src] = bits; 3126 cur->m_dec_size[dst] = bits; 3127 cur->m_dec_idx[idx][dst] = src; 3128 } 3129 } 3130 3131 cur->m_make_unpack_info_func = {Rdb_key_def::make_unpack_simple_varchar, 3132 Rdb_key_def::make_unpack_simple}; 3133 cur->m_unpack_func = {Rdb_key_def::unpack_simple_varchar_space_pad, 3134 Rdb_key_def::unpack_simple}; 3135 } else { 3136 // Out of luck for now. 3137 } 3138 3139 if (cur != nullptr) { 3140 codec = cur; 3141 cur->m_cs = cs; 3142 rdb_collation_data[cs->number] = cur; 3143 } 3144 } 3145 3146 RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex); 3147 } 3148 3149 return codec; 3150 } 3151 3152 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) { 3153 int ret; 3154 if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN || 3155 cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) { 3156 /* 3157 In these collations, a character produces one weight, which is 3 bytes. 3158 Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we 3159 get 3*3+1=10 3160 */ 3161 ret = 10; 3162 } else { 3163 /* 3164 All other collations. There are two classes: 3165 - Unicode-based, except for collations mentioned in the if-condition. 3166 For these all weights are 2 bytes long, a character may produce 0..8 3167 weights. 3168 in any case, 8 bytes of payload in the segment guarantee that the last 3169 space character won't span across segments. 3170 3171 - Collations not based on unicode. These have length(strxfrm(' '))=1, 3172 there nothing to worry about. 3173 3174 In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker. 3175 */ 3176 ret = 9; 3177 } 3178 DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE); 3179 return ret; 3180 } 3181 3182 /* 3183 @brief 3184 Setup packing of index field into its mem-comparable form 3185 3186 @detail 3187 - It is possible produce mem-comparable form for any datatype. 3188 - Some datatypes also allow to unpack the original value from its 3189 mem-comparable form. 3190 = Some of these require extra information to be stored in "unpack_info". 3191 unpack_info is not a part of mem-comparable form, it is only used to 3192 restore the original value 3193 3194 @param 3195 field IN field to be packed/un-packed 3196 3197 @return 3198 TRUE - Field can be read with index-only reads 3199 FALSE - Otherwise 3200 */ 3201 3202 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr, 3203 const Field *const field, const uint keynr_arg, 3204 const uint key_part_arg, 3205 const uint16 key_length) { 3206 int res = false; 3207 enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG; 3208 3209 m_keynr = keynr_arg; 3210 m_key_part = key_part_arg; 3211 3212 m_maybe_null = field ? field->real_maybe_null() : false; 3213 m_unpack_func = nullptr; 3214 m_make_unpack_info_func = nullptr; 3215 m_unpack_data_len = 0; 3216 space_xfrm = nullptr; // safety 3217 // whether to use legacy format for varchar 3218 m_use_legacy_varbinary_format = false; 3219 // ha_rocksdb::index_flags() will pass key_descr == null to 3220 // see whether field(column) can be read-only reads through return value, 3221 // but the legacy vs. new varchar format doesn't affect return value. 3222 // Just change m_use_legacy_varbinary_format to true if key_descr isn't given. 3223 if (!key_descr || key_descr->use_legacy_varbinary_format()) { 3224 m_use_legacy_varbinary_format = true; 3225 } 3226 /* Calculate image length. By default, is is pack_length() */ 3227 m_max_image_len = 3228 field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN; 3229 m_skip_func = Rdb_key_def::skip_max_length; 3230 m_pack_func = Rdb_key_def::pack_with_make_sort_key; 3231 3232 m_covered = false; 3233 3234 switch (type) { 3235 case MYSQL_TYPE_LONGLONG: 3236 case MYSQL_TYPE_LONG: 3237 case MYSQL_TYPE_INT24: 3238 case MYSQL_TYPE_SHORT: 3239 case MYSQL_TYPE_TINY: 3240 m_unpack_func = Rdb_key_def::unpack_integer; 3241 m_covered = true; 3242 return true; 3243 3244 case MYSQL_TYPE_DOUBLE: 3245 m_unpack_func = Rdb_key_def::unpack_double; 3246 m_covered = true; 3247 return true; 3248 3249 case MYSQL_TYPE_FLOAT: 3250 m_unpack_func = Rdb_key_def::unpack_float; 3251 m_covered = true; 3252 return true; 3253 3254 case MYSQL_TYPE_NEWDECIMAL: 3255 /* 3256 Decimal is packed with Field_new_decimal::make_sort_key, which just 3257 does memcpy. 3258 Unpacking decimal values was supported only after fix for issue#253, 3259 because of that ha_rocksdb::get_storage_type() handles decimal values 3260 in a special way. 3261 */ 3262 case MYSQL_TYPE_DATETIME2: 3263 case MYSQL_TYPE_TIMESTAMP2: 3264 /* These are packed with Field_temporal_with_date_and_timef::make_sort_key 3265 */ 3266 case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */ 3267 case MYSQL_TYPE_YEAR: /* YEAR is packed with Field_tiny::make_sort_key */ 3268 /* Everything that comes here is packed with just a memcpy(). */ 3269 m_unpack_func = Rdb_key_def::unpack_binary_str; 3270 m_covered = true; 3271 return true; 3272 3273 case MYSQL_TYPE_NEWDATE: 3274 /* 3275 This is packed by Field_newdate::make_sort_key. It assumes the data is 3276 3 bytes, and packing is done by swapping the byte order (for both big- 3277 and little-endian) 3278 */ 3279 m_unpack_func = Rdb_key_def::unpack_newdate; 3280 m_covered = true; 3281 return true; 3282 case MYSQL_TYPE_TINY_BLOB: 3283 case MYSQL_TYPE_MEDIUM_BLOB: 3284 case MYSQL_TYPE_LONG_BLOB: 3285 case MYSQL_TYPE_BLOB: { 3286 if (key_descr) { 3287 // The my_charset_bin collation is special in that it will consider 3288 // shorter strings sorting as less than longer strings. 3289 // 3290 // See Field_blob::make_sort_key for details. 3291 m_max_image_len = 3292 key_length + (field->charset()->number == COLLATION_BINARY 3293 ? reinterpret_cast<const Field_blob *>(field) 3294 ->pack_length_no_ptr() 3295 : 0); 3296 // Return false because indexes on text/blob will always require 3297 // a prefix. With a prefix, the optimizer will not be able to do an 3298 // index-only scan since there may be content occuring after the prefix 3299 // length. 3300 return false; 3301 } 3302 break; 3303 } 3304 default: 3305 break; 3306 } 3307 3308 m_unpack_info_stores_value = false; 3309 /* Handle [VAR](CHAR|BINARY) */ 3310 3311 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) { 3312 /* 3313 For CHAR-based columns, check how strxfrm image will take. 3314 field->field_length = field->char_length() * cs->mbmaxlen. 3315 */ 3316 const CHARSET_INFO *cs = field->charset(); 3317 m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length); 3318 } 3319 const bool is_varchar = (type == MYSQL_TYPE_VARCHAR); 3320 const CHARSET_INFO *cs = field->charset(); 3321 // max_image_len before chunking is taken into account 3322 const int max_image_len_before_chunks = m_max_image_len; 3323 3324 if (is_varchar) { 3325 // The default for varchar is variable-length, without space-padding for 3326 // comparisons 3327 m_varchar_charset = cs; 3328 m_skip_func = Rdb_key_def::skip_variable_length; 3329 m_pack_func = Rdb_key_def::pack_with_varchar_encoding; 3330 if (!key_descr || key_descr->use_legacy_varbinary_format()) { 3331 m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len); 3332 } else { 3333 // Calculate the maximum size of the short section plus the 3334 // maximum size of the long section 3335 m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len); 3336 } 3337 3338 const auto field_var = static_cast<const Field_varstring *>(field); 3339 m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100); 3340 } 3341 3342 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) { 3343 // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for 3344 // information about character-based datatypes are compared. 3345 bool use_unknown_collation = false; 3346 DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans", 3347 use_unknown_collation = true;); 3348 3349 if (cs->number == COLLATION_BINARY) { 3350 // - SQL layer pads BINARY(N) so that it always is N bytes long. 3351 // - For VARBINARY(N), values may have different lengths, so we're using 3352 // variable-length encoding. This is also the only charset where the 3353 // values are not space-padded for comparison. 3354 m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar 3355 : Rdb_key_def::unpack_binary_str; 3356 res = true; 3357 } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) { 3358 // For _bin collations, mem-comparable form of the string is the string 3359 // itself. 3360 3361 if (is_varchar) { 3362 // VARCHARs - are compared as if they were space-padded - but are 3363 // not actually space-padded (reading the value back produces the 3364 // original value, without the padding) 3365 m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad; 3366 m_skip_func = Rdb_key_def::skip_variable_space_pad; 3367 m_pack_func = Rdb_key_def::pack_with_varchar_space_pad; 3368 m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info; 3369 m_segment_size = get_segment_size_from_collation(cs); 3370 m_max_image_len = 3371 (max_image_len_before_chunks / (m_segment_size - 1) + 1) * 3372 m_segment_size; 3373 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len, 3374 &space_mb_len); 3375 } else { 3376 // SQL layer pads CHAR(N) values to their maximum length. 3377 // We just store that and restore it back. 3378 m_unpack_func = (cs->number == COLLATION_LATIN1_BIN) 3379 ? Rdb_key_def::unpack_binary_str 3380 : Rdb_key_def::unpack_utf8_str; 3381 } 3382 res = true; 3383 } else { 3384 // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin 3385 3386 res = true; // index-only scans are possible 3387 m_unpack_data_len = is_varchar ? 0 : field->field_length; 3388 const uint idx = is_varchar ? 0 : 1; 3389 const Rdb_collation_codec *codec = nullptr; 3390 3391 if (is_varchar) { 3392 // VARCHAR requires space-padding for doing comparisons 3393 // 3394 // The check for cs->levels_for_order is to catch 3395 // latin2_czech_cs and cp1250_czech_cs - multi-level collations 3396 // that Variable-Length Space Padded Encoding can't handle. 3397 // It is not expected to work for any other multi-level collations, 3398 // either. 3399 // Currently we handle these collations as NO_PAD, even if they have 3400 // PAD_SPACE attribute. 3401 if (cs->levels_for_order == 1) { 3402 m_pack_func = Rdb_key_def::pack_with_varchar_space_pad; 3403 m_skip_func = Rdb_key_def::skip_variable_space_pad; 3404 m_segment_size = get_segment_size_from_collation(cs); 3405 m_max_image_len = 3406 (max_image_len_before_chunks / (m_segment_size - 1) + 1) * 3407 m_segment_size; 3408 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len, 3409 &space_mb_len); 3410 } else { 3411 // NO_LINT_DEBUG 3412 sql_print_warning( 3413 "RocksDB: you're trying to create an index " 3414 "with a multi-level collation %s", 3415 cs->name); 3416 // NO_LINT_DEBUG 3417 sql_print_warning( 3418 "MyRocks will handle this collation internally " 3419 " as if it had a NO_PAD attribute."); 3420 m_pack_func = Rdb_key_def::pack_with_varchar_encoding; 3421 m_skip_func = Rdb_key_def::skip_variable_length; 3422 } 3423 } 3424 3425 if ((codec = rdb_init_collation_mapping(cs)) != nullptr) { 3426 // The collation allows to store extra information in the unpack_info 3427 // which can be used to restore the original value from the 3428 // mem-comparable form. 3429 m_make_unpack_info_func = codec->m_make_unpack_info_func[idx]; 3430 m_unpack_func = codec->m_unpack_func[idx]; 3431 m_charset_codec = codec; 3432 } else if (use_unknown_collation) { 3433 // We have no clue about how this collation produces mem-comparable 3434 // form. Our way of restoring the original value is to keep a copy of 3435 // the original value in unpack_info. 3436 m_unpack_info_stores_value = true; 3437 m_make_unpack_info_func = is_varchar 3438 ? Rdb_key_def::make_unpack_unknown_varchar 3439 : Rdb_key_def::make_unpack_unknown; 3440 m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar 3441 : Rdb_key_def::unpack_unknown; 3442 } else { 3443 // Same as above: we don't know how to restore the value from its 3444 // mem-comparable form. 3445 // Here, we just indicate to the SQL layer we can't do it. 3446 DBUG_ASSERT(m_unpack_func == nullptr); 3447 m_unpack_info_stores_value = false; 3448 res = false; // Indicate that index-only reads are not possible 3449 } 3450 } 3451 3452 // Make an adjustment: if this column is partially covered, tell the SQL 3453 // layer we can't do index-only scans. Later when we perform an index read, 3454 // we'll check on a record-by-record basis if we can do an index-only scan 3455 // or not. 3456 uint field_length; 3457 if (field->table) { 3458 field_length = field->table->field[field->field_index]->field_length; 3459 } else { 3460 field_length = field->field_length; 3461 } 3462 3463 if (field_length != key_length) { 3464 res = false; 3465 // If this index doesn't support covered bitmaps, then we won't know 3466 // during a read if the column is actually covered or not. If so, we need 3467 // to assume the column isn't covered and skip it during unpacking. 3468 // 3469 // If key_descr == NULL, then this is a dummy field and we probably don't 3470 // need to perform this step. However, to preserve the behavior before 3471 // this change, we'll only skip this step if we have an index which 3472 // supports covered bitmaps. 3473 if (!key_descr || !key_descr->use_covered_bitmap_format()) { 3474 m_unpack_func = nullptr; 3475 m_make_unpack_info_func = nullptr; 3476 m_unpack_info_stores_value = true; 3477 } 3478 } 3479 } 3480 3481 m_covered = res; 3482 return res; 3483 } 3484 3485 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const { 3486 return tbl->key_info[m_keynr].key_part[m_key_part].field; 3487 } 3488 3489 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst, 3490 const longlong hidden_pk_id) const { 3491 DBUG_ASSERT(m_max_image_len == 8); 3492 3493 String to; 3494 rdb_netstr_append_uint64(&to, hidden_pk_id); 3495 memcpy(*dst, to.ptr(), m_max_image_len); 3496 3497 *dst += m_max_image_len; 3498 } 3499 3500 /////////////////////////////////////////////////////////////////////////////////////////// 3501 // Rdb_ddl_manager 3502 /////////////////////////////////////////////////////////////////////////////////////////// 3503 3504 Rdb_tbl_def::~Rdb_tbl_def() { 3505 auto ddl_manager = rdb_get_ddl_manager(); 3506 /* Don't free key definitions */ 3507 if (m_key_descr_arr) { 3508 for (uint i = 0; i < m_key_count; i++) { 3509 if (ddl_manager && m_key_descr_arr[i]) { 3510 ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id()); 3511 } 3512 3513 m_key_descr_arr[i] = nullptr; 3514 } 3515 3516 delete[] m_key_descr_arr; 3517 m_key_descr_arr = nullptr; 3518 } 3519 } 3520 3521 /* 3522 Put table definition DDL entry. Actual write is done at 3523 Rdb_dict_manager::commit. 3524 3525 We write 3526 dbname.tablename -> version + {key_entry, key_entry, key_entry, ... } 3527 3528 Where key entries are a tuple of 3529 ( cf_id, index_nr ) 3530 */ 3531 3532 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict, 3533 rocksdb::WriteBatch *const batch, 3534 const rocksdb::Slice &key) { 3535 StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes; 3536 indexes.alloc(Rdb_key_def::VERSION_SIZE + 3537 m_key_count * Rdb_key_def::PACKED_SIZE * 2); 3538 rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION); 3539 3540 for (uint i = 0; i < m_key_count; i++) { 3541 const Rdb_key_def &kd = *m_key_descr_arr[i]; 3542 3543 uchar flags = 3544 (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) | 3545 (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0); 3546 3547 const uint cf_id = kd.get_cf()->GetID(); 3548 /* 3549 If cf_id already exists, cf_flags must be the same. 3550 To prevent race condition, reading/modifying/committing CF flags 3551 need to be protected by mutex (dict_manager->lock()). 3552 When RocksDB supports transaction with pessimistic concurrency 3553 control, we can switch to use it and removing mutex. 3554 */ 3555 uint existing_cf_flags; 3556 const std::string cf_name = kd.get_cf()->GetName(); 3557 3558 if (dict->get_cf_flags(cf_id, &existing_cf_flags)) { 3559 // For the purposes of comparison we'll clear the partitioning bit. The 3560 // intent here is to make sure that both partitioned and non-partitioned 3561 // tables can refer to the same CF. 3562 existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE; 3563 flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE; 3564 3565 if (existing_cf_flags != flags) { 3566 my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags, 3567 existing_cf_flags); 3568 return true; 3569 } 3570 } else { 3571 dict->add_cf_flags(batch, cf_id, flags); 3572 } 3573 3574 rdb_netstr_append_uint32(&indexes, cf_id); 3575 3576 uint32 index_number = kd.get_index_number(); 3577 rdb_netstr_append_uint32(&indexes, index_number); 3578 3579 struct Rdb_index_info index_info; 3580 index_info.m_gl_index_id = {cf_id, index_number}; 3581 index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST; 3582 index_info.m_index_type = kd.m_index_type; 3583 index_info.m_kv_version = kd.m_kv_format_version; 3584 index_info.m_index_flags = kd.m_index_flags_bitmap; 3585 index_info.m_ttl_duration = kd.m_ttl_duration; 3586 3587 dict->add_or_update_index_cf_mapping(batch, &index_info); 3588 } 3589 3590 const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length()); 3591 3592 dict->put_key(batch, key, svalue); 3593 return false; 3594 } 3595 3596 time_t Rdb_tbl_def::get_create_time() { 3597 time_t create_time = m_create_time; 3598 3599 if (create_time == CREATE_TIME_UNKNOWN) { 3600 // Read it from the .frm file. It's not a problem if several threads do this 3601 // concurrently 3602 char path[FN_REFLEN]; 3603 snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home, 3604 m_dbname.c_str(), m_tablename.c_str(), reg_ext); 3605 unpack_filename(path,path); 3606 MY_STAT f_stat; 3607 if (my_stat(path, &f_stat, MYF(0))) 3608 create_time = f_stat.st_ctime; 3609 else 3610 create_time = 0; // will be shown as SQL NULL 3611 m_create_time = create_time; 3612 } 3613 return create_time; 3614 } 3615 3616 // Length that each index flag takes inside the record. 3617 // Each index in the array maps to the enum INDEX_FLAG 3618 static const std::array<uint, 1> index_flag_lengths = { 3619 {ROCKSDB_SIZEOF_TTL_RECORD}}; 3620 3621 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) { 3622 return flag & index_flags; 3623 } 3624 3625 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags, 3626 enum INDEX_FLAG flag, 3627 uint *const length) { 3628 DBUG_ASSERT_IMP(flag != MAX_FLAG, 3629 Rdb_key_def::has_index_flag(index_flags, flag)); 3630 3631 uint offset = 0; 3632 for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) { 3633 int mask = 1 << bit; 3634 3635 /* Exit once we've reached the proper flag */ 3636 if (flag & mask) { 3637 if (length != nullptr) { 3638 *length = index_flag_lengths[bit]; 3639 } 3640 break; 3641 } 3642 3643 if (index_flags & mask) { 3644 offset += index_flag_lengths[bit]; 3645 } 3646 } 3647 3648 return offset; 3649 } 3650 3651 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf, 3652 const uchar *const val, 3653 enum INDEX_FLAG flag) const { 3654 uint len; 3655 uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len); 3656 DBUG_ASSERT(offset + len <= buf->get_current_pos()); 3657 memcpy(buf->ptr() + offset, val, len); 3658 } 3659 3660 void Rdb_tbl_def::check_if_is_mysql_system_table() { 3661 static const char *const system_dbs[] = { 3662 "mysql", 3663 "performance_schema", 3664 "information_schema", 3665 }; 3666 3667 m_is_mysql_system_table = false; 3668 for (uint ii = 0; ii < array_elements(system_dbs); ii++) { 3669 if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) { 3670 m_is_mysql_system_table = true; 3671 break; 3672 } 3673 } 3674 } 3675 3676 void Rdb_tbl_def::check_and_set_read_free_rpl_table() { 3677 m_is_read_free_rpl_table = 3678 #if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported 3679 rdb_read_free_regex_handler.matches(base_tablename()); 3680 #else 3681 false; 3682 #endif 3683 } 3684 3685 void Rdb_tbl_def::set_name(const std::string &name) { 3686 int err MY_ATTRIBUTE((__unused__)); 3687 3688 m_dbname_tablename = name; 3689 err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename, 3690 &m_partition); 3691 DBUG_ASSERT(err == 0); 3692 3693 check_if_is_mysql_system_table(); 3694 } 3695 3696 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() { 3697 for (uint i = 0; i < m_key_count; i++) { 3698 auto &k = m_key_descr_arr[i]; 3699 if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY || 3700 k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) { 3701 return k->get_gl_index_id(); 3702 } 3703 } 3704 3705 // Every table must have a primary key, even if it's hidden. 3706 abort(); 3707 return GL_INDEX_ID(); 3708 } 3709 3710 void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) { 3711 m_index_num_to_keydef.erase(gl_index_id); 3712 } 3713 3714 void Rdb_ddl_manager::add_uncommitted_keydefs( 3715 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) { 3716 mysql_rwlock_wrlock(&m_rwlock); 3717 for (const auto &index : indexes) { 3718 m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index; 3719 } 3720 mysql_rwlock_unlock(&m_rwlock); 3721 } 3722 3723 void Rdb_ddl_manager::remove_uncommitted_keydefs( 3724 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) { 3725 mysql_rwlock_wrlock(&m_rwlock); 3726 for (const auto &index : indexes) { 3727 m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id()); 3728 } 3729 mysql_rwlock_unlock(&m_rwlock); 3730 } 3731 3732 namespace // anonymous namespace = not visible outside this source file 3733 { 3734 struct Rdb_validate_tbls : public Rdb_tables_scanner { 3735 using tbl_info_t = std::pair<std::string, bool>; 3736 using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>; 3737 3738 tbl_list_t m_list; 3739 3740 int add_table(Rdb_tbl_def *tdef) override; 3741 3742 bool compare_to_actual_tables(const std::string &datadir, bool *has_errors); 3743 3744 bool scan_for_frms(const std::string &datadir, const std::string &dbname, 3745 bool *has_errors); 3746 3747 bool check_frm_file(const std::string &fullpath, const std::string &dbname, 3748 const std::string &tablename, bool *has_errors); 3749 }; 3750 } // anonymous namespace 3751 3752 /* 3753 Get a list of tables that we expect to have .frm files for. This will use the 3754 information just read from the RocksDB data dictionary. 3755 */ 3756 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) { 3757 DBUG_ASSERT(tdef != nullptr); 3758 3759 /* Add the database/table into the list that are not temp table */ 3760 if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) { 3761 bool is_partition = tdef->base_partition().size() != 0; 3762 m_list[tdef->base_dbname()].insert( 3763 tbl_info_t(tdef->base_tablename(), is_partition)); 3764 } 3765 3766 return HA_EXIT_SUCCESS; 3767 } 3768 3769 /* 3770 Access the .frm file for this dbname/tablename and see if it is a RocksDB 3771 table (or partition table). 3772 */ 3773 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath, 3774 const std::string &dbname, 3775 const std::string &tablename, 3776 bool *has_errors) { 3777 /* Check this .frm file to see what engine it uses */ 3778 String fullfilename(fullpath.c_str(), &my_charset_bin); 3779 fullfilename.append(FN_DIRSEP); 3780 fullfilename.append(tablename.c_str()); 3781 fullfilename.append(".frm"); 3782 3783 /* 3784 This function will return the legacy_db_type of the table. Currently 3785 it does not reference the first parameter (THD* thd), but if it ever 3786 did in the future we would need to make a version that does it without 3787 the connection handle as we don't have one here. 3788 */ 3789 char eng_type_buf[NAME_CHAR_LEN+1]; 3790 LEX_CSTRING eng_type_str = {eng_type_buf, 0}; 3791 bool is_sequence; 3792 enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str, &is_sequence); 3793 if (type == TABLE_TYPE_UNKNOWN) { 3794 // NO_LINT_DEBUG 3795 sql_print_warning("RocksDB: Failed to open/read .from file: %s", 3796 fullfilename.ptr()); 3797 return false; 3798 } 3799 3800 if (type == TABLE_TYPE_NORMAL) { 3801 /* For a RocksDB table do we have a reference in the data dictionary? */ 3802 if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) { 3803 /* 3804 Attempt to remove the table entry from the list of tables. If this 3805 fails then we know we had a .frm file that wasn't registered in RocksDB. 3806 */ 3807 tbl_info_t element(tablename, false); 3808 if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) { 3809 // NO_LINT_DEBUG 3810 sql_print_warning( 3811 "RocksDB: Schema mismatch - " 3812 "A .frm file exists for table %s.%s, " 3813 "but that table is not registered in RocksDB", 3814 dbname.c_str(), tablename.c_str()); 3815 *has_errors = true; 3816 } 3817 } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) { 3818 /* 3819 For partition tables, see if it is in the m_list as a partition, 3820 but don't generate an error if it isn't there - we don't know that the 3821 .frm is for RocksDB. 3822 */ 3823 if (m_list.count(dbname) > 0) { 3824 m_list[dbname].erase(tbl_info_t(tablename, true)); 3825 } 3826 } 3827 } 3828 3829 return true; 3830 } 3831 3832 /* Scan the database subdirectory for .frm files */ 3833 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir, 3834 const std::string &dbname, 3835 bool *has_errors) { 3836 bool result = true; 3837 std::string fullpath = datadir + dbname; 3838 struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT)); 3839 3840 /* Access the directory */ 3841 if (dir_info == nullptr) { 3842 // NO_LINT_DEBUG 3843 sql_print_warning("RocksDB: Could not open database directory: %s", 3844 fullpath.c_str()); 3845 return false; 3846 } 3847 3848 /* Scan through the files in the directory */ 3849 struct fileinfo *file_info = dir_info->dir_entry; 3850 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) { 3851 /* Find .frm files that are not temp files (those that contain '#sql') */ 3852 const char *ext = strrchr(file_info->name, '.'); 3853 if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr && 3854 strcmp(ext, ".frm") == 0) { 3855 std::string tablename = 3856 std::string(file_info->name, ext - file_info->name); 3857 3858 /* Check to see if the .frm file is from RocksDB */ 3859 if (!check_frm_file(fullpath, dbname, tablename, has_errors)) { 3860 result = false; 3861 break; 3862 } 3863 } 3864 } 3865 3866 /* Remove any databases who have no more tables listed */ 3867 if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) { 3868 m_list.erase(dbname); 3869 } 3870 3871 /* Release the directory entry */ 3872 my_dirend(dir_info); 3873 3874 return result; 3875 } 3876 3877 /* 3878 Scan the datadir for all databases (subdirectories) and get a list of .frm 3879 files they contain 3880 */ 3881 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir, 3882 bool *has_errors) { 3883 bool result = true; 3884 struct st_my_dir *dir_info; 3885 struct fileinfo *file_info; 3886 3887 dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT)); 3888 if (dir_info == nullptr) { 3889 // NO_LINT_DEBUG 3890 sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str()); 3891 return false; 3892 } 3893 3894 file_info = dir_info->dir_entry; 3895 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) { 3896 /* Ignore files/dirs starting with '.' */ 3897 if (file_info->name[0] == '.') continue; 3898 3899 /* Ignore all non-directory files */ 3900 if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue; 3901 3902 /* Scan all the .frm files in the directory */ 3903 if (!scan_for_frms(datadir, file_info->name, has_errors)) { 3904 result = false; 3905 break; 3906 } 3907 } 3908 3909 /* Release the directory info */ 3910 my_dirend(dir_info); 3911 3912 return result; 3913 } 3914 3915 /* 3916 Validate that all auto increment values in the data dictionary are on a 3917 supported version. 3918 */ 3919 bool Rdb_ddl_manager::validate_auto_incr() { 3920 std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator()); 3921 3922 uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE]; 3923 rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC); 3924 const rocksdb::Slice auto_incr_entry_slice( 3925 reinterpret_cast<char *>(auto_incr_entry), 3926 Rdb_key_def::INDEX_NUMBER_SIZE); 3927 for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) { 3928 const rocksdb::Slice key = it->key(); 3929 const rocksdb::Slice val = it->value(); 3930 GL_INDEX_ID gl_index_id; 3931 3932 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE && 3933 memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) { 3934 break; 3935 } 3936 3937 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) { 3938 return false; 3939 } 3940 3941 if (val.size() <= Rdb_key_def::VERSION_SIZE) { 3942 return false; 3943 } 3944 3945 // Check if we have orphaned entries for whatever reason by cross 3946 // referencing ddl entries. 3947 auto ptr = reinterpret_cast<const uchar *>(key.data()); 3948 ptr += Rdb_key_def::INDEX_NUMBER_SIZE; 3949 rdb_netbuf_read_gl_index(&ptr, &gl_index_id); 3950 if (!m_dict->get_index_info(gl_index_id, nullptr)) { 3951 // NO_LINT_DEBUG 3952 sql_print_warning( 3953 "RocksDB: AUTOINC mismatch - " 3954 "Index number (%u, %u) found in AUTOINC " 3955 "but does not exist as a DDL entry", 3956 gl_index_id.cf_id, gl_index_id.index_id); 3957 return false; 3958 } 3959 3960 ptr = reinterpret_cast<const uchar *>(val.data()); 3961 const int version = rdb_netbuf_read_uint16(&ptr); 3962 if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) { 3963 // NO_LINT_DEBUG 3964 sql_print_warning( 3965 "RocksDB: AUTOINC mismatch - " 3966 "Index number (%u, %u) found in AUTOINC " 3967 "is on unsupported version %d", 3968 gl_index_id.cf_id, gl_index_id.index_id, version); 3969 return false; 3970 } 3971 } 3972 3973 if (!it->status().ok()) { 3974 return false; 3975 } 3976 3977 return true; 3978 } 3979 3980 /* 3981 Validate that all the tables in the RocksDB database dictionary match the .frm 3982 files in the datadir 3983 */ 3984 bool Rdb_ddl_manager::validate_schemas(void) { 3985 bool has_errors = false; 3986 const std::string datadir = std::string(mysql_real_data_home); 3987 Rdb_validate_tbls table_list; 3988 3989 /* Get the list of tables from the database dictionary */ 3990 if (scan_for_tables(&table_list) != 0) { 3991 return false; 3992 } 3993 3994 /* Compare that to the list of actual .frm files */ 3995 if (!table_list.compare_to_actual_tables(datadir, &has_errors)) { 3996 return false; 3997 } 3998 3999 /* 4000 Any tables left in the tables list are ones that are registered in RocksDB 4001 but don't have .frm files. 4002 */ 4003 for (const auto &db : table_list.m_list) { 4004 for (const auto &table : db.second) { 4005 // NO_LINT_DEBUG 4006 sql_print_warning( 4007 "RocksDB: Schema mismatch - " 4008 "Table %s.%s is registered in RocksDB " 4009 "but does not have a .frm file", 4010 db.first.c_str(), table.first.c_str()); 4011 has_errors = true; 4012 } 4013 } 4014 4015 return !has_errors; 4016 } 4017 4018 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg, 4019 Rdb_cf_manager *const cf_manager, 4020 const uint32_t validate_tables) { 4021 m_dict = dict_arg; 4022 mysql_rwlock_init(0, &m_rwlock); 4023 4024 /* Read the data dictionary and populate the hash */ 4025 uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE]; 4026 rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER); 4027 const rocksdb::Slice ddl_entry_slice((char *)ddl_entry, 4028 Rdb_key_def::INDEX_NUMBER_SIZE); 4029 4030 /* Reading data dictionary should always skip bloom filter */ 4031 rocksdb::Iterator *it = m_dict->new_iterator(); 4032 int i = 0; 4033 4034 uint max_index_id_in_dict = 0; 4035 m_dict->get_max_index_id(&max_index_id_in_dict); 4036 4037 for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) { 4038 const uchar *ptr; 4039 const uchar *ptr_end; 4040 const rocksdb::Slice key = it->key(); 4041 const rocksdb::Slice val = it->value(); 4042 4043 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE && 4044 memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) { 4045 break; 4046 } 4047 4048 if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) { 4049 // NO_LINT_DEBUG 4050 sql_print_error("RocksDB: Table_store: key has length %d (corruption?)", 4051 (int)key.size()); 4052 return true; 4053 } 4054 4055 Rdb_tbl_def *const tdef = 4056 new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE); 4057 4058 // Now, read the DDLs. 4059 const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE; 4060 if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) { 4061 // NO_LINT_DEBUG 4062 sql_print_error("RocksDB: Table_store: invalid keylist for table %s", 4063 tdef->full_tablename().c_str()); 4064 return true; 4065 } 4066 tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2); 4067 tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count]; 4068 4069 ptr = reinterpret_cast<const uchar *>(val.data()); 4070 const int version = rdb_netbuf_read_uint16(&ptr); 4071 if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) { 4072 // NO_LINT_DEBUG 4073 sql_print_error( 4074 "RocksDB: DDL ENTRY Version was not expected." 4075 "Expected: %d, Actual: %d", 4076 Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version); 4077 return true; 4078 } 4079 ptr_end = ptr + real_val_size; 4080 for (uint keyno = 0; ptr < ptr_end; keyno++) { 4081 GL_INDEX_ID gl_index_id; 4082 rdb_netbuf_read_gl_index(&ptr, &gl_index_id); 4083 uint flags = 0; 4084 struct Rdb_index_info index_info; 4085 if (!m_dict->get_index_info(gl_index_id, &index_info)) { 4086 // NO_LINT_DEBUG 4087 sql_print_error( 4088 "RocksDB: Could not get index information " 4089 "for Index Number (%u,%u), table %s", 4090 gl_index_id.cf_id, gl_index_id.index_id, 4091 tdef->full_tablename().c_str()); 4092 return true; 4093 } 4094 if (max_index_id_in_dict < gl_index_id.index_id) { 4095 // NO_LINT_DEBUG 4096 sql_print_error( 4097 "RocksDB: Found max index id %u from data dictionary " 4098 "but also found larger index id %u from dictionary. " 4099 "This should never happen and possibly a bug.", 4100 max_index_id_in_dict, gl_index_id.index_id); 4101 return true; 4102 } 4103 if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) { 4104 // NO_LINT_DEBUG 4105 sql_print_error( 4106 "RocksDB: Could not get Column Family Flags " 4107 "for CF Number %d, table %s", 4108 gl_index_id.cf_id, tdef->full_tablename().c_str()); 4109 return true; 4110 } 4111 4112 if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) { 4113 // The per-index cf option is deprecated. Make sure we don't have the 4114 // flag set in any existing database. NO_LINT_DEBUG 4115 // NO_LINT_DEBUG 4116 sql_print_error( 4117 "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF " 4118 "number %d, table %s", 4119 gl_index_id.cf_id, tdef->full_tablename().c_str()); 4120 } 4121 4122 rocksdb::ColumnFamilyHandle *const cfh = 4123 cf_manager->get_cf(gl_index_id.cf_id); 4124 DBUG_ASSERT(cfh != nullptr); 4125 4126 uint32 ttl_rec_offset = 4127 Rdb_key_def::has_index_flag(index_info.m_index_flags, 4128 Rdb_key_def::TTL_FLAG) 4129 ? Rdb_key_def::calculate_index_flag_offset( 4130 index_info.m_index_flags, Rdb_key_def::TTL_FLAG) 4131 : UINT_MAX; 4132 4133 /* 4134 We can't fully initialize Rdb_key_def object here, because full 4135 initialization requires that there is an open TABLE* where we could 4136 look at Field* objects and set max_length and other attributes 4137 */ 4138 tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>( 4139 gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version, 4140 index_info.m_index_type, index_info.m_kv_version, 4141 flags & Rdb_key_def::REVERSE_CF_FLAG, 4142 flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "", 4143 m_dict->get_stats(gl_index_id), index_info.m_index_flags, 4144 ttl_rec_offset, index_info.m_ttl_duration); 4145 } 4146 put(tdef); 4147 i++; 4148 } 4149 4150 /* 4151 If validate_tables is greater than 0 run the validation. Only fail the 4152 initialzation if the setting is 1. If the setting is 2 we continue. 4153 */ 4154 if (validate_tables > 0) { 4155 std::string msg; 4156 if (!validate_schemas()) { 4157 msg = 4158 "RocksDB: Problems validating data dictionary " 4159 "against .frm files, exiting"; 4160 } else if (!validate_auto_incr()) { 4161 msg = 4162 "RocksDB: Problems validating auto increment values in " 4163 "data dictionary, exiting"; 4164 } 4165 if (validate_tables == 1 && !msg.empty()) { 4166 // NO_LINT_DEBUG 4167 sql_print_error("%s", msg.c_str()); 4168 return true; 4169 } 4170 } 4171 4172 // index ids used by applications should not conflict with 4173 // data dictionary index ids 4174 if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) { 4175 max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID; 4176 } 4177 4178 m_sequence.init(max_index_id_in_dict + 1); 4179 4180 if (!it->status().ok()) { 4181 rdb_log_status_error(it->status(), "Table_store load error"); 4182 return true; 4183 } 4184 delete it; 4185 // NO_LINT_DEBUG 4186 sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables", 4187 i); 4188 return false; 4189 } 4190 4191 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name, 4192 const bool lock) { 4193 if (lock) { 4194 mysql_rwlock_rdlock(&m_rwlock); 4195 } 4196 4197 Rdb_tbl_def *rec = nullptr; 4198 const auto it = m_ddl_map.find(table_name); 4199 if (it != m_ddl_map.end()) { 4200 rec = it->second; 4201 } 4202 4203 if (lock) { 4204 mysql_rwlock_unlock(&m_rwlock); 4205 } 4206 4207 return rec; 4208 } 4209 4210 // this is a safe version of the find() function below. It acquires a read 4211 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we 4212 // are finding it. Copying it into 'ret' increments the count making sure 4213 // that the object will not be discarded until we are finished with it. 4214 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find( 4215 GL_INDEX_ID gl_index_id) { 4216 std::shared_ptr<const Rdb_key_def> ret(nullptr); 4217 4218 mysql_rwlock_rdlock(&m_rwlock); 4219 4220 auto it = m_index_num_to_keydef.find(gl_index_id); 4221 if (it != m_index_num_to_keydef.end()) { 4222 const auto table_def = find(it->second.first, false); 4223 if (table_def && it->second.second < table_def->m_key_count) { 4224 const auto &kd = table_def->m_key_descr_arr[it->second.second]; 4225 if (kd->max_storage_fmt_length() != 0) { 4226 ret = kd; 4227 } 4228 } 4229 } else { 4230 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id); 4231 if (it != m_index_num_to_uncommitted_keydef.end()) { 4232 const auto &kd = it->second; 4233 if (kd->max_storage_fmt_length() != 0) { 4234 ret = kd; 4235 } 4236 } 4237 } 4238 4239 mysql_rwlock_unlock(&m_rwlock); 4240 4241 return ret; 4242 } 4243 4244 // this method assumes at least read-only lock on m_rwlock 4245 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find( 4246 GL_INDEX_ID gl_index_id) { 4247 auto it = m_index_num_to_keydef.find(gl_index_id); 4248 if (it != m_index_num_to_keydef.end()) { 4249 auto table_def = find(it->second.first, false); 4250 if (table_def) { 4251 if (it->second.second < table_def->m_key_count) { 4252 return table_def->m_key_descr_arr[it->second.second]; 4253 } 4254 } 4255 } else { 4256 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id); 4257 if (it != m_index_num_to_uncommitted_keydef.end()) { 4258 return it->second; 4259 } 4260 } 4261 4262 static std::shared_ptr<Rdb_key_def> empty = nullptr; 4263 4264 return empty; 4265 } 4266 4267 // this method returns the name of the table based on an index id. It acquires 4268 // a read lock on m_rwlock. 4269 const std::string Rdb_ddl_manager::safe_get_table_name( 4270 const GL_INDEX_ID &gl_index_id) { 4271 std::string ret; 4272 mysql_rwlock_rdlock(&m_rwlock); 4273 auto it = m_index_num_to_keydef.find(gl_index_id); 4274 if (it != m_index_num_to_keydef.end()) { 4275 ret = it->second.first; 4276 } 4277 mysql_rwlock_unlock(&m_rwlock); 4278 return ret; 4279 } 4280 4281 void Rdb_ddl_manager::set_stats( 4282 const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) { 4283 mysql_rwlock_wrlock(&m_rwlock); 4284 for (auto src : stats) { 4285 const auto &keydef = find(src.second.m_gl_index_id); 4286 if (keydef) { 4287 keydef->m_stats = src.second; 4288 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats; 4289 } 4290 } 4291 mysql_rwlock_unlock(&m_rwlock); 4292 } 4293 4294 void Rdb_ddl_manager::adjust_stats( 4295 const std::vector<Rdb_index_stats> &new_data, 4296 const std::vector<Rdb_index_stats> &deleted_data) { 4297 mysql_rwlock_wrlock(&m_rwlock); 4298 int i = 0; 4299 for (const auto &data : {new_data, deleted_data}) { 4300 for (const auto &src : data) { 4301 const auto &keydef = find(src.m_gl_index_id); 4302 if (keydef) { 4303 keydef->m_stats.m_distinct_keys_per_prefix.resize( 4304 keydef->get_key_parts()); 4305 keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length()); 4306 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats; 4307 } 4308 } 4309 i++; 4310 } 4311 const bool should_save_stats = !m_stats2store.empty(); 4312 mysql_rwlock_unlock(&m_rwlock); 4313 if (should_save_stats) { 4314 // Queue an async persist_stats(false) call to the background thread. 4315 rdb_queue_save_stats_request(); 4316 } 4317 } 4318 4319 void Rdb_ddl_manager::persist_stats(const bool sync) { 4320 mysql_rwlock_wrlock(&m_rwlock); 4321 const auto local_stats2store = std::move(m_stats2store); 4322 m_stats2store.clear(); 4323 mysql_rwlock_unlock(&m_rwlock); 4324 4325 // Persist stats 4326 const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin(); 4327 std::vector<Rdb_index_stats> stats; 4328 std::transform(local_stats2store.begin(), local_stats2store.end(), 4329 std::back_inserter(stats), 4330 [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) { 4331 return s.second; 4332 }); 4333 m_dict->add_stats(wb.get(), stats); 4334 m_dict->commit(wb.get(), sync); 4335 } 4336 4337 /* 4338 Put table definition of `tbl` into the mapping, and also write it to the 4339 on-disk data dictionary. 4340 */ 4341 4342 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl, 4343 rocksdb::WriteBatch *const batch) { 4344 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer; 4345 4346 buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER); 4347 4348 const std::string &dbname_tablename = tbl->full_tablename(); 4349 buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size()); 4350 4351 int res; 4352 if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) { 4353 return res; 4354 } 4355 if ((res = put(tbl))) { 4356 return res; 4357 } 4358 return HA_EXIT_SUCCESS; 4359 } 4360 4361 /* Return 0 - ok, other value - error */ 4362 /* TODO: 4363 This function modifies m_ddl_map and m_index_num_to_keydef. 4364 However, these changes need to be reversed if dict_manager.commit fails 4365 See the discussion here: https://reviews.facebook.net/D35925#inline-259167 4366 Tracked by https://github.com/facebook/mysql-5.6/issues/33 4367 */ 4368 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) { 4369 Rdb_tbl_def *rec; 4370 const std::string &dbname_tablename = tbl->full_tablename(); 4371 4372 if (lock) mysql_rwlock_wrlock(&m_rwlock); 4373 4374 // We have to do this find because 'tbl' is not yet in the list. We need 4375 // to find the one we are replacing ('rec') 4376 rec = find(dbname_tablename, false); 4377 if (rec) { 4378 // Free the old record. 4379 delete rec; 4380 m_ddl_map.erase(dbname_tablename); 4381 } 4382 m_ddl_map.emplace(dbname_tablename, tbl); 4383 4384 for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) { 4385 m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] = 4386 std::make_pair(dbname_tablename, keyno); 4387 } 4388 tbl->check_and_set_read_free_rpl_table(); 4389 4390 if (lock) mysql_rwlock_unlock(&m_rwlock); 4391 return 0; 4392 } 4393 4394 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl, 4395 rocksdb::WriteBatch *const batch, 4396 const bool lock) { 4397 if (lock) mysql_rwlock_wrlock(&m_rwlock); 4398 4399 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer; 4400 key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER); 4401 const std::string &dbname_tablename = tbl->full_tablename(); 4402 key_writer.write(dbname_tablename.c_str(), dbname_tablename.size()); 4403 4404 m_dict->delete_key(batch, key_writer.to_slice()); 4405 4406 const auto it = m_ddl_map.find(dbname_tablename); 4407 if (it != m_ddl_map.end()) { 4408 // Free Rdb_tbl_def 4409 delete it->second; 4410 4411 m_ddl_map.erase(it); 4412 } 4413 4414 if (lock) mysql_rwlock_unlock(&m_rwlock); 4415 } 4416 4417 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to, 4418 rocksdb::WriteBatch *const batch) { 4419 Rdb_tbl_def *rec; 4420 Rdb_tbl_def *new_rec; 4421 bool res = true; 4422 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer; 4423 4424 mysql_rwlock_wrlock(&m_rwlock); 4425 if (!(rec = find(from, false))) { 4426 mysql_rwlock_unlock(&m_rwlock); 4427 return true; 4428 } 4429 4430 new_rec = new Rdb_tbl_def(to); 4431 4432 new_rec->m_key_count = rec->m_key_count; 4433 new_rec->m_auto_incr_val = 4434 rec->m_auto_incr_val.load(std::memory_order_relaxed); 4435 new_rec->m_key_descr_arr = rec->m_key_descr_arr; 4436 4437 new_rec->m_hidden_pk_val = 4438 rec->m_hidden_pk_val.load(std::memory_order_relaxed); 4439 4440 // so that it's not free'd when deleting the old rec 4441 rec->m_key_descr_arr = nullptr; 4442 4443 // Create a new key 4444 new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER); 4445 4446 const std::string &dbname_tablename = new_rec->full_tablename(); 4447 new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size()); 4448 4449 // Create a key to add 4450 if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) { 4451 remove(rec, batch, false); 4452 put(new_rec, false); 4453 res = false; // ok 4454 } 4455 4456 mysql_rwlock_unlock(&m_rwlock); 4457 return res; 4458 } 4459 4460 void Rdb_ddl_manager::cleanup() { 4461 for (const auto &kv : m_ddl_map) { 4462 delete kv.second; 4463 } 4464 m_ddl_map.clear(); 4465 4466 mysql_rwlock_destroy(&m_rwlock); 4467 m_sequence.cleanup(); 4468 } 4469 4470 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) { 4471 int ret; 4472 Rdb_tbl_def *rec; 4473 4474 DBUG_ASSERT(tables_scanner != nullptr); 4475 4476 mysql_rwlock_rdlock(&m_rwlock); 4477 4478 ret = 0; 4479 4480 for (const auto &kv : m_ddl_map) { 4481 rec = kv.second; 4482 ret = tables_scanner->add_table(rec); 4483 if (ret) break; 4484 } 4485 4486 mysql_rwlock_unlock(&m_rwlock); 4487 return ret; 4488 } 4489 4490 /* 4491 Rdb_binlog_manager class implementation 4492 */ 4493 4494 bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) { 4495 DBUG_ASSERT(dict_arg != nullptr); 4496 m_dict = dict_arg; 4497 4498 m_key_writer.reset(); 4499 m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER); 4500 m_key_slice = m_key_writer.to_slice(); 4501 return false; 4502 } 4503 4504 void Rdb_binlog_manager::cleanup() {} 4505 4506 /** 4507 Set binlog name, pos and optionally gtid into WriteBatch. 4508 This function should be called as part of transaction commit, 4509 since binlog info is set only at transaction commit. 4510 Actual write into RocksDB is not done here, so checking if 4511 write succeeded or not is not possible here. 4512 @param binlog_name Binlog name 4513 @param binlog_pos Binlog pos 4514 @param batch WriteBatch 4515 */ 4516 void Rdb_binlog_manager::update(const char *const binlog_name, 4517 const my_off_t binlog_pos, 4518 rocksdb::WriteBatchBase *const batch) { 4519 if (binlog_name && binlog_pos) { 4520 // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024 4521 const size_t RDB_MAX_BINLOG_INFO_LEN = 1024; 4522 Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer; 4523 4524 // store version 4525 value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION); 4526 4527 // store binlog file name length 4528 DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN); 4529 const uint16_t binlog_name_len = strlen(binlog_name); 4530 value_writer.write_uint16(binlog_name_len); 4531 4532 // store binlog file name 4533 value_writer.write(binlog_name, binlog_name_len); 4534 4535 // store binlog pos 4536 value_writer.write_uint32(binlog_pos); 4537 4538 #ifdef MARIADB_MERGE_2019 4539 // store binlog gtid length. 4540 // If gtid was not set, store 0 instead 4541 const uint16_t binlog_max_gtid_len = 4542 binlog_max_gtid ? strlen(binlog_max_gtid) : 0; 4543 value_writer.write_uint16(binlog_max_gtid_len); 4544 4545 if (binlog_max_gtid_len > 0) { 4546 // store binlog gtid 4547 value_writer.write(binlog_max_gtid, binlog_max_gtid_len); 4548 } 4549 #endif 4550 4551 m_dict->put_key(batch, m_key_slice, value_writer.to_slice()); 4552 } 4553 } 4554 4555 /** 4556 Read binlog committed entry stored in RocksDB, then unpack 4557 @param[OUT] binlog_name Binlog name 4558 @param[OUT] binlog_pos Binlog pos 4559 @param[OUT] binlog_gtid Binlog GTID 4560 @return 4561 true is binlog info was found (valid behavior) 4562 false otherwise 4563 */ 4564 bool Rdb_binlog_manager::read(char *const binlog_name, 4565 my_off_t *const binlog_pos, 4566 char *const binlog_gtid) const { 4567 bool ret = false; 4568 if (binlog_name) { 4569 std::string value; 4570 rocksdb::Status status = m_dict->get_value(m_key_slice, &value); 4571 if (status.ok()) { 4572 if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos, 4573 binlog_gtid)) { 4574 ret = true; 4575 } 4576 } 4577 } 4578 return ret; 4579 } 4580 4581 /** 4582 Unpack value then split into binlog_name, binlog_pos (and binlog_gtid) 4583 @param[IN] value Binlog state info fetched from RocksDB 4584 @param[OUT] binlog_name Binlog name 4585 @param[OUT] binlog_pos Binlog pos 4586 @param[OUT] binlog_gtid Binlog GTID 4587 @return true on error 4588 */ 4589 bool Rdb_binlog_manager::unpack_value(const uchar *const value, 4590 size_t value_size_arg, 4591 char *const binlog_name, 4592 my_off_t *const binlog_pos, 4593 char *const binlog_gtid) const { 4594 uint pack_len = 0; 4595 intmax_t value_size= value_size_arg; 4596 4597 DBUG_ASSERT(binlog_pos != nullptr); 4598 4599 if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0) 4600 return true; 4601 // read version 4602 const uint16_t version = rdb_netbuf_to_uint16(value); 4603 4604 pack_len += Rdb_key_def::VERSION_SIZE; 4605 if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true; 4606 4607 if ((value_size -= sizeof(uint16)) < 0) 4608 return true; 4609 4610 // read binlog file name length 4611 const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len); 4612 pack_len += sizeof(uint16); 4613 4614 if (binlog_name_len >= (FN_REFLEN+1)) 4615 return true; 4616 4617 if ((value_size -= binlog_name_len) < 0) 4618 return true; 4619 4620 if (binlog_name_len) { 4621 // read and set binlog name 4622 memcpy(binlog_name, value + pack_len, binlog_name_len); 4623 binlog_name[binlog_name_len] = '\0'; 4624 pack_len += binlog_name_len; 4625 4626 if ((value_size -= sizeof(uint32)) < 0) 4627 return true; 4628 // read and set binlog pos 4629 *binlog_pos = rdb_netbuf_to_uint32(value + pack_len); 4630 pack_len += sizeof(uint32); 4631 4632 if ((value_size -= sizeof(uint16)) < 0) 4633 return true; 4634 // read gtid length 4635 const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len); 4636 pack_len += sizeof(uint16); 4637 4638 if (binlog_gtid_len >= GTID_BUF_LEN) 4639 return true; 4640 if ((value_size -= binlog_gtid_len) < 0) 4641 return true; 4642 4643 if (binlog_gtid && binlog_gtid_len > 0) { 4644 // read and set gtid 4645 memcpy(binlog_gtid, value + pack_len, binlog_gtid_len); 4646 binlog_gtid[binlog_gtid_len] = '\0'; 4647 pack_len += binlog_gtid_len; 4648 } 4649 } 4650 return false; 4651 } 4652 4653 /** 4654 Inserts a row into mysql.slave_gtid_info table. Doing this inside 4655 storage engine is more efficient than inserting/updating through MySQL. 4656 4657 @param[IN] id Primary key of the table. 4658 @param[IN] db Database name. This is column 2 of the table. 4659 @param[IN] gtid Gtid in human readable form. This is column 3 of the table. 4660 @param[IN] write_batch Handle to storage engine writer. 4661 */ 4662 void Rdb_binlog_manager::update_slave_gtid_info( 4663 const uint id, const char *const db, const char *const gtid, 4664 rocksdb::WriteBatchBase *const write_batch) { 4665 if (id && db && gtid) { 4666 // Make sure that if the slave_gtid_info table exists we have a 4667 // pointer to it via m_slave_gtid_info_tbl. 4668 if (!m_slave_gtid_info_tbl.load()) { 4669 m_slave_gtid_info_tbl.store( 4670 rdb_get_ddl_manager()->find("mysql.slave_gtid_info")); 4671 } 4672 if (!m_slave_gtid_info_tbl.load()) { 4673 // slave_gtid_info table is not present. Simply return. 4674 return; 4675 } 4676 DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1); 4677 4678 const std::shared_ptr<const Rdb_key_def> &kd = 4679 m_slave_gtid_info_tbl.load()->m_key_descr_arr[0]; 4680 String value; 4681 4682 // Build key 4683 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer; 4684 key_writer.write_index(kd->get_index_number()); 4685 key_writer.write_uint32(id); 4686 4687 // Build value 4688 Rdb_buf_writer<128> value_writer; 4689 DBUG_ASSERT(gtid); 4690 const uint db_len = strlen(db); 4691 const uint gtid_len = strlen(gtid); 4692 // 1 byte used for flags. Empty here. 4693 value_writer.write_byte(0); 4694 4695 // Write column 1. 4696 DBUG_ASSERT(strlen(db) <= 64); 4697 value_writer.write_byte(db_len); 4698 value_writer.write(db, db_len); 4699 4700 // Write column 2. 4701 DBUG_ASSERT(gtid_len <= 56); 4702 value_writer.write_byte(gtid_len); 4703 value_writer.write(gtid, gtid_len); 4704 4705 write_batch->Put(kd->get_cf(), key_writer.to_slice(), 4706 value_writer.to_slice()); 4707 } 4708 } 4709 4710 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict, 4711 Rdb_cf_manager *const cf_manager) { 4712 DBUG_ASSERT(rdb_dict != nullptr); 4713 DBUG_ASSERT(cf_manager != nullptr); 4714 4715 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST); 4716 4717 m_db = rdb_dict; 4718 4719 m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME); 4720 rocksdb::ColumnFamilyHandle *default_cfh = 4721 cf_manager->get_cf(DEFAULT_CF_NAME); 4722 4723 // System CF and default CF should be initialized 4724 if (m_system_cfh == nullptr || default_cfh == nullptr) { 4725 return HA_EXIT_FAILURE; 4726 } 4727 4728 rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID); 4729 4730 m_key_slice_max_index_id = 4731 rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id), 4732 Rdb_key_def::INDEX_NUMBER_SIZE); 4733 4734 resume_drop_indexes(); 4735 rollback_ongoing_index_creation(); 4736 4737 // Initialize system CF and default CF flags 4738 const std::unique_ptr<rocksdb::WriteBatch> wb = begin(); 4739 rocksdb::WriteBatch *const batch = wb.get(); 4740 4741 add_cf_flags(batch, m_system_cfh->GetID(), 0); 4742 add_cf_flags(batch, default_cfh->GetID(), 0); 4743 commit(batch); 4744 4745 return HA_EXIT_SUCCESS; 4746 } 4747 4748 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const { 4749 return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch); 4750 } 4751 4752 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch, 4753 const rocksdb::Slice &key, 4754 const rocksdb::Slice &value) const { 4755 batch->Put(m_system_cfh, key, value); 4756 } 4757 4758 rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key, 4759 std::string *const value) const { 4760 rocksdb::ReadOptions options; 4761 options.total_order_seek = true; 4762 return m_db->Get(options, m_system_cfh, key, value); 4763 } 4764 4765 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch, 4766 const rocksdb::Slice &key) const { 4767 batch->Delete(m_system_cfh, key); 4768 } 4769 4770 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const { 4771 /* Reading data dictionary should always skip bloom filter */ 4772 rocksdb::ReadOptions read_options; 4773 read_options.total_order_seek = true; 4774 return m_db->NewIterator(read_options, m_system_cfh); 4775 } 4776 4777 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch, 4778 const bool sync) const { 4779 if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED; 4780 int res = HA_EXIT_SUCCESS; 4781 rocksdb::WriteOptions options; 4782 options.sync = sync; 4783 rocksdb::TransactionDBWriteOptimizations optimize; 4784 optimize.skip_concurrency_control = true; 4785 rocksdb::Status s = m_db->Write(options, optimize, batch); 4786 res = !s.ok(); // we return true when something failed 4787 if (res) { 4788 rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT); 4789 } 4790 batch->Clear(); 4791 return res; 4792 } 4793 4794 void Rdb_dict_manager::dump_index_id(uchar *const netbuf, 4795 Rdb_key_def::DATA_DICT_TYPE dict_type, 4796 const GL_INDEX_ID &gl_index_id) { 4797 rdb_netbuf_store_uint32(netbuf, dict_type); 4798 rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE, 4799 gl_index_id.cf_id); 4800 rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE, 4801 gl_index_id.index_id); 4802 } 4803 4804 void Rdb_dict_manager::delete_with_prefix( 4805 rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type, 4806 const GL_INDEX_ID &gl_index_id) const { 4807 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 4808 dump_index_id(&key_writer, dict_type, gl_index_id); 4809 4810 delete_key(batch, key_writer.to_slice()); 4811 } 4812 4813 void Rdb_dict_manager::add_or_update_index_cf_mapping( 4814 rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const { 4815 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 4816 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, 4817 index_info->m_gl_index_id); 4818 4819 Rdb_buf_writer<256> value_writer; 4820 4821 value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST); 4822 value_writer.write_byte(index_info->m_index_type); 4823 value_writer.write_uint16(index_info->m_kv_version); 4824 value_writer.write_uint32(index_info->m_index_flags); 4825 value_writer.write_uint64(index_info->m_ttl_duration); 4826 4827 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice()); 4828 } 4829 4830 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch, 4831 const uint32_t cf_id, 4832 const uint32_t cf_flags) const { 4833 DBUG_ASSERT(batch != nullptr); 4834 4835 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer; 4836 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION); 4837 key_writer.write_uint32(cf_id); 4838 4839 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE> 4840 value_writer; 4841 value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION); 4842 value_writer.write_uint32(cf_flags); 4843 4844 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice()); 4845 } 4846 4847 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch, 4848 const GL_INDEX_ID &gl_index_id) const { 4849 delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id); 4850 delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id); 4851 delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id); 4852 } 4853 4854 bool Rdb_dict_manager::get_index_info( 4855 const GL_INDEX_ID &gl_index_id, 4856 struct Rdb_index_info *const index_info) const { 4857 if (index_info) { 4858 index_info->m_gl_index_id = gl_index_id; 4859 } 4860 4861 bool found = false; 4862 bool error = false; 4863 std::string value; 4864 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 4865 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id); 4866 4867 const rocksdb::Status &status = get_value(key_writer.to_slice(), &value); 4868 if (status.ok()) { 4869 if (!index_info) { 4870 return true; 4871 } 4872 4873 const uchar *const val = (const uchar *)value.c_str(); 4874 const uchar *ptr = val; 4875 index_info->m_index_dict_version = rdb_netbuf_to_uint16(val); 4876 ptr += RDB_SIZEOF_INDEX_INFO_VERSION; 4877 4878 switch (index_info->m_index_dict_version) { 4879 case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS: 4880 /* Sanity check to prevent reading bogus TTL record. */ 4881 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION + 4882 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION + 4883 RDB_SIZEOF_INDEX_FLAGS + 4884 ROCKSDB_SIZEOF_TTL_RECORD) { 4885 error = true; 4886 break; 4887 } 4888 index_info->m_index_type = rdb_netbuf_to_byte(ptr); 4889 ptr += RDB_SIZEOF_INDEX_TYPE; 4890 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr); 4891 ptr += RDB_SIZEOF_KV_VERSION; 4892 index_info->m_index_flags = rdb_netbuf_to_uint32(ptr); 4893 ptr += RDB_SIZEOF_INDEX_FLAGS; 4894 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr); 4895 found = true; 4896 break; 4897 4898 case Rdb_key_def::INDEX_INFO_VERSION_TTL: 4899 /* Sanity check to prevent reading bogus into TTL record. */ 4900 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION + 4901 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION + 4902 ROCKSDB_SIZEOF_TTL_RECORD) { 4903 error = true; 4904 break; 4905 } 4906 index_info->m_index_type = rdb_netbuf_to_byte(ptr); 4907 ptr += RDB_SIZEOF_INDEX_TYPE; 4908 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr); 4909 ptr += RDB_SIZEOF_KV_VERSION; 4910 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr); 4911 if ((index_info->m_kv_version == 4912 Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) && 4913 index_info->m_ttl_duration > 0) { 4914 index_info->m_index_flags = Rdb_key_def::TTL_FLAG; 4915 } 4916 found = true; 4917 break; 4918 4919 case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT: 4920 case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID: 4921 index_info->m_index_type = rdb_netbuf_to_byte(ptr); 4922 ptr += RDB_SIZEOF_INDEX_TYPE; 4923 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr); 4924 found = true; 4925 break; 4926 4927 default: 4928 error = true; 4929 break; 4930 } 4931 4932 switch (index_info->m_index_type) { 4933 case Rdb_key_def::INDEX_TYPE_PRIMARY: 4934 case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: { 4935 error = index_info->m_kv_version > 4936 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST; 4937 break; 4938 } 4939 case Rdb_key_def::INDEX_TYPE_SECONDARY: 4940 error = index_info->m_kv_version > 4941 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST; 4942 break; 4943 default: 4944 error = true; 4945 break; 4946 } 4947 } 4948 4949 if (error) { 4950 // NO_LINT_DEBUG 4951 sql_print_error( 4952 "RocksDB: Found invalid key version number (%u, %u, %u, %llu) " 4953 "from data dictionary. This should never happen " 4954 "and it may be a bug.", 4955 index_info->m_index_dict_version, index_info->m_index_type, 4956 index_info->m_kv_version, index_info->m_ttl_duration); 4957 abort(); 4958 } 4959 4960 return found; 4961 } 4962 4963 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id, 4964 uint32_t *const cf_flags) const { 4965 DBUG_ASSERT(cf_flags != nullptr); 4966 4967 bool found = false; 4968 std::string value; 4969 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer; 4970 4971 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION); 4972 key_writer.write_uint32(cf_id); 4973 4974 const rocksdb::Status status = get_value(key_writer.to_slice(), &value); 4975 4976 if (status.ok()) { 4977 const uchar *val = (const uchar *)value.c_str(); 4978 DBUG_ASSERT(val); 4979 4980 const uint16_t version = rdb_netbuf_to_uint16(val); 4981 4982 if (version == Rdb_key_def::CF_DEFINITION_VERSION) { 4983 *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE); 4984 found = true; 4985 } 4986 } 4987 4988 return found; 4989 } 4990 4991 /* 4992 Returning index ids that were marked as deleted (via DROP TABLE) but 4993 still not removed by drop_index_thread yet, or indexes that are marked as 4994 ongoing creation. 4995 */ 4996 void Rdb_dict_manager::get_ongoing_index_operation( 4997 std::unordered_set<GL_INDEX_ID> *gl_index_ids, 4998 Rdb_key_def::DATA_DICT_TYPE dd_type) const { 4999 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING || 5000 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING); 5001 5002 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer; 5003 index_writer.write_uint32(dd_type); 5004 const rocksdb::Slice index_slice = index_writer.to_slice(); 5005 5006 rocksdb::Iterator *it = new_iterator(); 5007 for (it->Seek(index_slice); it->Valid(); it->Next()) { 5008 rocksdb::Slice key = it->key(); 5009 const uchar *const ptr = (const uchar *)key.data(); 5010 5011 /* 5012 Ongoing drop/create index operations require key to be of the form: 5013 dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3) 5014 5015 This may need to be changed in the future if we want to process a new 5016 ddl_type with different format. 5017 */ 5018 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 || 5019 rdb_netbuf_to_uint32(ptr) != dd_type) { 5020 break; 5021 } 5022 5023 // We don't check version right now since currently we always store only 5024 // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value. 5025 // If increasing version number, we need to add version check logic here. 5026 GL_INDEX_ID gl_index_id; 5027 gl_index_id.cf_id = 5028 rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE); 5029 gl_index_id.index_id = 5030 rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE); 5031 gl_index_ids->insert(gl_index_id); 5032 } 5033 delete it; 5034 } 5035 5036 /* 5037 Returning true if index_id is create/delete ongoing (undergoing creation or 5038 marked as deleted via DROP TABLE but drop_index_thread has not wiped yet) 5039 or not. 5040 */ 5041 bool Rdb_dict_manager::is_index_operation_ongoing( 5042 const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const { 5043 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING || 5044 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING); 5045 5046 bool found = false; 5047 std::string value; 5048 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 5049 dump_index_id(&key_writer, dd_type, gl_index_id); 5050 5051 const rocksdb::Status status = get_value(key_writer.to_slice(), &value); 5052 if (status.ok()) { 5053 found = true; 5054 } 5055 return found; 5056 } 5057 5058 /* 5059 Adding index_id to data dictionary so that the index id is removed 5060 by drop_index_thread, or to track online index creation. 5061 */ 5062 void Rdb_dict_manager::start_ongoing_index_operation( 5063 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id, 5064 Rdb_key_def::DATA_DICT_TYPE dd_type) const { 5065 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING || 5066 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING); 5067 5068 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 5069 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer; 5070 5071 dump_index_id(&key_writer, dd_type, gl_index_id); 5072 5073 // version as needed 5074 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) { 5075 value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION); 5076 } else { 5077 value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION); 5078 } 5079 5080 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice()); 5081 } 5082 5083 /* 5084 Removing index_id from data dictionary to confirm drop_index_thread 5085 completed dropping entire key/values of the index_id 5086 */ 5087 void Rdb_dict_manager::end_ongoing_index_operation( 5088 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id, 5089 Rdb_key_def::DATA_DICT_TYPE dd_type) const { 5090 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING || 5091 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING); 5092 5093 delete_with_prefix(batch, dd_type, gl_index_id); 5094 } 5095 5096 /* 5097 Returning true if there is no target index ids to be removed 5098 by drop_index_thread 5099 */ 5100 bool Rdb_dict_manager::is_drop_index_empty() const { 5101 std::unordered_set<GL_INDEX_ID> gl_index_ids; 5102 get_ongoing_drop_indexes(&gl_index_ids); 5103 return gl_index_ids.empty(); 5104 } 5105 5106 /* 5107 This function is supposed to be called by DROP TABLE. Logging messages 5108 that dropping indexes started, and adding data dictionary so that 5109 all associated indexes to be removed 5110 */ 5111 void Rdb_dict_manager::add_drop_table( 5112 std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys, 5113 rocksdb::WriteBatch *const batch) const { 5114 std::unordered_set<GL_INDEX_ID> dropped_index_ids; 5115 for (uint32 i = 0; i < n_keys; i++) { 5116 dropped_index_ids.insert(key_descr[i]->get_gl_index_id()); 5117 } 5118 5119 add_drop_index(dropped_index_ids, batch); 5120 } 5121 5122 /* 5123 Called during inplace index drop operations. Logging messages 5124 that dropping indexes started, and adding data dictionary so that 5125 all associated indexes to be removed 5126 */ 5127 void Rdb_dict_manager::add_drop_index( 5128 const std::unordered_set<GL_INDEX_ID> &gl_index_ids, 5129 rocksdb::WriteBatch *const batch) const { 5130 for (const auto &gl_index_id : gl_index_ids) { 5131 log_start_drop_index(gl_index_id, "Begin"); 5132 start_drop_index(batch, gl_index_id); 5133 } 5134 } 5135 5136 /* 5137 Called during inplace index creation operations. Logging messages 5138 that adding indexes started, and updates data dictionary with all associated 5139 indexes to be added. 5140 */ 5141 void Rdb_dict_manager::add_create_index( 5142 const std::unordered_set<GL_INDEX_ID> &gl_index_ids, 5143 rocksdb::WriteBatch *const batch) const { 5144 for (const auto &gl_index_id : gl_index_ids) { 5145 // NO_LINT_DEBUG 5146 sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)", 5147 gl_index_id.cf_id, gl_index_id.index_id); 5148 start_create_index(batch, gl_index_id); 5149 } 5150 } 5151 5152 /* 5153 This function is supposed to be called by drop_index_thread, when it 5154 finished dropping any index, or at the completion of online index creation. 5155 */ 5156 void Rdb_dict_manager::finish_indexes_operation( 5157 const std::unordered_set<GL_INDEX_ID> &gl_index_ids, 5158 Rdb_key_def::DATA_DICT_TYPE dd_type) const { 5159 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING || 5160 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING); 5161 5162 const std::unique_ptr<rocksdb::WriteBatch> wb = begin(); 5163 rocksdb::WriteBatch *const batch = wb.get(); 5164 5165 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes; 5166 get_ongoing_create_indexes(&incomplete_create_indexes); 5167 5168 for (const auto &gl_index_id : gl_index_ids) { 5169 if (is_index_operation_ongoing(gl_index_id, dd_type)) { 5170 end_ongoing_index_operation(batch, gl_index_id, dd_type); 5171 5172 /* 5173 Remove the corresponding incomplete create indexes from data 5174 dictionary as well 5175 */ 5176 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) { 5177 if (incomplete_create_indexes.count(gl_index_id)) { 5178 end_ongoing_index_operation(batch, gl_index_id, 5179 Rdb_key_def::DDL_CREATE_INDEX_ONGOING); 5180 } 5181 } 5182 } 5183 5184 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) { 5185 delete_index_info(batch, gl_index_id); 5186 } 5187 } 5188 commit(batch); 5189 } 5190 5191 /* 5192 This function is supposed to be called when initializing 5193 Rdb_dict_manager (at startup). If there is any index ids that are 5194 drop ongoing, printing out messages for diagnostics purposes. 5195 */ 5196 void Rdb_dict_manager::resume_drop_indexes() const { 5197 std::unordered_set<GL_INDEX_ID> gl_index_ids; 5198 get_ongoing_drop_indexes(&gl_index_ids); 5199 5200 uint max_index_id_in_dict = 0; 5201 get_max_index_id(&max_index_id_in_dict); 5202 5203 for (const auto &gl_index_id : gl_index_ids) { 5204 log_start_drop_index(gl_index_id, "Resume"); 5205 if (max_index_id_in_dict < gl_index_id.index_id) { 5206 // NO_LINT_DEBUG 5207 sql_print_error( 5208 "RocksDB: Found max index id %u from data dictionary " 5209 "but also found dropped index id (%u,%u) from drop_index " 5210 "dictionary. This should never happen and is possibly a " 5211 "bug.", 5212 max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id); 5213 abort(); 5214 } 5215 } 5216 } 5217 5218 void Rdb_dict_manager::rollback_ongoing_index_creation() const { 5219 const std::unique_ptr<rocksdb::WriteBatch> wb = begin(); 5220 rocksdb::WriteBatch *const batch = wb.get(); 5221 5222 std::unordered_set<GL_INDEX_ID> gl_index_ids; 5223 get_ongoing_create_indexes(&gl_index_ids); 5224 5225 for (const auto &gl_index_id : gl_index_ids) { 5226 // NO_LINT_DEBUG 5227 sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)", 5228 gl_index_id.cf_id, gl_index_id.index_id); 5229 5230 start_drop_index(batch, gl_index_id); 5231 } 5232 5233 commit(batch); 5234 } 5235 5236 void Rdb_dict_manager::log_start_drop_table( 5237 const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys, 5238 const char *const log_action) const { 5239 for (uint32 i = 0; i < n_keys; i++) { 5240 log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action); 5241 } 5242 } 5243 5244 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id, 5245 const char *log_action) const { 5246 struct Rdb_index_info index_info; 5247 if (!get_index_info(gl_index_id, &index_info)) { 5248 /* 5249 If we don't find the index info, it could be that it's because it was a 5250 partially created index that isn't in the data dictionary yet that needs 5251 to be rolled back. 5252 */ 5253 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes; 5254 get_ongoing_create_indexes(&incomplete_create_indexes); 5255 5256 if (!incomplete_create_indexes.count(gl_index_id)) { 5257 /* If it's not a partially created index, something is very wrong. */ 5258 // NO_LINT_DEBUG 5259 sql_print_error( 5260 "RocksDB: Failed to get column family info " 5261 "from index id (%u,%u). MyRocks data dictionary may " 5262 "get corrupted.", 5263 gl_index_id.cf_id, gl_index_id.index_id); 5264 if (rocksdb_ignore_datadic_errors) 5265 { 5266 sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, " 5267 "trying to continue"); 5268 return; 5269 } 5270 abort(); 5271 } 5272 } 5273 } 5274 5275 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const { 5276 bool found = false; 5277 std::string value; 5278 5279 const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value); 5280 if (status.ok()) { 5281 const uchar *const val = (const uchar *)value.c_str(); 5282 const uint16_t version = rdb_netbuf_to_uint16(val); 5283 if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) { 5284 *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE); 5285 found = true; 5286 } 5287 } 5288 return found; 5289 } 5290 5291 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch, 5292 const uint32_t index_id) const { 5293 DBUG_ASSERT(batch != nullptr); 5294 5295 uint32_t old_index_id = -1; 5296 if (get_max_index_id(&old_index_id)) { 5297 if (old_index_id > index_id) { 5298 // NO_LINT_DEBUG 5299 sql_print_error( 5300 "RocksDB: Found max index id %u from data dictionary " 5301 "but trying to update to older value %u. This should " 5302 "never happen and possibly a bug.", 5303 old_index_id, index_id); 5304 return true; 5305 } 5306 } 5307 5308 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE> 5309 value_writer; 5310 value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION); 5311 value_writer.write_uint32(index_id); 5312 5313 batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice()); 5314 return false; 5315 } 5316 5317 void Rdb_dict_manager::add_stats( 5318 rocksdb::WriteBatch *const batch, 5319 const std::vector<Rdb_index_stats> &stats) const { 5320 DBUG_ASSERT(batch != nullptr); 5321 5322 for (const auto &it : stats) { 5323 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 5324 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id); 5325 5326 // IndexStats::materialize takes complete care of serialization including 5327 // storing the version 5328 const auto value = 5329 Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it}); 5330 5331 batch->Put(m_system_cfh, key_writer.to_slice(), value); 5332 } 5333 } 5334 5335 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const { 5336 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 5337 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id); 5338 5339 std::string value; 5340 const rocksdb::Status status = get_value(key_writer.to_slice(), &value); 5341 if (status.ok()) { 5342 std::vector<Rdb_index_stats> v; 5343 // unmaterialize checks if the version matches 5344 if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) { 5345 return v[0]; 5346 } 5347 } 5348 5349 return Rdb_index_stats(); 5350 } 5351 5352 rocksdb::Status Rdb_dict_manager::put_auto_incr_val( 5353 rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id, 5354 ulonglong val, bool overwrite) const { 5355 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 5356 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id); 5357 5358 // Value is constructed by storing the version and the value. 5359 Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION + 5360 ROCKSDB_SIZEOF_AUTOINC_VALUE> 5361 value_writer; 5362 value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION); 5363 value_writer.write_uint64(val); 5364 5365 if (overwrite) { 5366 return batch->Put(m_system_cfh, key_writer.to_slice(), 5367 value_writer.to_slice()); 5368 } 5369 return batch->Merge(m_system_cfh, key_writer.to_slice(), 5370 value_writer.to_slice()); 5371 } 5372 5373 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id, 5374 ulonglong *new_val) const { 5375 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer; 5376 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id); 5377 5378 std::string value; 5379 const rocksdb::Status status = get_value(key_writer.to_slice(), &value); 5380 5381 if (status.ok()) { 5382 const uchar *const val = reinterpret_cast<const uchar *>(value.data()); 5383 5384 if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) { 5385 *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION); 5386 return true; 5387 } 5388 } 5389 return false; 5390 } 5391 5392 uint Rdb_seq_generator::get_and_update_next_number( 5393 Rdb_dict_manager *const dict) { 5394 DBUG_ASSERT(dict != nullptr); 5395 5396 uint res; 5397 RDB_MUTEX_LOCK_CHECK(m_mutex); 5398 5399 res = m_next_number++; 5400 5401 const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin(); 5402 rocksdb::WriteBatch *const batch = wb.get(); 5403 5404 DBUG_ASSERT(batch != nullptr); 5405 dict->update_max_index_id(batch, res); 5406 dict->commit(batch); 5407 5408 RDB_MUTEX_UNLOCK_CHECK(m_mutex); 5409 5410 return res; 5411 } 5412 5413 } // namespace myrocks 5414