1 /*
2 Copyright (c) 2012,2013 Monty Program Ab
3 Copyright (c) 2020, MariaDB Corporation.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
17
18 #ifdef USE_PRAGMA_IMPLEMENTATION
19 #pragma implementation // gcc: Class implementation
20 #endif
21
22 /* For use of 'PRIu64': */
23 #define __STDC_FORMAT_MACROS
24
25 #include <my_global.h>
26 /* This C++ file's header file */
27 #include "./rdb_datadic.h"
28
29 #include <inttypes.h>
30 /* C++ standard header files */
31 #include <algorithm>
32 #include <array>
33 #include <limits>
34 #include <map>
35 #include <set>
36 #include <string>
37 #include <utility>
38 #include <vector>
39
40 /* MySQL header files */
41 #include "./field.h"
42 #include "./key.h"
43 #include "./m_ctype.h"
44 #include "./my_bit.h"
45 #include "./my_bitmap.h"
46 #include "./sql_table.h"
47
48 /* MyRocks header files */
49 #include "./ha_rocksdb.h"
50 #include "./ha_rocksdb_proto.h"
51 #include "./my_stacktrace.h"
52 #include "./rdb_cf_manager.h"
53 #include "./rdb_psi.h"
54 #include "./rdb_utils.h"
55
56 namespace myrocks {
57
/*
  Forward declaration only; the definition is not part of this section of the
  file.

  NOTE(review): judging by the parameter names, this presumably hands back the
  mem-comparable (xfrm) image of the space character for charset `cs`, its
  length and the multi-byte length -- confirm against the definition.
*/
void get_mem_comparable_space(const CHARSET_INFO *cs,
                              const std::vector<uchar> **xfrm, size_t *xfrm_len,
                              size_t *mb_len);
61
62 /*
63 MariaDB's replacement for FB/MySQL Field::check_field_name_match :
64 */
field_check_field_name_match(Field * field,const char * name)65 inline bool field_check_field_name_match(Field *field, const char *name)
66 {
67 return (0 == my_strcasecmp(system_charset_info,
68 field->field_name.str,
69 name));
70 }
71
72
73 /*
74 Decode current key field
75 @param fpi IN data structure contains field metadata
76 @param field IN current field
77 @param reader IN key slice reader
78 @param unp_reader IN unpack information reader
79 @return
80 HA_EXIT_SUCCESS OK
81 other HA_ERR error code
82 */
decode_field(Rdb_field_packing * fpi,Field * field,Rdb_string_reader * reader,const uchar * const default_value,Rdb_string_reader * unpack_reader)83 int Rdb_convert_to_record_key_decoder::decode_field(
84 Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader,
85 const uchar *const default_value, Rdb_string_reader *unpack_reader) {
86 if (fpi->m_maybe_null) {
87 const char *nullp;
88 if (!(nullp = reader->read(1))) {
89 return HA_EXIT_FAILURE;
90 }
91
92 if (*nullp == 0) {
93 /* Set the NULL-bit of this field */
94 field->set_null();
95 /* Also set the field to its default value */
96 memcpy(field->ptr, default_value, field->pack_length());
97 return HA_EXIT_SUCCESS;
98 } else if (*nullp == 1) {
99 field->set_notnull();
100 } else {
101 return HA_EXIT_FAILURE;
102 }
103 }
104
105 return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader);
106 }
107
108 /*
109 Decode current key field
110
111 @param buf OUT the buf starting address
112 @param offset OUT the bytes offset when data is written
113 @param fpi IN data structure contains field metadata
114 @param table IN current table
115 @param field IN current field
116 @param has_unpack_inf IN whether contains unpack inf
117 @param reader IN key slice reader
118 @param unp_reader IN unpack information reader
119 @return
120 HA_EXIT_SUCCESS OK
121 other HA_ERR error code
122 */
decode(uchar * const buf,uint * offset,Rdb_field_packing * fpi,TABLE * table,Field * field,bool has_unpack_info,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)123 int Rdb_convert_to_record_key_decoder::decode(
124 uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table,
125 Field *field, bool has_unpack_info, Rdb_string_reader *reader,
126 Rdb_string_reader *unpack_reader) {
127 DBUG_ASSERT(buf != nullptr);
128 DBUG_ASSERT(offset != nullptr);
129
130 uint field_offset = field->ptr - table->record[0];
131 *offset = field_offset;
132 uint null_offset = field->null_offset();
133 bool maybe_null = field->real_maybe_null();
134
135 field->move_field(buf + field_offset,
136 maybe_null ? buf + null_offset : nullptr, field->null_bit);
137
138 // If we need unpack info, but there is none, tell the unpack function
139 // this by passing unp_reader as nullptr. If we never read unpack_info
140 // during unpacking anyway, then there won't an error.
141 bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
142
143 int res =
144 decode_field(fpi, field, reader, table->s->default_values + field_offset,
145 maybe_missing_unpack ? nullptr : unpack_reader);
146
147 // Restore field->ptr and field->null_ptr
148 field->move_field(table->record[0] + field_offset,
149 maybe_null ? table->record[0] + null_offset : nullptr,
150 field->null_bit);
151 if (res != UNPACK_SUCCESS) {
152 return HA_ERR_ROCKSDB_CORRUPT_DATA;
153 }
154 return HA_EXIT_SUCCESS;
155 }
156
157 /*
158 Skip current key field
159
160 @param fpi IN data structure contains field metadata
161 @param field IN current field
162 @param reader IN key slice reader
163 @param unp_reader IN unpack information reader
164 @return
165 HA_EXIT_SUCCESS OK
166 other HA_ERR error code
167 */
skip(const Rdb_field_packing * fpi,const Field * field,Rdb_string_reader * reader,Rdb_string_reader * unp_reader)168 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
169 const Field *field,
170 Rdb_string_reader *reader,
171 Rdb_string_reader *unp_reader) {
172 /* It is impossible to unpack the column. Skip it. */
173 if (fpi->m_maybe_null) {
174 const char *nullp;
175 if (!(nullp = reader->read(1))) {
176 return HA_ERR_ROCKSDB_CORRUPT_DATA;
177 }
178 if (*nullp == 0) {
179 /* This is a NULL value */
180 return HA_EXIT_SUCCESS;
181 }
182 /* If NULL marker is not '0', it can be only '1' */
183 if (*nullp != 1) {
184 return HA_ERR_ROCKSDB_CORRUPT_DATA;
185 }
186 }
187 if ((fpi->m_skip_func)(fpi, field, reader)) {
188 return HA_ERR_ROCKSDB_CORRUPT_DATA;
189 }
190 // If this is a space padded varchar, we need to skip the indicator
191 // bytes for trailing bytes. They're useless since we can't restore the
192 // field anyway.
193 //
194 // There is a special case for prefixed varchars where we do not
195 // generate unpack info, because we know prefixed varchars cannot be
196 // unpacked. In this case, it is not necessary to skip.
197 if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
198 !fpi->m_unpack_info_stores_value) {
199 unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
200 }
201 return HA_EXIT_SUCCESS;
202 }
203
Rdb_key_field_iterator(const Rdb_key_def * key_def,Rdb_field_packing * pack_info,Rdb_string_reader * reader,Rdb_string_reader * unp_reader,TABLE * table,bool has_unpack_info,const MY_BITMAP * covered_bitmap,uchar * const buf)204 Rdb_key_field_iterator::Rdb_key_field_iterator(
205 const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
206 Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
207 bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
208 m_key_def = key_def;
209 m_pack_info = pack_info;
210 m_iter_index = 0;
211 m_iter_end = key_def->get_key_parts();
212 m_reader = reader;
213 m_unp_reader = unp_reader;
214 m_table = table;
215 m_has_unpack_info = has_unpack_info;
216 m_covered_bitmap = covered_bitmap;
217 m_buf = buf;
218 m_secondary_key =
219 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
220 m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
221 m_is_hidden_pk =
222 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
223 m_curr_bitmap_pos = 0;
224 m_offset = 0;
225 }
226
get_dst() const227 void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; }
228
get_field_index() const229 int Rdb_key_field_iterator::get_field_index() const {
230 DBUG_ASSERT(m_field != nullptr);
231 return m_field->field_index;
232 }
233
get_is_null() const234 bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; }
get_field() const235 Field *Rdb_key_field_iterator::get_field() const {
236 DBUG_ASSERT(m_field != nullptr);
237 return m_field;
238 }
239
has_next()240 bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; }
241
242 /**
243 Iterate each field in the key and decode/skip one by one
244 */
next()245 int Rdb_key_field_iterator::next() {
246 int status = HA_EXIT_SUCCESS;
247 while (m_iter_index < m_iter_end) {
248 int curr_index = m_iter_index++;
249
250 m_fpi = &m_pack_info[curr_index];
251 /*
252 Hidden pk field is packed at the end of the secondary keys, but the SQL
253 layer does not know about it. Skip retrieving field if hidden pk.
254 */
255 if ((m_secondary_key && m_hidden_pk_exists &&
256 curr_index + 1 == m_iter_end) ||
257 m_is_hidden_pk) {
258 DBUG_ASSERT(m_fpi->m_unpack_func);
259 if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) {
260 return HA_ERR_ROCKSDB_CORRUPT_DATA;
261 }
262 return HA_EXIT_SUCCESS;
263 }
264
265 m_field = m_fpi->get_field_in_table(m_table);
266
267 bool covered_column = true;
268 if (m_covered_bitmap != nullptr &&
269 m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) {
270 uint tmp= m_curr_bitmap_pos++;
271 covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
272 bitmap_is_set(m_covered_bitmap, tmp);
273 }
274
275 if (m_fpi->m_unpack_func && covered_column) {
276 /* It is possible to unpack this column. Do it. */
277 status = Rdb_convert_to_record_key_decoder::decode(
278 m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info,
279 m_reader, m_unp_reader);
280 if (status) {
281 return status;
282 }
283 break;
284 } else {
285 status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader,
286 m_unp_reader);
287 if (status) {
288 return status;
289 }
290 }
291 }
292 return HA_EXIT_SUCCESS;
293 }
294
295 /*
296 Rdb_key_def class implementation
297 */
/*
  Construct a key definition from its attributes.

  Besides storing the arguments, the constructor initializes m_mutex, writes
  m_index_number into m_index_number_storage_form via rdb_netbuf_store_index()
  and derives m_total_index_flags_length from the index flags bitmap.

  m_pack_info and m_pk_part_no start out as nullptr and m_maxlength as 0;
  setup() treats m_maxlength == 0 as "not set up yet" and fills them in.
*/
Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
                         rocksdb::ColumnFamilyHandle *cf_handle_arg,
                         uint16_t index_dict_version_arg, uchar index_type_arg,
                         uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
                         bool is_per_partition_cf_arg, const char *_name,
                         Rdb_index_stats _stats, uint32 index_flags_bitmap,
                         uint32 ttl_rec_offset, uint64 ttl_duration)
    : m_index_number(indexnr_arg),
      m_cf_handle(cf_handle_arg),
      m_index_dict_version(index_dict_version_arg),
      m_index_type(index_type_arg),
      m_kv_format_version(kv_format_version_arg),
      m_is_reverse_cf(is_reverse_cf_arg),
      m_is_per_partition_cf(is_per_partition_cf_arg),
      m_name(_name),
      m_stats(_stats),
      m_index_flags_bitmap(index_flags_bitmap),
      m_ttl_rec_offset(ttl_rec_offset),
      m_ttl_duration(ttl_duration),
      m_ttl_column(""),
      m_pk_part_no(nullptr),
      m_pack_info(nullptr),
      m_keyno(keyno_arg),
      m_key_parts(0),
      m_ttl_pk_key_part_offset(UINT_MAX),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(nullptr),
      m_maxlength(0)  // means 'not initialized'
{
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  /* Older secondary/primary KV formats are expected to carry no index flags */
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT(m_cf_handle != nullptr);
}
339
/*
  Copy constructor.

  Plain members are taken over from `k`. The m_pack_info and m_pk_part_no
  arrays are deep copied so that every object owns (and later frees) its own
  allocation. The copy gets its own mutex.

  The index dictionary version, index type and KV format version are copied
  as well: the assertions below read them, and so does setup().

  m_maxlength is copied too, which means that setup() is a no-op for a copy of
  an already set up key definition. m_pk_key_parts, which only setup()
  assigns, is therefore taken over whenever `k` has been set up (recognized by
  its m_pack_info being allocated).

  NOTE(review): m_ttl_field_index is reset to UINT_MAX instead of being copied
  although m_ttl_column is copied -- confirm whether that is intended.
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number),
      m_cf_handle(k.m_cf_handle),
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf),
      m_is_per_partition_cf(k.m_is_per_partition_cf),
      m_name(k.m_name),
      m_stats(k.m_stats),
      m_index_flags_bitmap(k.m_index_flags_bitmap),
      m_ttl_rec_offset(k.m_ttl_rec_offset),
      m_ttl_duration(k.m_ttl_duration),
      m_ttl_column(k.m_ttl_column),
      m_pk_part_no(k.m_pk_part_no),
      m_pack_info(k.m_pack_info),
      m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts),
      m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);

  /* Only meaningful once setup() has run on the source object */
  m_pk_key_parts = k.m_pack_info ? k.m_pk_key_parts : 0;

  /* Replace the shallow pointer copies made above by private deep copies */
  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
    void *pack_info = my_malloc(PSI_INSTRUMENT_ME, size, MYF(0));
    memcpy(pack_info, k.m_pack_info, size);
    m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info);
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
    m_pk_part_no =
        reinterpret_cast<uint *>(my_malloc(PSI_INSTRUMENT_ME, size, MYF(0)));
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
382
~Rdb_key_def()383 Rdb_key_def::~Rdb_key_def() {
384 mysql_mutex_destroy(&m_mutex);
385
386 my_free(m_pk_part_no);
387 m_pk_part_no = nullptr;
388
389 my_free(m_pack_info);
390 m_pack_info = nullptr;
391 }
392
setup(const TABLE * const tbl,const Rdb_tbl_def * const tbl_def)393 void Rdb_key_def::setup(const TABLE *const tbl,
394 const Rdb_tbl_def *const tbl_def) {
395 DBUG_ASSERT(tbl != nullptr);
396 DBUG_ASSERT(tbl_def != nullptr);
397
398 /*
399 Set max_length based on the table. This can be called concurrently from
400 multiple threads, so there is a mutex to protect this code.
401 */
402 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
403 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
404 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
405 if (!m_maxlength) {
406 RDB_MUTEX_LOCK_CHECK(m_mutex);
407 if (m_maxlength != 0) {
408 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
409 return;
410 }
411
412 KEY *key_info = nullptr;
413 KEY *pk_info = nullptr;
414 if (!is_hidden_pk) {
415 key_info = &tbl->key_info[m_keyno];
416 if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
417 m_name = std::string(key_info->name.str);
418 } else {
419 m_name = HIDDEN_PK_NAME;
420 }
421
422 if (secondary_key) {
423 m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
424 } else {
425 pk_info = nullptr;
426 m_pk_key_parts = 0;
427 }
428
429 // "unique" secondary keys support:
430 m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;
431
432 if (secondary_key) {
433 /*
434 In most cases, SQL layer puts PK columns as invisible suffix at the
435 end of secondary key. There are cases where this doesn't happen:
436 - unique secondary indexes.
437 - partitioned tables.
438
439 Internally, we always need PK columns as suffix (and InnoDB does,
440 too, if you were wondering).
441
442 The loop below will attempt to put all PK columns at the end of key
443 definition. Columns that are already included in the index (either
444 by the user or by "extended keys" feature) are not included for the
445 second time.
446 */
447 m_key_parts += m_pk_key_parts;
448 }
449
450 if (secondary_key) {
451 m_pk_part_no = reinterpret_cast<uint *>(
452 my_malloc(PSI_INSTRUMENT_ME, sizeof(uint) * m_key_parts, MYF(0)));
453 } else {
454 m_pk_part_no = nullptr;
455 }
456
457 const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
458 m_pack_info =
459 reinterpret_cast<Rdb_field_packing *>(my_malloc(PSI_INSTRUMENT_ME, size, MYF(0)));
460
461 /*
462 Guaranteed not to error here as checks have been made already during
463 table creation.
464 */
465 Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
466 &m_ttl_field_index, true);
467
468 size_t max_len = INDEX_NUMBER_SIZE;
469 int unpack_len = 0;
470 int max_part_len = 0;
471 bool simulating_extkey = false;
472 uint dst_i = 0;
473
474 uint keyno_to_set = m_keyno;
475 uint keypart_to_set = 0;
476
477 if (is_hidden_pk) {
478 Field *field = nullptr;
479 m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
480 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
481 max_len += m_pack_info[dst_i].m_max_image_len;
482 max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
483 dst_i++;
484 } else {
485 KEY_PART_INFO *key_part = key_info->key_part;
486
487 /* this loop also loops over the 'extended key' tail */
488 for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
489 Field *const field = key_part ? key_part->field : nullptr;
490
491 if (simulating_extkey && !hidden_pk_exists) {
492 DBUG_ASSERT(secondary_key);
493 /* Check if this field is already present in the key definition */
494 bool found = false;
495 for (uint j= 0; j < key_info->ext_key_parts; j++) {
496 if (field->field_index ==
497 key_info->key_part[j].field->field_index &&
498 key_part->length == key_info->key_part[j].length) {
499 found = true;
500 break;
501 }
502 }
503
504 if (found) {
505 key_part++;
506 continue;
507 }
508 }
509
510 if (field && field->real_maybe_null()) max_len += 1; // NULL-byte
511
512 m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
513 key_part ? key_part->length : 0);
514 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
515
516 if (pk_info) {
517 m_pk_part_no[dst_i] = -1;
518 for (uint j = 0; j < m_pk_key_parts; j++) {
519 if (field->field_index == pk_info->key_part[j].field->field_index) {
520 m_pk_part_no[dst_i] = j;
521 break;
522 }
523 }
524 } else if (secondary_key && hidden_pk_exists) {
525 /*
526 The hidden pk can never be part of the sk. So it is always
527 appended to the end of the sk.
528 */
529 m_pk_part_no[dst_i] = -1;
530 if (simulating_extkey) m_pk_part_no[dst_i] = 0;
531 }
532
533 max_len += m_pack_info[dst_i].m_max_image_len;
534
535 max_part_len =
536 std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
537
538 /*
539 Check key part name here, if it matches the TTL column then we store
540 the offset of the TTL key part here.
541 */
542 if (!m_ttl_column.empty() &&
543 field_check_field_name_match(field, m_ttl_column.c_str())) {
544 DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
545 DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
546 DBUG_ASSERT(!field->real_maybe_null());
547 m_ttl_pk_key_part_offset = dst_i;
548 }
549
550 key_part++;
551 /*
552 For "unique" secondary indexes, pretend they have
553 "index extensions".
554
555 MariaDB also has this property: if an index has a partially-covered
556 column like KEY(varchar_col(N)), then the SQL layer will think it is
557 not "extended" with PK columns. The code below handles this case,
558 also.
559 */
560 if (secondary_key && src_i+1 == key_info->ext_key_parts) {
561 simulating_extkey = true;
562 if (!hidden_pk_exists) {
563 keyno_to_set = tbl->s->primary_key;
564 key_part = pk_info->key_part;
565 keypart_to_set = (uint)-1;
566 } else {
567 keyno_to_set = tbl_def->m_key_count - 1;
568 key_part = nullptr;
569 keypart_to_set = 0;
570 }
571 }
572
573 dst_i++;
574 }
575 }
576
577 m_key_parts = dst_i;
578
579 /* Initialize the memory needed by the stats structure */
580 m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
581
582 /* Cache prefix extractor for bloom filter usage later */
583 rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
584 m_prefix_extractor = opt.prefix_extractor;
585
586 /*
587 This should be the last member variable set before releasing the mutex
588 so that other threads can't see the object partially set up.
589 */
590 m_maxlength = max_len;
591
592 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
593 }
594 }
595
596 /*
597 Determine if the table has TTL enabled by parsing the table comment.
598
599 @param[IN] table_arg
600 @param[IN] tbl_def_arg
601 @param[OUT] ttl_duration Default TTL value parsed from table comment
602 */
extract_ttl_duration(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,uint64 * ttl_duration)603 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
604 const Rdb_tbl_def *const tbl_def_arg,
605 uint64 *ttl_duration) {
606 DBUG_ASSERT(table_arg != nullptr);
607 DBUG_ASSERT(tbl_def_arg != nullptr);
608 DBUG_ASSERT(ttl_duration != nullptr);
609 std::string table_comment(table_arg->s->comment.str,
610 table_arg->s->comment.length);
611
612 bool ttl_duration_per_part_match_found = false;
613 std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
614 table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
615 RDB_TTL_DURATION_QUALIFIER);
616
617 /* If we don't have a ttl duration, nothing to do here. */
618 if (ttl_duration_str.empty()) {
619 return HA_EXIT_SUCCESS;
620 }
621
622 /*
623 Catch errors where a non-integral value was used as ttl duration, strtoull
624 will return 0.
625 */
626 *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
627 if (!*ttl_duration) {
628 my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
629 return HA_EXIT_FAILURE;
630 }
631
632 return HA_EXIT_SUCCESS;
633 }
634
635 /*
636 Determine if the table has TTL enabled by parsing the table comment.
637
638 @param[IN] table_arg
639 @param[IN] tbl_def_arg
640 @param[OUT] ttl_column TTL column in the table
641 @param[IN] skip_checks Skip validation checks (when called in
642 setup())
643 */
extract_ttl_col(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,std::string * ttl_column,uint * ttl_field_index,bool skip_checks)644 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
645 const Rdb_tbl_def *const tbl_def_arg,
646 std::string *ttl_column,
647 uint *ttl_field_index, bool skip_checks) {
648 std::string table_comment(table_arg->s->comment.str,
649 table_arg->s->comment.length);
650 /*
651 Check if there is a TTL column specified. Note that this is not required
652 and if omitted, an 8-byte ttl field will be prepended to each record
653 implicitly.
654 */
655 bool ttl_col_per_part_match_found = false;
656 std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
657 table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
658 RDB_TTL_COL_QUALIFIER);
659
660 if (skip_checks) {
661 for (uint i = 0; i < table_arg->s->fields; i++) {
662 Field *const field = table_arg->field[i];
663 if (field_check_field_name_match(field, ttl_col_str.c_str())) {
664 *ttl_column = ttl_col_str;
665 *ttl_field_index = i;
666 }
667 }
668 return HA_EXIT_SUCCESS;
669 }
670
671 /* Check if TTL column exists in table */
672 if (!ttl_col_str.empty()) {
673 bool found = false;
674 for (uint i = 0; i < table_arg->s->fields; i++) {
675 Field *const field = table_arg->field[i];
676 if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
677 field->real_type() == MYSQL_TYPE_LONGLONG &&
678 field->key_type() == HA_KEYTYPE_ULONGLONG &&
679 !field->real_maybe_null()) {
680 *ttl_column = ttl_col_str;
681 *ttl_field_index = i;
682 found = true;
683 break;
684 }
685 }
686
687 if (!found) {
688 my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
689 return HA_EXIT_FAILURE;
690 }
691 }
692
693 return HA_EXIT_SUCCESS;
694 }
695
gen_qualifier_for_table(const char * const qualifier,const std::string & partition_name)696 const std::string Rdb_key_def::gen_qualifier_for_table(
697 const char *const qualifier, const std::string &partition_name) {
698 bool has_partition = !partition_name.empty();
699 std::string qualifier_str = "";
700
701 if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
702 return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
703 : qualifier_str + RDB_CF_NAME_QUALIFIER +
704 RDB_QUALIFIER_VALUE_SEP;
705 } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
706 return has_partition
707 ? gen_ttl_duration_qualifier_for_partition(partition_name)
708 : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
709 RDB_QUALIFIER_VALUE_SEP;
710 } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
711 return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
712 : qualifier_str + RDB_TTL_COL_QUALIFIER +
713 RDB_QUALIFIER_VALUE_SEP;
714 } else {
715 DBUG_ASSERT(0);
716 }
717
718 return qualifier_str;
719 }
720
721 /*
722 Formats the string and returns the column family name assignment part for a
723 specific partition.
724 */
gen_cf_name_qualifier_for_partition(const std::string & prefix)725 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
726 const std::string &prefix) {
727 DBUG_ASSERT(!prefix.empty());
728
729 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
730 RDB_QUALIFIER_VALUE_SEP;
731 }
732
gen_ttl_duration_qualifier_for_partition(const std::string & prefix)733 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
734 const std::string &prefix) {
735 DBUG_ASSERT(!prefix.empty());
736
737 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
738 RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
739 }
740
gen_ttl_col_qualifier_for_partition(const std::string & prefix)741 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
742 const std::string &prefix) {
743 DBUG_ASSERT(!prefix.empty());
744
745 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
746 RDB_QUALIFIER_VALUE_SEP;
747 }
748
parse_comment_for_qualifier(const std::string & comment,const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,bool * per_part_match_found,const char * const qualifier)749 const std::string Rdb_key_def::parse_comment_for_qualifier(
750 const std::string &comment, const TABLE *const table_arg,
751 const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
752 const char *const qualifier) {
753 DBUG_ASSERT(table_arg != nullptr);
754 DBUG_ASSERT(tbl_def_arg != nullptr);
755 DBUG_ASSERT(per_part_match_found != nullptr);
756 DBUG_ASSERT(qualifier != nullptr);
757
758 std::string empty_result;
759
760 // Flag which marks if partition specific options were found.
761 *per_part_match_found = false;
762
763 if (comment.empty()) {
764 return empty_result;
765 }
766
767 // Let's fetch the comment for a index and check if there's a custom key
768 // name specified for a partition we are handling.
769 std::vector<std::string> v =
770 myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
771
772 std::string search_str = gen_qualifier_for_table(qualifier);
773
774 // If table has partitions then we need to check if user has requested
775 // qualifiers on a per partition basis.
776 //
777 // NOTE: this means if you specify a qualifier for a specific partition it
778 // will take precedence the 'table level' qualifier if one exists.
779 std::string search_str_part;
780 if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
781 std::string partition_name = tbl_def_arg->base_partition();
782 DBUG_ASSERT(!partition_name.empty());
783 search_str_part = gen_qualifier_for_table(qualifier, partition_name);
784 }
785
786 DBUG_ASSERT(!search_str.empty());
787
788 // Basic O(N) search for a matching assignment. At most we expect maybe
789 // ten or so elements here.
790 if (!search_str_part.empty()) {
791 for (const auto &it : v) {
792 if (it.substr(0, search_str_part.length()) == search_str_part) {
793 // We found a prefix match. Try to parse it as an assignment.
794 std::vector<std::string> tokens =
795 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
796
797 // We found a custom qualifier, it was in the form we expected it to be.
798 // Return that instead of whatever we initially wanted to return. In
799 // a case below the `foo` part will be returned to the caller.
800 //
801 // p3_cfname=foo
802 //
803 // If no value was specified then we'll return an empty string which
804 // later gets translated into using a default CF.
805 if (tokens.size() == 2) {
806 *per_part_match_found = true;
807 return tokens[1];
808 } else {
809 return empty_result;
810 }
811 }
812 }
813 }
814
815 // Do this loop again, this time searching for 'table level' qualifiers if we
816 // didn't find any partition level qualifiers above.
817 for (const auto &it : v) {
818 if (it.substr(0, search_str.length()) == search_str) {
819 std::vector<std::string> tokens =
820 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
821 if (tokens.size() == 2) {
822 return tokens[1];
823 } else {
824 return empty_result;
825 }
826 }
827 }
828
829 // If we didn't find any partitioned/non-partitioned qualifiers, return an
830 // empty string.
831 return empty_result;
832 }
833
834 /**
835 Read a memcmp key part from a slice using the passed in reader.
836
837 Returns -1 if field was null, 1 if error, 0 otherwise.
838 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const839 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
840 Rdb_string_reader *reader,
841 const uint part_num) const {
842 /* It is impossible to unpack the column. Skip it. */
843 if (m_pack_info[part_num].m_maybe_null) {
844 const char *nullp;
845 if (!(nullp = reader->read(1))) return 1;
846 if (*nullp == 0) {
847 /* This is a NULL value */
848 return -1;
849 } else {
850 /* If NULL marker is not '0', it can be only '1' */
851 if (*nullp != 1) return 1;
852 }
853 }
854
855 Rdb_field_packing *fpi = &m_pack_info[part_num];
856 DBUG_ASSERT(table_arg->s != nullptr);
857
858 bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
859 (table_arg->s->primary_key == MAX_INDEXES);
860 Field *field = nullptr;
861 if (!is_hidden_pk_part) {
862 field = fpi->get_field_in_table(table_arg);
863 }
864 if ((fpi->m_skip_func)(fpi, field, reader)) {
865 return 1;
866 }
867 return 0;
868 }
869
870 /**
871 Get a mem-comparable form of Primary Key from mem-comparable form of this key
872
873 @param
874 pk_descr Primary Key descriptor
875 key Index tuple from this key in mem-comparable form
876 pk_buffer OUT Put here mem-comparable form of the Primary Key.
877
878 @note
879 It may or may not be possible to restore primary key columns to their
880 mem-comparable form. To handle all cases, this function copies mem-
881 comparable forms directly.
882
883 RocksDB SE supports "Extended keys". This means that PK columns are present
884 at the end of every key. If the key already includes PK columns, then
885 these columns are not present at the end of the key.
886
887 Because of the above, we copy each primary key column.
888
889 @todo
890 If we checked crc32 checksums in this function, we would catch some CRC
891 violations that we currently don't. On the other hand, there is a broader
892 set of queries for which we would check the checksum twice.
893 */
894
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const895 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
896 const Rdb_key_def &pk_descr,
897 const rocksdb::Slice *const key,
898 uchar *const pk_buffer) const {
899 DBUG_ASSERT(table != nullptr);
900 DBUG_ASSERT(key != nullptr);
901 DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
902 DBUG_ASSERT(pk_buffer);
903
904 uint size = 0;
905 uchar *buf = pk_buffer;
906 DBUG_ASSERT(m_pk_key_parts);
907
908 /* Put the PK number */
909 rdb_netbuf_store_index(buf, pk_descr.m_index_number);
910 buf += INDEX_NUMBER_SIZE;
911 size += INDEX_NUMBER_SIZE;
912
913 const char *start_offs[MAX_REF_PARTS];
914 const char *end_offs[MAX_REF_PARTS];
915 int pk_key_part;
916 uint i;
917 Rdb_string_reader reader(key);
918
919 // Skip the index number
920 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
921
922 for (i = 0; i < m_key_parts; i++) {
923 if ((pk_key_part = m_pk_part_no[i]) != -1) {
924 start_offs[pk_key_part] = reader.get_current_ptr();
925 }
926
927 if (read_memcmp_key_part(table, &reader, i) > 0) {
928 return RDB_INVALID_KEY_LEN;
929 }
930
931 if (pk_key_part != -1) {
932 end_offs[pk_key_part] = reader.get_current_ptr();
933 }
934 }
935
936 for (i = 0; i < m_pk_key_parts; i++) {
937 const uint part_size = end_offs[i] - start_offs[i];
938 memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
939 buf += part_size;
940 size += part_size;
941 }
942
943 return size;
944 }
945
946 /**
947 Get a mem-comparable form of Secondary Key from mem-comparable form of this
948 key, without the extended primary key tail.
949
950 @param
951 key Index tuple from this key in mem-comparable form
952 sk_buffer OUT Put here mem-comparable form of the Secondary Key.
953 n_null_fields OUT Put number of null fields contained within sk entry
954 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const955 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
956 const rocksdb::Slice &key,
957 uchar *sk_buffer,
958 uint *n_null_fields) const {
959 DBUG_ASSERT(table != nullptr);
960 DBUG_ASSERT(sk_buffer != nullptr);
961 DBUG_ASSERT(n_null_fields != nullptr);
962 DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
963
964 uchar *buf = sk_buffer;
965
966 int res;
967 Rdb_string_reader reader(&key);
968 const char *start = reader.get_current_ptr();
969
970 // Skip the index number
971 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
972
973 for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
974 if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
975 return RDB_INVALID_KEY_LEN;
976 } else if (res == -1) {
977 (*n_null_fields)++;
978 }
979 }
980
981 uint sk_memcmp_len = reader.get_current_ptr() - start;
982 memcpy(buf, start, sk_memcmp_len);
983 return sk_memcmp_len;
984 }
985
986 /**
987 Convert index tuple into storage (i.e. mem-comparable) format
988
989 @detail
990 Currently this is done by unpacking into record_buffer and then
991 packing index columns into storage format.
992
993 @param pack_buffer Temporary area for packing varchar columns. Its
994 size is at least max_storage_fmt_length() bytes.
995 */
996
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,uchar * const record_buffer,const uchar * const key_tuple,const key_part_map & keypart_map) const997 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
998 uchar *const packed_tuple,
999 uchar *const record_buffer,
1000 const uchar *const key_tuple,
1001 const key_part_map &keypart_map) const {
1002 DBUG_ASSERT(tbl != nullptr);
1003 DBUG_ASSERT(pack_buffer != nullptr);
1004 DBUG_ASSERT(packed_tuple != nullptr);
1005 DBUG_ASSERT(key_tuple != nullptr);
1006
1007 /* We were given a record in KeyTupleFormat. First, save it to record */
1008 const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
1009 key_restore(record_buffer, key_tuple, &tbl->key_info[m_keyno], key_len);
1010
1011 uint n_used_parts = my_count_bits(keypart_map);
1012 if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0; // Full key is used
1013
1014 /* Then, convert the record into a mem-comparable form */
1015 return pack_record(tbl, pack_buffer, record_buffer, packed_tuple, nullptr,
1016 false, 0, n_used_parts);
1017 }
1018
1019 /**
1020 @brief
1021 Check if "unpack info" data includes checksum.
1022
1023 @detail
1024 This is used only by CHECK TABLE to count the number of rows that have
1025 checksums.
1026 */
1027
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)1028 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
1029 size_t size = unpack_info.size();
1030 if (size == 0) {
1031 return false;
1032 }
1033 const uchar *ptr = (const uchar *)unpack_info.data();
1034
1035 // Skip unpack info if present.
1036 if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1037 const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1038 SHIP_ASSERT(size >= skip_len);
1039
1040 size -= skip_len;
1041 ptr += skip_len;
1042 }
1043
1044 return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1045 }
1046
1047 /*
1048 @return Number of bytes that were changed
1049 */
successor(uchar * const packed_tuple,const uint len)1050 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1051 DBUG_ASSERT(packed_tuple != nullptr);
1052
1053 int changed = 0;
1054 uchar *p = packed_tuple + len - 1;
1055 for (; p > packed_tuple; p--) {
1056 changed++;
1057 if (*p != uchar(0xFF)) {
1058 *p = *p + 1;
1059 break;
1060 }
1061 *p = '\0';
1062 }
1063 return changed;
1064 }
1065
1066 /*
1067 @return Number of bytes that were changed
1068 */
predecessor(uchar * const packed_tuple,const uint len)1069 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1070 DBUG_ASSERT(packed_tuple != nullptr);
1071
1072 int changed = 0;
1073 uchar *p = packed_tuple + len - 1;
1074 for (; p > packed_tuple; p--) {
1075 changed++;
1076 if (*p != uchar(0x00)) {
1077 *p = *p - 1;
1078 break;
1079 }
1080 *p = 0xFF;
1081 }
1082 return changed;
1083 }
1084
1085 static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
1086 {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
1087 {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
1088
1089 /*
1090 @return The length in bytes of the header specified by the given tag
1091 */
get_unpack_header_size(char tag)1092 size_t Rdb_key_def::get_unpack_header_size(char tag) {
1093 DBUG_ASSERT(is_unpack_data_tag(tag));
1094 return UNPACK_HEADER_SIZES.at(tag);
1095 }
1096
1097 /*
1098 Get a bitmap indicating which varchar columns must be covered for this
1099 lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1100 the lookup is covered. If it can already be determined that the lookup is
1101 not covered, map->bitmap will be set to null.
1102 */
get_lookup_bitmap(const TABLE * table,MY_BITMAP * map) const1103 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1104 DBUG_ASSERT(map->bitmap == nullptr);
1105 bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1106 uint curr_bitmap_pos = 0;
1107
1108 // Indicates which columns in the read set might be covered.
1109 MY_BITMAP maybe_covered_bitmap;
1110 bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1111
1112 for (uint i = 0; i < m_key_parts; i++) {
1113 if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1114 continue;
1115 }
1116
1117 Field *const field = m_pack_info[i].get_field_in_table(table);
1118
1119 // Columns which are always covered are not stored in the covered bitmap so
1120 // we can ignore them here too.
1121 if (m_pack_info[i].m_covered &&
1122 bitmap_is_set(table->read_set, field->field_index)) {
1123 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1124 continue;
1125 }
1126
1127 switch (field->real_type()) {
1128 // This type may be covered depending on the record. If it was requested,
1129 // we require the covered bitmap to have this bit set.
1130 case MYSQL_TYPE_VARCHAR:
1131 if (curr_bitmap_pos < MAX_REF_PARTS) {
1132 if (bitmap_is_set(table->read_set, field->field_index)) {
1133 bitmap_set_bit(map, curr_bitmap_pos);
1134 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1135 }
1136 curr_bitmap_pos++;
1137 } else {
1138 bitmap_free(&maybe_covered_bitmap);
1139 bitmap_free(map);
1140 return;
1141 }
1142 break;
1143 // This column is a type which is never covered. If it was requested, we
1144 // know this lookup will never be covered.
1145 default:
1146 if (bitmap_is_set(table->read_set, field->field_index)) {
1147 bitmap_free(&maybe_covered_bitmap);
1148 bitmap_free(map);
1149 return;
1150 }
1151 break;
1152 }
1153 }
1154
1155 // If there are columns which are not covered in the read set, the lookup
1156 // can't be covered.
1157 if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1158 bitmap_free(map);
1159 }
1160 bitmap_free(&maybe_covered_bitmap);
1161 }
1162
1163 /*
1164 Return true if for this secondary index
1165 - All of the requested columns are in the index
1166 - All values for columns that are prefix-only indexes are shorter or equal
1167 in length to the prefix
1168 */
covers_lookup(const rocksdb::Slice * const unpack_info,const MY_BITMAP * const lookup_bitmap) const1169 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1170 const MY_BITMAP *const lookup_bitmap) const {
1171 DBUG_ASSERT(lookup_bitmap != nullptr);
1172 if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1173 return false;
1174 }
1175
1176 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1177
1178 // Check if this unpack_info has a covered_bitmap
1179 const char *unpack_header = unp_reader.get_current_ptr();
1180 const bool has_covered_unpack_info =
1181 unp_reader.remaining_bytes() &&
1182 unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1183 if (!has_covered_unpack_info ||
1184 !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1185 return false;
1186 }
1187
1188 MY_BITMAP covered_bitmap;
1189 my_bitmap_map covered_bits;
1190 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1191 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1192 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1193 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1194
1195 return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1196 }
1197
1198 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
can_cover_lookup() const1199 bool Rdb_key_def::can_cover_lookup() const {
1200 for (uint i = 0; i < m_key_parts; i++) {
1201 if (!m_pack_info[i].m_covered) return false;
1202 }
1203 return true;
1204 }
1205
pack_field(Field * const field,Rdb_field_packing * pack_info,uchar * tuple,uchar * const packed_tuple,uchar * const pack_buffer,Rdb_string_writer * const unpack_info,uint * const n_null_fields) const1206 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1207 uchar *tuple, uchar *const packed_tuple,
1208 uchar *const pack_buffer,
1209 Rdb_string_writer *const unpack_info,
1210 uint *const n_null_fields) const {
1211 if (field->real_maybe_null()) {
1212 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
1213 if (field->is_real_null()) {
1214 /* NULL value. store '\0' so that it sorts before non-NULL values */
1215 *tuple++ = 0;
1216 /* That's it, don't store anything else */
1217 if (n_null_fields) (*n_null_fields)++;
1218 return tuple;
1219 } else {
1220 /* Not a NULL value. Store '1' */
1221 *tuple++ = 1;
1222 }
1223 }
1224
1225 const bool create_unpack_info =
1226 (unpack_info && // we were requested to generate unpack_info
1227 pack_info->uses_unpack_info()); // and this keypart uses it
1228 Rdb_pack_field_context pack_ctx(unpack_info);
1229
1230 // Set the offset for methods which do not take an offset as an argument
1231 DBUG_ASSERT(
1232 is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1233
1234 (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1235
1236 /* Make "unpack info" to be stored in the value */
1237 if (create_unpack_info) {
1238 (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1239 &pack_ctx);
1240 }
1241
1242 return tuple;
1243 }
1244
/**
  Get index columns from the record and pack them into mem-comparable form.

  @param
    tbl                   Table we're working on
    pack_buffer       IN  Temporary area for packing varchars. The size is
                          at least max_storage_fmt_length() bytes.
    record            IN  Record buffer with fields in table->record format
    packed_tuple      OUT Key in the mem-comparable form
    unpack_info       OUT Unpack data writer. It is cleared first; may be
                          nullptr when no unpack data is wanted.
    should_store_row_debug_checksums
                          If true, key and value checksums are appended to
                          unpack_info. Only allowed for secondary indexes.
    hidden_pk_id          Hidden PK value to pack into the last key part of a
                          table with a hidden PK. 0 means it was not passed.
    n_key_parts           Number of keyparts to process. 0 (or MAX_REF_PARTS)
                          means all of them.
    n_null_fields     OUT Number of key fields with NULL value. Reset to 0
                          here when it is not nullptr.
    ttl_bytes         IN  Previous ttl bytes from old record for update case or
                          current ttl bytes from just packed primary key/value
  @detail
    Some callers do not need the unpack information, they can pass
    unpack_info=nullptr.

    Every field is temporarily re-pointed at `record` with move_field() while
    it is packed and pointed back at tbl->record[0] afterwards.

  @return
    Length of the packed tuple
*/

uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
                              const uchar *const record,
                              uchar *const packed_tuple,
                              Rdb_string_writer *const unpack_info,
                              const bool should_store_row_debug_checksums,
                              const longlong hidden_pk_id, uint n_key_parts,
                              uint *const n_null_fields,
                              const char *const ttl_bytes) const {
  DBUG_ASSERT(tbl != nullptr);
  DBUG_ASSERT(pack_buffer != nullptr);
  DBUG_ASSERT(record != nullptr);
  DBUG_ASSERT(packed_tuple != nullptr);
  // Checksums for PKs are made when record is packed.
  // We should never attempt to make checksum just from PK values
  DBUG_ASSERT_IMP(should_store_row_debug_checksums,
                  (m_index_type == INDEX_TYPE_SECONDARY));

  uchar *tuple = packed_tuple;
  // Positions inside unpack_info; size_t(-1) means "not set"
  size_t unpack_start_pos = size_t(-1);
  size_t unpack_len_pos = size_t(-1);
  size_t covered_bitmap_pos = size_t(-1);
  const bool hidden_pk_exists = table_has_hidden_pk(tbl);

  // Every key starts with the index number
  rdb_netbuf_store_index(tuple, m_index_number);
  tuple += INDEX_NUMBER_SIZE;

  // If n_key_parts is 0, it means all columns.
  // The following includes the 'extended key' tail.
  // The 'extended key' includes primary key. This is done to 'uniqify'
  // non-unique indexes
  const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;

  // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
  // hidden key part. So we skip it (its always 1 part).
  if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
    n_key_parts = m_key_parts - 1;
  } else if (use_all_columns) {
    n_key_parts = m_key_parts;
  }

  if (n_null_fields) *n_null_fields = 0;

  // Check if we need a covered bitmap. If it is certain that all key parts are
  // covering, we don't need one.
  bool store_covered_bitmap = false;
  if (unpack_info && use_covered_bitmap_format()) {
    for (uint i = 0; i < n_key_parts; i++) {
      if (!m_pack_info[i].m_covered) {
        store_covered_bitmap = true;
        break;
      }
    }
  }

  // The tag selects which of the two unpack header layouts is written
  const char tag =
      store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;

  if (unpack_info) {
    unpack_info->clear();

    if (m_index_type == INDEX_TYPE_SECONDARY &&
        m_total_index_flags_length > 0) {
      // Reserve space for index flag fields
      unpack_info->allocate(m_total_index_flags_length);

      // Insert TTL timestamp
      if (has_ttl() && ttl_bytes) {
        write_index_flag_field(unpack_info,
                               reinterpret_cast<const uchar *>(ttl_bytes),
                               Rdb_key_def::TTL_FLAG);
      }
    }

    // Header: tag byte, then a 16-bit length placeholder
    unpack_start_pos = unpack_info->get_current_pos();
    unpack_info->write_uint8(tag);
    unpack_len_pos = unpack_info->get_current_pos();
    // we don't know the total length yet, so write a zero
    unpack_info->write_uint16(0);

    if (store_covered_bitmap) {
      // Reserve two bytes for the covered bitmap. This will store, for key
      // parts which are not always covering, whether or not it is covering
      // for this record.
      covered_bitmap_pos = unpack_info->get_current_pos();
      unpack_info->write_uint16(0);
    }
  }

  // covered_bits is the storage handed to bitmap_init() for covered_bitmap;
  // curr_bitmap_pos is the next bit to assign to a VARCHAR prefix key part.
  MY_BITMAP covered_bitmap;
  my_bitmap_map covered_bits;
  uint curr_bitmap_pos = 0;
  bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);

  for (uint i = 0; i < n_key_parts; i++) {
    // Fill hidden pk id into the last key part for secondary keys for tables
    // with no pk
    if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
      m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
      break;
    }

    Field *const field = m_pack_info[i].get_field_in_table(tbl);
    DBUG_ASSERT(field != nullptr);

    // Offsets of the field data and its NULL byte relative to tbl->record[0];
    // the same offsets are applied to `record` below.
    uint field_offset = field->ptr - tbl->record[0];
    uint null_offset = field->null_offset(tbl->record[0]);
    bool maybe_null = field->real_maybe_null();

    // Point the field at the caller's record for the duration of the packing
    field->move_field(
        const_cast<uchar *>(record) + field_offset,
        maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
        field->null_bit);
    // WARNING! Don't return without restoring field->ptr and field->null_ptr

    tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
                       unpack_info, n_null_fields);

    // If this key part is a prefix of a VARCHAR field, check if it's covered.
    if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
        !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
      size_t data_length = field->data_length();
      uint16 key_length;
      // The key part length is looked up in this index unless the part maps
      // to a primary key part, in which case the PK's definition is used.
      if (m_pk_part_no[i] == (uint)-1) {
        key_length = tbl->key_info[get_keyno()].key_part[i].length;
      } else {
        key_length =
            tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
      }

      // Covered for this record: the value can be unpacked and it fits into
      // the indexed prefix.
      if (m_pack_info[i].m_unpack_func != nullptr &&
          data_length <= key_length) {
        bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
      }
      curr_bitmap_pos++;
    }

    // Restore field->ptr and field->null_ptr
    field->move_field(tbl->record[0] + field_offset,
                      maybe_null ? tbl->record[0] + null_offset : nullptr,
                      field->null_bit);
  }

  if (unpack_info) {
    // Length of the unpack data including its header
    const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
    DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());

    // Don't store the unpack_info if it has only the header (that is, there's
    // no meaningful content).
    // Primary Keys are special: for them, store the unpack_info even if it's
    // empty (provided m_maybe_unpack_info==true, see
    // ha_rocksdb::convert_record_to_storage_format)
    //
    // NOTE(review): the length placeholder at unpack_len_pos is rewritten
    // only in the non-secondary branch below, so for secondary keys it keeps
    // the zero written above. unpack_info_has_checksum() reads that 16-bit
    // field to skip the unpack data - confirm this is intended.
    if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
      if (len == get_unpack_header_size(tag) && !covered_bits) {
        unpack_info->truncate(unpack_start_pos);
      } else if (store_covered_bitmap) {
        unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
      }
    } else {
      unpack_info->write_uint16_at(unpack_len_pos, len);
    }

    //
    // Secondary keys have key and value checksums in the value part
    // Primary key is a special case (the value part has non-indexed columns),
    // so the checksums are computed and stored by
    // ha_rocksdb::convert_record_to_storage_format
    //
    if (should_store_row_debug_checksums) {
      // The value checksum covers everything written to unpack_info so far,
      // i.e. it is computed before the checksum chunk itself is appended.
      const uint32_t key_crc32 =
          my_checksum(0, packed_tuple, tuple - packed_tuple);
      const uint32_t val_crc32 =
          my_checksum(0, unpack_info->ptr(), unpack_info->get_current_pos());

      unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
      unpack_info->write_uint32(key_crc32);
      unpack_info->write_uint32(val_crc32);
    }
  }

  DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));

  return tuple - packed_tuple;
}
1451
1452 /**
1453 Pack the hidden primary key into mem-comparable form.
1454
1455 @param
1456 tbl Table we're working on
1457 hidden_pk_id IN New value to be packed into key
1458 packed_tuple OUT Key in the mem-comparable form
1459
1460 @return
1461 Length of the packed tuple
1462 */
1463
pack_hidden_pk(const longlong hidden_pk_id,uchar * const packed_tuple) const1464 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1465 uchar *const packed_tuple) const {
1466 DBUG_ASSERT(packed_tuple != nullptr);
1467
1468 uchar *tuple = packed_tuple;
1469 rdb_netbuf_store_index(tuple, m_index_number);
1470 tuple += INDEX_NUMBER_SIZE;
1471 DBUG_ASSERT(m_key_parts == 1);
1472 DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
1473 m_pack_info[0].m_max_image_len));
1474
1475 m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1476
1477 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1478 return tuple - packed_tuple;
1479 }
1480
1481 /*
1482 Function of type rdb_index_field_pack_t
1483 */
1484
pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1485 void Rdb_key_def::pack_with_make_sort_key(
1486 Rdb_field_packing *const fpi, Field *const field,
1487 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1488 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1489 DBUG_ASSERT(fpi != nullptr);
1490 DBUG_ASSERT(field != nullptr);
1491 DBUG_ASSERT(dst != nullptr);
1492 DBUG_ASSERT(*dst != nullptr);
1493
1494 const int max_len = fpi->m_max_image_len;
1495 MY_BITMAP*old_map;
1496
1497 old_map= dbug_tmp_use_all_columns(field->table,
1498 &field->table->read_set);
1499 field->sort_string(*dst, max_len);
1500 dbug_tmp_restore_column_map(&field->table->read_set, old_map);
1501 *dst += max_len;
1502 }
1503
1504 /*
1505 Compares two keys without unpacking
1506
1507 @detail
1508 @return
1509 0 - Ok. column_index is the index of the first column which is different.
1510 -1 if two kes are equal
1511 1 - Data format error.
1512 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const1513 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
1514 const rocksdb::Slice *key2,
1515 std::size_t *const column_index) const {
1516 DBUG_ASSERT(key1 != nullptr);
1517 DBUG_ASSERT(key2 != nullptr);
1518 DBUG_ASSERT(column_index != nullptr);
1519
1520 // the caller should check the return value and
1521 // not rely on column_index being valid
1522 *column_index = 0xbadf00d;
1523
1524 Rdb_string_reader reader1(key1);
1525 Rdb_string_reader reader2(key2);
1526
1527 // Skip the index number
1528 if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1529
1530 if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1531
1532 for (uint i = 0; i < m_key_parts; i++) {
1533 const Rdb_field_packing *const fpi = &m_pack_info[i];
1534 if (fpi->m_maybe_null) {
1535 const auto nullp1 = reader1.read(1);
1536 const auto nullp2 = reader2.read(1);
1537
1538 if (nullp1 == nullptr || nullp2 == nullptr) {
1539 return HA_EXIT_FAILURE;
1540 }
1541
1542 if (*nullp1 != *nullp2) {
1543 *column_index = i;
1544 return HA_EXIT_SUCCESS;
1545 }
1546
1547 if (*nullp1 == 0) {
1548 /* This is a NULL value */
1549 continue;
1550 }
1551 }
1552
1553 const auto before_skip1 = reader1.get_current_ptr();
1554 const auto before_skip2 = reader2.get_current_ptr();
1555 DBUG_ASSERT(fpi->m_skip_func);
1556 if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) {
1557 return HA_EXIT_FAILURE;
1558 }
1559 if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) {
1560 return HA_EXIT_FAILURE;
1561 }
1562 const auto size1 = reader1.get_current_ptr() - before_skip1;
1563 const auto size2 = reader2.get_current_ptr() - before_skip2;
1564 if (size1 != size2) {
1565 *column_index = i;
1566 return HA_EXIT_SUCCESS;
1567 }
1568
1569 if (memcmp(before_skip1, before_skip2, size1) != 0) {
1570 *column_index = i;
1571 return HA_EXIT_SUCCESS;
1572 }
1573 }
1574
1575 *column_index = m_key_parts;
1576 return HA_EXIT_SUCCESS;
1577 }
1578
1579 /*
1580 @brief
1581 Given a zero-padded key, determine its real key length
1582
1583 @detail
1584 Fixed-size skip functions just read.
1585 */
1586
key_length(const TABLE * const table,const rocksdb::Slice & key) const1587 size_t Rdb_key_def::key_length(const TABLE *const table,
1588 const rocksdb::Slice &key) const {
1589 DBUG_ASSERT(table != nullptr);
1590
1591 Rdb_string_reader reader(&key);
1592
1593 if ((!reader.read(INDEX_NUMBER_SIZE))) {
1594 return size_t(-1);
1595 }
1596 for (uint i = 0; i < m_key_parts; i++) {
1597 const Rdb_field_packing *fpi = &m_pack_info[i];
1598 const Field *field = nullptr;
1599 if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) {
1600 field = fpi->get_field_in_table(table);
1601 }
1602 if ((fpi->m_skip_func)(fpi, field, &reader)) {
1603 return size_t(-1);
1604 }
1605 }
1606 return key.size() - reader.remaining_bytes();
1607 }
1608
1609 /*
1610 Take mem-comparable form and unpack_info and unpack it to Table->record
1611
1612 @detail
1613 not all indexes support this
1614
1615 @return
1616 HA_EXIT_SUCCESS OK
1617 other HA_ERR error code
1618 */
1619
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool verify_row_debug_checksums) const1620 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
1621 const rocksdb::Slice *const packed_key,
1622 const rocksdb::Slice *const unpack_info,
1623 const bool verify_row_debug_checksums) const {
1624 Rdb_string_reader reader(packed_key);
1625 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1626
1627 // There is no checksuming data after unpack_info for primary keys, because
1628 // the layout there is different. The checksum is verified in
1629 // ha_rocksdb::convert_record_from_storage_format instead.
1630 DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
1631 !verify_row_debug_checksums);
1632
1633 // Skip the index number
1634 if ((!reader.read(INDEX_NUMBER_SIZE))) {
1635 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1636 }
1637
1638 // For secondary keys, we expect the value field to contain index flags,
1639 // unpack data, and checksum data in that order. One or all can be missing,
1640 // but they cannot be reordered.
1641 if (unp_reader.remaining_bytes()) {
1642 if (m_index_type == INDEX_TYPE_SECONDARY &&
1643 m_total_index_flags_length > 0 &&
1644 !unp_reader.read(m_total_index_flags_length)) {
1645 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1646 }
1647 }
1648
1649 const char *unpack_header = unp_reader.get_current_ptr();
1650 bool has_unpack_info =
1651 unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
1652 if (has_unpack_info) {
1653 if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
1654 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1655 }
1656 }
1657
1658 // Read the covered bitmap
1659 MY_BITMAP covered_bitmap;
1660 my_bitmap_map covered_bits;
1661 bool has_covered_bitmap =
1662 has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
1663 if (has_covered_bitmap) {
1664 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1665 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1666 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1667 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1668 }
1669
1670 int err = HA_EXIT_SUCCESS;
1671
1672
1673 Rdb_key_field_iterator iter(
1674 this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
1675 has_covered_bitmap ? &covered_bitmap : nullptr, buf);
1676 while (iter.has_next()) {
1677 err = iter.next();
1678 if (err) {
1679 return err;
1680 }
1681 }
1682
1683 /*
1684 Check checksum values if present
1685 */
1686 const char *ptr;
1687 if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
1688 if (verify_row_debug_checksums) {
1689 uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
1690 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1691 const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
1692 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1693
1694 const uint32_t computed_key_chksum =
1695 my_checksum(0, packed_key->data(), packed_key->size());
1696 const uint32_t computed_val_chksum =
1697 my_checksum(0, unpack_info->data(),
1698 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1699
1700 DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
1701 stored_key_chksum++;);
1702
1703 if (stored_key_chksum != computed_key_chksum) {
1704 report_checksum_mismatch(true, packed_key->data(), packed_key->size());
1705 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1706 }
1707
1708 if (stored_val_chksum != computed_val_chksum) {
1709 report_checksum_mismatch(false, unpack_info->data(),
1710 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1711 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1712 }
1713 } else {
1714 /* The checksums are present but we are not checking checksums */
1715 }
1716 }
1717
1718 if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA;
1719
1720 return HA_EXIT_SUCCESS;
1721 }
1722
table_has_hidden_pk(const TABLE * const table)1723 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
1724 return table->s->primary_key == MAX_INDEXES;
1725 }
1726
report_checksum_mismatch(const bool is_key,const char * const data,const size_t data_size) const1727 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
1728 const char *const data,
1729 const size_t data_size) const {
1730 // NO_LINT_DEBUG
1731 sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
1732 is_key ? "key" : "value", get_index_number());
1733
1734 const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
1735 // NO_LINT_DEBUG
1736 sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
1737 (uint64_t)data_size, buf.c_str());
1738
1739 my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1740 }
1741
index_format_min_check(const int pk_min,const int sk_min) const1742 bool Rdb_key_def::index_format_min_check(const int pk_min,
1743 const int sk_min) const {
1744 switch (m_index_type) {
1745 case INDEX_TYPE_PRIMARY:
1746 case INDEX_TYPE_HIDDEN_PRIMARY:
1747 return (m_kv_format_version >= pk_min);
1748 case INDEX_TYPE_SECONDARY:
1749 return (m_kv_format_version >= sk_min);
1750 default:
1751 DBUG_ASSERT(0);
1752 return false;
1753 }
1754 }
1755
1756 ///////////////////////////////////////////////////////////////////////////////////////////
1757 // Rdb_field_packing
1758 ///////////////////////////////////////////////////////////////////////////////////////////
1759
1760 /*
1761 Function of type rdb_index_field_skip_t
1762 */
1763
skip_max_length(const Rdb_field_packing * const fpi,const Field * const field MY_ATTRIBUTE ((__unused__)),Rdb_string_reader * const reader)1764 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
1765 const Field *const field
1766 MY_ATTRIBUTE((__unused__)),
1767 Rdb_string_reader *const reader) {
1768 if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
1769 return HA_EXIT_SUCCESS;
1770 }
1771
/*
  (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
  split in the middle of an UTF-8 character. See the implementation of
  unpack_binary_or_utf8_varchar.
*/
#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Size of the encoded image of `len` bytes of data: the number of
  (RDB_ESCAPE_LENGTH-1)-byte pieces, rounded up, times RDB_ESCAPE_LENGTH.

  Both the argument and the whole expansion are parenthesized so that the
  macro can be used with any argument expression and inside any larger
  expression without operator precedence changing the result.
*/
#define RDB_ENCODED_SIZE(len)                                       \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Same for the legacy format. The rounding term is one larger here, so a
  length that is a multiple of (RDB_LEGACY_ESCAPE_LENGTH-1), including 0,
  takes one piece more than in RDB_ENCODED_SIZE.
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                   \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /         \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                  \
   RDB_LEGACY_ESCAPE_LENGTH)
1789
1790 /*
1791 Function of type rdb_index_field_skip_t
1792 */
1793
skip_variable_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1794 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
1795 const Field *const field,
1796 Rdb_string_reader *const reader) {
1797 const uchar *ptr;
1798 bool finished = false;
1799
1800 size_t dst_len; /* How much data can be there */
1801 if (field) {
1802 const Field_varstring *const field_var =
1803 static_cast<const Field_varstring *>(field);
1804 dst_len = field_var->pack_length() - field_var->length_bytes;
1805 } else {
1806 dst_len = UINT_MAX;
1807 }
1808
1809 bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
1810
1811 /* Decode the length-emitted encoding here */
1812 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1813 uint used_bytes;
1814
1815 /* See pack_with_varchar_encoding. */
1816 if (use_legacy_format) {
1817 used_bytes = calc_unpack_legacy_variable_format(
1818 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1819 } else {
1820 used_bytes =
1821 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1822 }
1823
1824 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
1825 return HA_EXIT_FAILURE; // Corruption in the data
1826 }
1827
1828 if (finished) {
1829 break;
1830 }
1831
1832 dst_len -= used_bytes;
1833 }
1834
1835 if (!finished) {
1836 return HA_EXIT_FAILURE;
1837 }
1838
1839 return HA_EXIT_SUCCESS;
1840 }
1841
// Flag byte stored at the end of every segment of the Variable-Length
// Space-Padded encoding. It tells how the rest of the value compares to a
// string of spaces; EQUAL_TO_SPACES also marks the last segment.
constexpr int VARCHAR_CMP_LESS_THAN_SPACES = 1;
constexpr int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
constexpr int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1845
1846 /*
1847 Skip a keypart that uses Variable-Length Space-Padded encoding
1848 */
1849
skip_variable_space_pad(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1850 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
1851 const Field *const field,
1852 Rdb_string_reader *const reader) {
1853 const uchar *ptr;
1854 bool finished = false;
1855
1856 size_t dst_len = UINT_MAX; /* How much data can be there */
1857
1858 if (field) {
1859 const Field_varstring *const field_var =
1860 static_cast<const Field_varstring *>(field);
1861 dst_len = field_var->pack_length() - field_var->length_bytes;
1862 }
1863
1864 /* Decode the length-emitted encoding here */
1865 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1866 // See pack_with_varchar_space_pad
1867 const uchar c = ptr[fpi->m_segment_size - 1];
1868 if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1869 // This is the last segment
1870 finished = true;
1871 break;
1872 } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1873 c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1874 // This is not the last segment
1875 if ((fpi->m_segment_size - 1) > dst_len) {
1876 // The segment is full of data but the table field can't hold that
1877 // much! This must be data corruption.
1878 return HA_EXIT_FAILURE;
1879 }
1880 dst_len -= (fpi->m_segment_size - 1);
1881 } else {
1882 // Encountered a value that's none of the VARCHAR_CMP* constants
1883 // It's data corruption.
1884 return HA_EXIT_FAILURE;
1885 }
1886 }
1887 return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1888 }
1889
1890 /*
1891 Function of type rdb_index_field_unpack_t
1892 */
1893
unpack_integer(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))1894 int Rdb_key_def::unpack_integer(
1895 Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1896 Rdb_string_reader *const reader,
1897 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
1898 const int length = fpi->m_max_image_len;
1899
1900 const uchar *from;
1901 if (!(from = (const uchar *)reader->read(length))) {
1902 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1903 }
1904
1905 #ifdef WORDS_BIGENDIAN
1906 {
1907 if (static_cast<Field_num *>(field)->unsigned_flag) {
1908 to[0] = from[0];
1909 } else {
1910 to[0] = static_cast<char>(from[0] ^ 128); // Reverse the sign bit.
1911 }
1912 memcpy(to + 1, from + 1, length - 1);
1913 }
1914 #else
1915 {
1916 const int sign_byte = from[0];
1917 if (static_cast<Field_num *>(field)->unsigned_flag) {
1918 to[length - 1] = sign_byte;
1919 } else {
1920 to[length - 1] =
1921 static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
1922 }
1923 for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
1924 }
1925 #endif
1926 return UNPACK_SUCCESS;
1927 }
1928
1929 #if !defined(WORDS_BIGENDIAN)
rdb_swap_double_bytes(uchar * const dst,const uchar * const src)1930 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
1931 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1932 // A few systems store the most-significant _word_ first on little-endian
1933 dst[0] = src[3];
1934 dst[1] = src[2];
1935 dst[2] = src[1];
1936 dst[3] = src[0];
1937 dst[4] = src[7];
1938 dst[5] = src[6];
1939 dst[6] = src[5];
1940 dst[7] = src[4];
1941 #else
1942 dst[0] = src[7];
1943 dst[1] = src[6];
1944 dst[2] = src[5];
1945 dst[3] = src[4];
1946 dst[4] = src[3];
1947 dst[5] = src[2];
1948 dst[6] = src[1];
1949 dst[7] = src[0];
1950 #endif
1951 }
1952
rdb_swap_float_bytes(uchar * const dst,const uchar * const src)1953 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
1954 dst[0] = src[3];
1955 dst[1] = src[2];
1956 dst[2] = src[1];
1957 dst[3] = src[0];
1958 }
1959 #else
1960 #define rdb_swap_double_bytes nullptr
1961 #define rdb_swap_float_bytes nullptr
1962 #endif
1963
unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t size,const int exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))1964 int Rdb_key_def::unpack_floating_point(
1965 uchar *const dst, Rdb_string_reader *const reader, const size_t size,
1966 const int exp_digit, const uchar *const zero_pattern,
1967 const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
1968 const uchar *const from = (const uchar *)reader->read(size);
1969 if (from == nullptr) {
1970 /* Mem-comparable image doesn't have enough bytes */
1971 return UNPACK_FAILURE;
1972 }
1973
1974 /* Check to see if the value is zero */
1975 if (memcmp(from, zero_pattern, size) == 0) {
1976 memcpy(dst, zero_val, size);
1977 return UNPACK_SUCCESS;
1978 }
1979
1980 #if defined(WORDS_BIGENDIAN)
1981 // On big-endian, output can go directly into result
1982 uchar *const tmp = dst;
1983 #else
1984 // Otherwise use a temporary buffer to make byte-swapping easier later
1985 uchar tmp[8];
1986 #endif
1987
1988 memcpy(tmp, from, size);
1989
1990 if (tmp[0] & 0x80) {
1991 // If the high bit is set the original value was positive so
1992 // remove the high bit and subtract one from the exponent.
1993 ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1994 exp_part &= 0x7FFF; // clear high bit;
1995 exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent
1996 tmp[0] = (uchar)(exp_part >> 8);
1997 tmp[1] = (uchar)exp_part;
1998 } else {
1999 // Otherwise the original value was negative and all bytes have been
2000 // negated.
2001 for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
2002 }
2003
2004 #if !defined(WORDS_BIGENDIAN)
2005 // On little-endian, swap the bytes around
2006 swap_func(dst, tmp);
2007 #else
2008 DBUG_ASSERT(swap_func == nullptr);
2009 #endif
2010
2011 return UNPACK_SUCCESS;
2012 }
2013
2014 #if !defined(DBL_EXP_DIG)
2015 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
2016 #endif
2017
2018 /*
2019 Function of type rdb_index_field_unpack_t
2020
2021 Unpack a double by doing the reverse action of change_double_for_sort
2022 (sql/filesort.cc). Note that this only works on IEEE values.
2023 Note also that this code assumes that NaN and +/-Infinity are never
2024 allowed in the database.
2025 */
unpack_double(Rdb_field_packing * const fpi MY_ATTRIBUTE ((__unused__)),Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2026 int Rdb_key_def::unpack_double(
2027 Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2028 Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
2029 Rdb_string_reader *const reader,
2030 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2031 static double zero_val = 0.0;
2032 static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2033
2034 return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2035 zero_pattern, (const uchar *)&zero_val,
2036 rdb_swap_double_bytes);
2037 }
2038
2039 #if !defined(FLT_EXP_DIG)
2040 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
2041 #endif
2042
2043 /*
2044 Function of type rdb_index_field_unpack_t
2045
2046 Unpack a float by doing the reverse action of Field_float::make_sort_key
2047 (sql/field.cc). Note that this only works on IEEE values.
2048 Note also that this code assumes that NaN and +/-Infinity are never
2049 allowed in the database.
2050 */
unpack_float(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2051 int Rdb_key_def::unpack_float(
2052 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2053 uchar *const field_ptr, Rdb_string_reader *const reader,
2054 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2055 static float zero_val = 0.0;
2056 static const uchar zero_pattern[4] = {128, 0, 0, 0};
2057
2058 return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2059 zero_pattern, (const uchar *)&zero_val,
2060 rdb_swap_float_bytes);
2061 }
2062
2063 /*
  Function of type rdb_index_field_unpack_t, used to unpack a value by
  doing the reverse action of Field_newdate::make_sort_key.
2066 */
2067
unpack_newdate(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2068 int Rdb_key_def::unpack_newdate(
2069 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2070 uchar *const field_ptr, Rdb_string_reader *const reader,
2071 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2072 const char *from;
2073 DBUG_ASSERT(fpi->m_max_image_len == 3);
2074
2075 if (!(from = reader->read(3))) {
2076 /* Mem-comparable image doesn't have enough bytes */
2077 return UNPACK_FAILURE;
2078 }
2079
2080 field_ptr[0] = from[2];
2081 field_ptr[1] = from[1];
2082 field_ptr[2] = from[0];
2083 return UNPACK_SUCCESS;
2084 }
2085
2086 /*
2087 Function of type rdb_index_field_unpack_t, used to
2088 Unpack the string by copying it over.
2089 This is for BINARY(n) where the value occupies the whole length.
2090 */
2091
unpack_binary_str(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2092 int Rdb_key_def::unpack_binary_str(
2093 Rdb_field_packing *const fpi, Field *const field, uchar *const to,
2094 Rdb_string_reader *const reader,
2095 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2096 const char *from;
2097 if (!(from = reader->read(fpi->m_max_image_len))) {
2098 /* Mem-comparable image doesn't have enough bytes */
2099 return UNPACK_FAILURE;
2100 }
2101
2102 memcpy(to, from, fpi->m_max_image_len);
2103 return UNPACK_SUCCESS;
2104 }
2105
2106 /*
2107 Function of type rdb_index_field_unpack_t.
2108 For UTF-8, we need to convert 2-byte wide-character entities back into
2109 UTF8 sequences.
2110 */
2111
unpack_utf8_str(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2112 int Rdb_key_def::unpack_utf8_str(
2113 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2114 Rdb_string_reader *const reader,
2115 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2116 my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
2117 const uchar *src;
2118 if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2119 /* Mem-comparable image doesn't have enough bytes */
2120 return UNPACK_FAILURE;
2121 }
2122
2123 const uchar *const src_end = src + fpi->m_max_image_len;
2124 uchar *const dst_end = dst + field->pack_length();
2125
2126 while (src < src_end) {
2127 my_wc_t wc = (src[0] << 8) | src[1];
2128 src += 2;
2129 int res = cset->wc_mb(wc, dst, dst_end);
2130 DBUG_ASSERT(res > 0 && res <= 3);
2131 if (res < 0) return UNPACK_FAILURE;
2132 dst += res;
2133 }
2134
2135 cset->fill(reinterpret_cast<char *>(dst), dst_end - dst,
2136 cset->pad_char);
2137 return UNPACK_SUCCESS;
2138 }
2139
2140 /*
2141 This is the original algorithm to encode a variable binary field. It
2142 sets a flag byte every Nth byte. The flag value is (255 - #pad) where
2143 #pad is the number of padding bytes that were needed (0 if all N-1
2144 bytes were used).
2145
2146 If N=8 and the field is:
2147 * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2148 * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2149 And the 4 byte string compares as greater than the 3 byte string
2150
2151 Unfortunately the algorithm has a flaw. If the input is exactly a
2152 multiple of N-1, an extra N bytes are written. Since we usually use
2153 N=9, an 8 byte input will generate 18 bytes of output instead of the
2154 9 bytes of output that is optimal.
2155
2156 See pack_variable_format for the newer algorithm.
2157 */
pack_legacy_variable_format(const uchar * src,size_t src_len,uchar ** dst)2158 void Rdb_key_def::pack_legacy_variable_format(
2159 const uchar *src, // The data to encode
2160 size_t src_len, // The length of the data to encode
2161 uchar **dst) // The location to encode the data
2162 {
2163 size_t copy_len;
2164 size_t padding_bytes;
2165 uchar *ptr = *dst;
2166
2167 do {
2168 copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2169 padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2170 memcpy(ptr, src, copy_len);
2171 ptr += copy_len;
2172 src += copy_len;
2173 // pad with zeros if necessary
2174 if (padding_bytes > 0) {
2175 memset(ptr, 0, padding_bytes);
2176 ptr += padding_bytes;
2177 }
2178
2179 *(ptr++) = 255 - padding_bytes;
2180
2181 src_len -= copy_len;
2182 } while (padding_bytes == 0);
2183
2184 *dst = ptr;
2185 }
2186
2187 /*
2188 This is the new algorithm. Similarly to the legacy format the input
2189 is split up into N-1 bytes and a flag byte is used as the Nth byte
2190 in the output.
2191
2192 - If the previous segment needed any padding the flag is set to the
2193 number of bytes used (0..N-2). 0 is possible in the first segment
2194 if the input is 0 bytes long.
2195 - If no padding was used and there is no more data left in the input
2196 the flag is set to N-1
2197 - If no padding was used and there is still data left in the input the
2198 flag is set to N.
2199
  For N=9, the following input values encode to the specified
  output (where 'X' indicates a byte of the original input):
2202 - 0 bytes is encoded as 0 0 0 0 0 0 0 0 0
2203 - 1 byte is encoded as X 0 0 0 0 0 0 0 1
2204 - 2 bytes is encoded as X X 0 0 0 0 0 0 2
2205 - 7 bytes is encoded as X X X X X X X 0 7
2206 - 8 bytes is encoded as X X X X X X X X 8
2207 - 9 bytes is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2208 - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2209 */
pack_variable_format(const uchar * src,size_t src_len,uchar ** dst)2210 void Rdb_key_def::pack_variable_format(
2211 const uchar *src, // The data to encode
2212 size_t src_len, // The length of the data to encode
2213 uchar **dst) // The location to encode the data
2214 {
2215 uchar *ptr = *dst;
2216
2217 for (;;) {
2218 // Figure out how many bytes to copy, copy them and adjust pointers
2219 const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2220 memcpy(ptr, src, copy_len);
2221 ptr += copy_len;
2222 src += copy_len;
2223 src_len -= copy_len;
2224
2225 // Are we at the end of the input?
2226 if (src_len == 0) {
2227 // pad with zeros if necessary;
2228 const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2229 if (padding_bytes > 0) {
2230 memset(ptr, 0, padding_bytes);
2231 ptr += padding_bytes;
2232 }
2233
2234 // Put the flag byte (0 - N-1) in the output
2235 *(ptr++) = (uchar)copy_len;
2236 break;
2237 }
2238
2239 // We have more data - put the flag byte (N) in and continue
2240 *(ptr++) = RDB_ESCAPE_LENGTH;
2241 }
2242
2243 *dst = ptr;
2244 }
2245
2246 /*
2247 Function of type rdb_index_field_pack_t
2248 */
2249
pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))2250 void Rdb_key_def::pack_with_varchar_encoding(
2251 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2252 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2253 const CHARSET_INFO *const charset = field->charset();
2254 Field_varstring *const field_var = (Field_varstring *)field;
2255
2256 const size_t value_length = (field_var->length_bytes == 1)
2257 ? (uint)*field->ptr
2258 : uint2korr(field->ptr);
2259 size_t xfrm_len = charset->strnxfrm(
2260 buf, fpi->m_max_image_len, field_var->char_length(),
2261 field_var->ptr + field_var->length_bytes, value_length, 0);
2262
2263 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2264 if (fpi->m_use_legacy_varbinary_format) {
2265 pack_legacy_variable_format(buf, xfrm_len, dst);
2266 } else {
2267 pack_variable_format(buf, xfrm_len, dst);
2268 }
2269 }
2270
2271 /*
2272 Compare the string in [buf..buf_end) with a string that is an infinite
2273 sequence of strings in space_xfrm
2274 */
2275
rdb_compare_string_with_spaces(const uchar * buf,const uchar * const buf_end,const std::vector<uchar> * const space_xfrm)2276 static int rdb_compare_string_with_spaces(
2277 const uchar *buf, const uchar *const buf_end,
2278 const std::vector<uchar> *const space_xfrm) {
2279 int cmp = 0;
2280 while (buf < buf_end) {
2281 size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2282 if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
2283 buf += bytes;
2284 }
2285 return cmp;
2286 }
2287
// Bias added to the trimmed/padded space count kept in unpack_info, so that
// the (possibly negative) count can be stored as an unsigned number.
static constexpr int RDB_TRIMMED_CHARS_OFFSET = 8;
2289 /*
2290 Pack the data with Variable-Length Space-Padded Encoding.
2291
2292 The encoding is there to meet two goals:
2293
2294 Goal#1. Comparison. The SQL standard says
2295
    "If the collation for the comparison has the PAD SPACE characteristic,
    for the purposes of the comparison, the shorter value is effectively
    extended to the length of the longer by concatenation of <space>s on the
    right."
2300
2301 At the moment, all MySQL collations except one have the PAD SPACE
2302 characteristic. The exception is the "binary" collation that is used by
2303 [VAR]BINARY columns. (Note that binary collations for specific charsets,
2304 like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2305 the PAD SPACE characteristic).
2306
2307 Goal#2 is to preserve the number of trailing spaces in the original value.
2308
2309 This is achieved by using the following encoding:
2310 The key part:
2311 - Stores mem-comparable image of the column
2312 - It is stored in chunks of fpi->m_segment_size bytes (*)
2313 = If the remainder of the chunk is not occupied, it is padded with mem-
2314 comparable image of the space character (cs->pad_char to be precise).
2315 - The last byte of the chunk shows how the rest of column's mem-comparable
2316 image would compare to mem-comparable image of the column extended with
2317 spaces. There are three possible values.
2318 - VARCHAR_CMP_LESS_THAN_SPACES,
2319 - VARCHAR_CMP_EQUAL_TO_SPACES
2320 - VARCHAR_CMP_GREATER_THAN_SPACES
2321
2322 VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2323 is spaces, or something that sorts as spaces, so there is no reason to store
2324 it).
2325
2326 Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2327
2328 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ]
2329 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2330 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2331 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2332
2333 As mentioned above, the last chunk is padded with mem-comparable images of
2334 cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2335
2336 fpi->m_segment_size depends on the used collation. It is chosen to be such
2337 that no mem-comparable image of space will ever stretch across the segments
2338 (see get_segment_size_from_collation).
2339
2340 == The value part (aka unpack_info) ==
2341 The value part stores the number of space characters that one needs to add
2342 when unpacking the string.
2343 - If the number is positive, it means add this many spaces at the end
2344 - If the number is negative, it means padding has added extra spaces which
2345 must be removed.
2346
2347 Storage considerations
2348 - depending on column's max size, the number may occupy 1 or 2 bytes
2349 - the number of spaces that need to be removed is not more than
2350 RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2351 then store it as unsigned.
2352
2353 @seealso
2354 unpack_binary_or_utf8_varchar_space_pad
2355 unpack_simple_varchar_space_pad
2356 dummy_make_unpack_info
2357 skip_variable_space_pad
2358 */
2359
pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)2360 void Rdb_key_def::pack_with_varchar_space_pad(
2361 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2362 Rdb_pack_field_context *const pack_ctx) {
2363 Rdb_string_writer *const unpack_info = pack_ctx->writer;
2364 const CHARSET_INFO *const charset = field->charset();
2365 const auto field_var = static_cast<Field_varstring *>(field);
2366
2367 const size_t value_length = (field_var->length_bytes == 1)
2368 ? (uint)*field->ptr
2369 : uint2korr(field->ptr);
2370
2371 const size_t trimmed_len = charset->lengthsp(
2372 (const char *)field_var->ptr + field_var->length_bytes,
2373 value_length);
2374 const size_t xfrm_len = charset->strnxfrm(
2375 buf, fpi->m_max_image_len, field_var->char_length(),
2376 field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2377
2378 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2379 uchar *const buf_end = buf + xfrm_len;
2380
2381 size_t encoded_size = 0;
2382 uchar *ptr = *dst;
2383 size_t padding_bytes;
2384 while (true) {
2385 const size_t copy_len =
2386 std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2387 padding_bytes = fpi->m_segment_size - 1 - copy_len;
2388 memcpy(ptr, buf, copy_len);
2389 ptr += copy_len;
2390 buf += copy_len;
2391
2392 if (padding_bytes) {
2393 memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2394 ptr += padding_bytes;
2395 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment
2396 } else {
2397 // Compare the string suffix with a hypothetical infinite string of
2398 // spaces. It could be that the first difference is beyond the end of
2399 // current chunk.
2400 const int cmp =
2401 rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2402
2403 if (cmp < 0) {
2404 *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2405 } else if (cmp > 0) {
2406 *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2407 } else {
2408 // It turns out all the rest are spaces.
2409 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2410 }
2411 }
2412 encoded_size += fpi->m_segment_size;
2413
2414 if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
2415 }
2416
2417 // m_unpack_info_stores_value means unpack_info stores the whole original
2418 // value. There is no need to store the number of trimmed/padded endspaces
2419 // in that case.
2420 if (unpack_info && !fpi->m_unpack_info_stores_value) {
2421 // (value_length - trimmed_len) is the number of trimmed space *characters*
2422 // then, padding_bytes is the number of *bytes* added as padding
2423 // then, we add 8, because we don't store negative values.
2424 DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
2425 DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
2426 const size_t removed_chars =
2427 RDB_TRIMMED_CHARS_OFFSET +
2428 (value_length - trimmed_len) / fpi->space_mb_len -
2429 padding_bytes / fpi->space_xfrm_len;
2430
2431 if (fpi->m_unpack_info_uses_two_bytes) {
2432 unpack_info->write_uint16(removed_chars);
2433 } else {
2434 DBUG_ASSERT(removed_chars < 0x100);
2435 unpack_info->write_uint8(removed_chars);
2436 }
2437 }
2438
2439 *dst += encoded_size;
2440 }
2441
2442 /*
2443 Calculate the number of used bytes in the chunk and whether this is the
2444 last chunk in the input. This is based on the old legacy format - see
2445 pack_legacy_variable_format.
2446 */
calc_unpack_legacy_variable_format(uchar flag,bool * done)2447 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2448 uint pad = 255 - flag;
2449 uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2450 if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2451 return (uint)-1;
2452 }
2453
2454 *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2455 return used_bytes;
2456 }
2457
2458 /*
2459 Calculate the number of used bytes in the chunk and whether this is the
2460 last chunk in the input. This is based on the new format - see
2461 pack_variable_format.
2462 */
calc_unpack_variable_format(uchar flag,bool * done)2463 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2464 // Check for invalid flag values
2465 if (flag > RDB_ESCAPE_LENGTH) {
2466 return (uint)-1;
2467 }
2468
2469 // Values from 1 to N-1 indicate this is the last chunk and that is how
2470 // many bytes were used
2471 if (flag < RDB_ESCAPE_LENGTH) {
2472 *done = true;
2473 return flag;
2474 }
2475
2476 // A value of N means we used N-1 bytes and had more to go
2477 *done = false;
2478 return RDB_ESCAPE_LENGTH - 1;
2479 }
2480
2481 /*
2482 Unpack data that has charset information. Each two bytes of the input is
2483 treated as a wide-character and converted to its multibyte equivalent in
2484 the output.
2485 */
unpack_charset(const CHARSET_INFO * cset,const uchar * src,uint src_len,uchar * dst,uint dst_len,uint * used_bytes)2486 static int unpack_charset(
2487 const CHARSET_INFO *cset, // character set information
2488 const uchar *src, // source data to unpack
2489 uint src_len, // length of source data
2490 uchar *dst, // destination of unpacked data
2491 uint dst_len, // length of destination data
2492 uint *used_bytes) // output number of bytes used
2493 {
2494 if (src_len & 1) {
2495 /*
2496 UTF-8 characters are encoded into two-byte entities. There is no way
2497 we can have an odd number of bytes after encoding.
2498 */
2499 return UNPACK_FAILURE;
2500 }
2501
2502 uchar *dst_end = dst + dst_len;
2503 uint used = 0;
2504
2505 for (uint ii = 0; ii < src_len; ii += 2) {
2506 my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2507 int res = cset->wc_mb(wc, dst + used, dst_end);
2508 DBUG_ASSERT(res > 0 && res <= 3);
2509 if (res < 0) {
2510 return UNPACK_FAILURE;
2511 }
2512
2513 used += res;
2514 }
2515
2516 *used_bytes = used;
2517 return UNPACK_SUCCESS;
2518 }
2519
2520 /*
2521 Function of type rdb_index_field_unpack_t
2522 */
2523
unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2524 int Rdb_key_def::unpack_binary_or_utf8_varchar(
2525 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2526 Rdb_string_reader *const reader,
2527 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2528 const uchar *ptr;
2529 size_t len = 0;
2530 bool finished = false;
2531 uchar *d0 = dst;
2532 Field_varstring *const field_var = (Field_varstring *)field;
2533 dst += field_var->length_bytes;
2534 // How much we can unpack
2535 size_t dst_len = field_var->pack_length() - field_var->length_bytes;
2536
2537 bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2538
2539 /* Decode the length-emitted encoding here */
2540 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2541 uint used_bytes;
2542
2543 /* See pack_with_varchar_encoding. */
2544 if (use_legacy_format) {
2545 used_bytes = calc_unpack_legacy_variable_format(
2546 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2547 } else {
2548 used_bytes =
2549 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2550 }
2551
2552 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2553 return UNPACK_FAILURE; // Corruption in the data
2554 }
2555
2556 /*
2557 Now, we need to decode used_bytes of data and append them to the value.
2558 */
2559 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2560 int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
2561 dst_len, &used_bytes);
2562 if (err != UNPACK_SUCCESS) {
2563 return err;
2564 }
2565 } else {
2566 memcpy(dst, ptr, used_bytes);
2567 }
2568
2569 dst += used_bytes;
2570 dst_len -= used_bytes;
2571 len += used_bytes;
2572
2573 if (finished) {
2574 break;
2575 }
2576 }
2577
2578 if (!finished) {
2579 return UNPACK_FAILURE;
2580 }
2581
2582 /* Save the length */
2583 if (field_var->length_bytes == 1) {
2584 d0[0] = (uchar)len;
2585 } else {
2586 DBUG_ASSERT(field_var->length_bytes == 2);
2587 int2store(d0, len);
2588 }
2589 return UNPACK_SUCCESS;
2590 }
2591
2592 /*
2593 @seealso
2594 pack_with_varchar_space_pad - packing function
2595 unpack_simple_varchar_space_pad - unpacking function for 'simple'
2596 charsets.
2597 skip_variable_space_pad - skip function
2598 */
unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2599 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
2600 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2601 Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2602 const uchar *ptr;
2603 size_t len = 0;
2604 bool finished = false;
2605 Field_varstring *const field_var = static_cast<Field_varstring *>(field);
2606 uchar *d0 = dst;
2607 uchar *dst_end = dst + field_var->pack_length();
2608 dst += field_var->length_bytes;
2609
2610 uint space_padding_bytes = 0;
2611 uint extra_spaces;
2612 if ((fpi->m_unpack_info_uses_two_bytes
2613 ? unp_reader->read_uint16(&extra_spaces)
2614 : unp_reader->read_uint8(&extra_spaces))) {
2615 return UNPACK_FAILURE;
2616 }
2617
2618 if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
2619 space_padding_bytes =
2620 -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
2621 extra_spaces = 0;
2622 } else {
2623 extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
2624 }
2625
2626 space_padding_bytes *= fpi->space_xfrm_len;
2627
2628 /* Decode the length-emitted encoding here */
2629 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2630 const char last_byte = ptr[fpi->m_segment_size - 1];
2631 size_t used_bytes;
2632 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment
2633 {
2634 if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2635 return UNPACK_FAILURE; // Cannot happen, corrupted data
2636 }
2637 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2638 finished = true;
2639 } else {
2640 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2641 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2642 return UNPACK_FAILURE; // Invalid value
2643 }
2644 used_bytes = fpi->m_segment_size - 1;
2645 }
2646
2647 // Now, need to decode used_bytes of data and append them to the value.
2648 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2649 if (used_bytes & 1) {
2650 /*
2651 UTF-8 characters are encoded into two-byte entities. There is no way
2652 we can have an odd number of bytes after encoding.
2653 */
2654 return UNPACK_FAILURE;
2655 }
2656
2657 const uchar *src = ptr;
2658 const uchar *const src_end = ptr + used_bytes;
2659 while (src < src_end) {
2660 my_wc_t wc = (src[0] << 8) | src[1];
2661 src += 2;
2662 const CHARSET_INFO *cset = fpi->m_varchar_charset;
2663 int res = cset->wc_mb(wc, dst, dst_end);
2664 DBUG_ASSERT(res <= 3);
2665 if (res <= 0) return UNPACK_FAILURE;
2666 dst += res;
2667 len += res;
2668 }
2669 } else {
2670 if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
2671 memcpy(dst, ptr, used_bytes);
2672 dst += used_bytes;
2673 len += used_bytes;
2674 }
2675
2676 if (finished) {
2677 if (extra_spaces) {
2678 // Both binary and UTF-8 charset store space as ' ',
2679 // so the following is ok:
2680 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2681 memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
2682 len += extra_spaces;
2683 }
2684 break;
2685 }
2686 }
2687
2688 if (!finished) return UNPACK_FAILURE;
2689
2690 /* Save the length */
2691 if (field_var->length_bytes == 1) {
2692 d0[0] = (uchar)len;
2693 } else {
2694 DBUG_ASSERT(field_var->length_bytes == 2);
2695 int2store(d0, len);
2696 }
2697 return UNPACK_SUCCESS;
2698 }
2699
2700 /////////////////////////////////////////////////////////////////////////
2701
2702 /*
2703 Function of type rdb_make_unpack_info_t
2704 */
2705
make_unpack_unknown(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2706 void Rdb_key_def::make_unpack_unknown(
2707 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2708 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2709 pack_ctx->writer->write(field->ptr, field->pack_length());
2710 }
2711
2712 /*
2713 This point of this function is only to indicate that unpack_info is
2714 available.
2715
2716 The actual unpack_info data is produced by the function that packs the key,
2717 that is, pack_with_varchar_space_pad.
2718 */
2719
dummy_make_unpack_info(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * field MY_ATTRIBUTE ((__unused__)),Rdb_pack_field_context * pack_ctx MY_ATTRIBUTE ((__unused__)))2720 void Rdb_key_def::dummy_make_unpack_info(
2721 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2722 const Field *field MY_ATTRIBUTE((__unused__)),
2723 Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
2724 // Do nothing
2725 }
2726
2727 /*
2728 Function of type rdb_index_field_unpack_t
2729 */
2730
unpack_unknown(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2731 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
2732 Field *const field, uchar *const dst,
2733 Rdb_string_reader *const reader,
2734 Rdb_string_reader *const unp_reader) {
2735 const uchar *ptr;
2736 const uint len = fpi->m_unpack_data_len;
2737 // We don't use anything from the key, so skip over it.
2738 if (skip_max_length(fpi, field, reader)) {
2739 return UNPACK_FAILURE;
2740 }
2741
2742 DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
2743
2744 if ((ptr = (const uchar *)unp_reader->read(len))) {
2745 memcpy(dst, ptr, len);
2746 return UNPACK_SUCCESS;
2747 }
2748 return UNPACK_FAILURE;
2749 }
2750
2751 /*
2752 Function of type rdb_make_unpack_info_t
2753 */
2754
make_unpack_unknown_varchar(const Rdb_collation_codec * const codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2755 void Rdb_key_def::make_unpack_unknown_varchar(
2756 const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
2757 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2758 const auto f = static_cast<const Field_varstring *>(field);
2759 uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2760 len += f->length_bytes;
2761 pack_ctx->writer->write(field->ptr, len);
2762 }
2763
2764 /*
2765 Function of type rdb_index_field_unpack_t
2766
2767 @detail
2768 Unpack a key part in an "unknown" collation from its
2769 (mem_comparable_form, unpack_info) form.
2770
2771 "Unknown" means we have no clue about how mem_comparable_form is made from
2772 the original string, so we keep the whole original string in the unpack_info.
2773
2774 @seealso
2775 make_unpack_unknown, unpack_unknown
2776 */
2777
unpack_unknown_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2778 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
2779 Field *const field, uchar *dst,
2780 Rdb_string_reader *const reader,
2781 Rdb_string_reader *const unp_reader) {
2782 const uchar *ptr;
2783 uchar *const d0 = dst;
2784 const auto f = static_cast<Field_varstring *>(field);
2785 dst += f->length_bytes;
2786 const uint len_bytes = f->length_bytes;
2787 // We don't use anything from the key, so skip over it.
2788 if ((fpi->m_skip_func)(fpi, field, reader)) {
2789 return UNPACK_FAILURE;
2790 }
2791
2792 DBUG_ASSERT(len_bytes > 0);
2793 DBUG_ASSERT(unp_reader != nullptr);
2794
2795 if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
2796 memcpy(d0, ptr, len_bytes);
2797 const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
2798 if ((ptr = (const uchar *)unp_reader->read(len))) {
2799 memcpy(dst, ptr, len);
2800 return UNPACK_SUCCESS;
2801 }
2802 }
2803 return UNPACK_FAILURE;
2804 }
2805
2806 /*
2807 Write unpack_data for a "simple" collation
2808 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)2809 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
2810 const Rdb_collation_codec *const codec,
2811 const uchar *const src,
2812 const size_t src_len) {
2813 for (uint i = 0; i < src_len; i++) {
2814 writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
2815 }
2816 }
2817
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len,uchar * const dst)2818 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
2819 const Rdb_collation_codec *const codec,
2820 const uchar *const src, const size_t src_len,
2821 uchar *const dst) {
2822 for (uint i = 0; i < src_len; i++) {
2823 if (codec->m_dec_size[src[i]] > 0) {
2824 uint *ret;
2825 DBUG_ASSERT(reader != nullptr);
2826
2827 if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
2828 return UNPACK_FAILURE;
2829 }
2830 dst[i] = codec->m_dec_idx[*ret][src[i]];
2831 } else {
2832 dst[i] = codec->m_dec_idx[0][src[i]];
2833 }
2834 }
2835
2836 return UNPACK_SUCCESS;
2837 }
2838
2839 /*
2840 Function of type rdb_make_unpack_info_t
2841
2842 @detail
2843 Make unpack_data for VARCHAR(n) in a "simple" charset.
2844 */
2845
make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2846 void Rdb_key_def::make_unpack_simple_varchar(
2847 const Rdb_collation_codec *const codec, const Field *const field,
2848 Rdb_pack_field_context *const pack_ctx) {
2849 const auto f = static_cast<const Field_varstring *>(field);
2850 uchar *const src = f->ptr + f->length_bytes;
2851 const size_t src_len =
2852 f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2853 Rdb_bit_writer bit_writer(pack_ctx->writer);
2854 // The std::min compares characters with bytes, but for simple collations,
2855 // mbmaxlen = 1.
2856 rdb_write_unpack_simple(&bit_writer, codec, src,
2857 std::min((size_t)f->char_length(), src_len));
2858 }
2859
2860 /*
2861 Function of type rdb_index_field_unpack_t
2862
2863 @seealso
2864 pack_with_varchar_space_pad - packing function
2865 unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
2866 */
2867
unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2868 int Rdb_key_def::unpack_simple_varchar_space_pad(
2869 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2870 Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2871 const uchar *ptr;
2872 size_t len = 0;
2873 bool finished = false;
2874 uchar *d0 = dst;
2875 const Field_varstring *const field_var =
2876 static_cast<Field_varstring *>(field);
2877 // For simple collations, char_length is also number of bytes.
2878 DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
2879 uchar *dst_end = dst + field_var->pack_length();
2880 dst += field_var->length_bytes;
2881 Rdb_bit_reader bit_reader(unp_reader);
2882
2883 uint space_padding_bytes = 0;
2884 uint extra_spaces;
2885 DBUG_ASSERT(unp_reader != nullptr);
2886
2887 if ((fpi->m_unpack_info_uses_two_bytes
2888 ? unp_reader->read_uint16(&extra_spaces)
2889 : unp_reader->read_uint8(&extra_spaces))) {
2890 return UNPACK_FAILURE;
2891 }
2892
2893 if (extra_spaces <= 8) {
2894 space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2895 extra_spaces = 0;
2896 } else {
2897 extra_spaces -= 8;
2898 }
2899
2900 space_padding_bytes *= fpi->space_xfrm_len;
2901
2902 /* Decode the length-emitted encoding here */
2903 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2904 const char last_byte =
2905 ptr[fpi->m_segment_size - 1]; // number of padding bytes
2906 size_t used_bytes;
2907 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2908 // this is the last one
2909 if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2910 return UNPACK_FAILURE; // Cannot happen, corrupted data
2911 }
2912 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2913 finished = true;
2914 } else {
2915 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2916 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2917 return UNPACK_FAILURE;
2918 }
2919 used_bytes = fpi->m_segment_size - 1;
2920 }
2921
2922 if (dst + used_bytes > dst_end) {
2923 // The value on disk is longer than the field definition allows?
2924 return UNPACK_FAILURE;
2925 }
2926
2927 uint ret;
2928 if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2929 used_bytes, dst)) != UNPACK_SUCCESS) {
2930 return ret;
2931 }
2932
2933 dst += used_bytes;
2934 len += used_bytes;
2935
2936 if (finished) {
2937 if (extra_spaces) {
2938 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2939 // pad_char has a 1-byte form in all charsets that
2940 // are handled by rdb_init_collation_mapping.
2941 memset(dst, field_var->charset()->pad_char, extra_spaces);
2942 len += extra_spaces;
2943 }
2944 break;
2945 }
2946 }
2947
2948 if (!finished) return UNPACK_FAILURE;
2949
2950 /* Save the length */
2951 if (field_var->length_bytes == 1) {
2952 d0[0] = (uchar)len;
2953 } else {
2954 DBUG_ASSERT(field_var->length_bytes == 2);
2955 int2store(d0, len);
2956 }
2957 return UNPACK_SUCCESS;
2958 }
2959
2960 /*
2961 Function of type rdb_make_unpack_info_t
2962
2963 @detail
2964 Make unpack_data for CHAR(n) value in a "simple" charset.
2965 It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2966
2967 @seealso
2968 The VARCHAR variant is in make_unpack_simple_varchar
2969 */
2970
make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2971 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
2972 const Field *const field,
2973 Rdb_pack_field_context *const pack_ctx) {
2974 const uchar *const src = field->ptr;
2975 Rdb_bit_writer bit_writer(pack_ctx->writer);
2976 rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2977 }
2978
2979 /*
2980 Function of type rdb_index_field_unpack_t
2981 */
2982
unpack_simple(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2983 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
2984 Field *const field MY_ATTRIBUTE((__unused__)),
2985 uchar *const dst,
2986 Rdb_string_reader *const reader,
2987 Rdb_string_reader *const unp_reader) {
2988 const uchar *ptr;
2989 const uint len = fpi->m_max_image_len;
2990 Rdb_bit_reader bit_reader(unp_reader);
2991
2992 if (!(ptr = (const uchar *)reader->read(len))) {
2993 return UNPACK_FAILURE;
2994 }
2995
2996 return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2997 fpi->m_charset_codec, ptr, len, dst);
2998 }
2999
3000 // See Rdb_charset_space_info::spaces_xfrm
3001 const int RDB_SPACE_XFRM_SIZE = 32;
3002
3003 // A class holding information about how space character is represented in a
3004 // charset.
3005 class Rdb_charset_space_info {
3006 public:
3007 Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3008 Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3009 Rdb_charset_space_info() = default;
3010
3011 // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3012 std::vector<uchar> spaces_xfrm;
3013
3014 // length(strxfrm(' '))
3015 size_t space_xfrm_len;
3016
3017 // length of the space character itself
3018 // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3019 // (length=2)
3020 size_t space_mb_len;
3021 };
3022
3023 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
3024 rdb_mem_comparable_space;
3025
3026 /*
3027 @brief
3028 For a given charset, get
3029 - strxfrm(' '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
3030 - length of strxfrm(charset, ' ')
3031 - length of the space character in the charset
3032
3033 @param cs IN Charset to get the space for
3034 @param ptr OUT A few space characters
3035 @param len OUT Return length of the space (in bytes)
3036
3037 @detail
3038 It is tempting to pre-generate mem-comparable form of space character for
3039 every charset on server startup.
3040 One can't do that: some charsets are not initialized until somebody
3041 attempts to use them (e.g. create or open a table that has a field that
3042 uses the charset).
3043 */
3044
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)3045 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3046 const std::vector<uchar> **xfrm,
3047 size_t *const xfrm_len,
3048 size_t *const mb_len) {
3049 DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
3050 if (!rdb_mem_comparable_space[cs->number].get()) {
3051 RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3052 if (!rdb_mem_comparable_space[cs->number].get()) {
3053 // Upper bound of how many bytes can be occupied by multi-byte form of a
3054 // character in any charset.
3055 const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3056 DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3057
3058 // multi-byte form of the ' ' (space) character
3059 uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3060
3061 const size_t space_mb_len = cs->wc_mb(
3062 (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3063
3064 // mem-comparable image of the space character
3065 std::array<uchar, 20> space;
3066
3067 const size_t space_len = cs->strnxfrm(
3068 space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3069 Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3070 info->space_xfrm_len = space_len;
3071 info->space_mb_len = space_mb_len;
3072 while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3073 info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3074 space.data() + space_len);
3075 }
3076 rdb_mem_comparable_space[cs->number].reset(info);
3077 }
3078 RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3079 }
3080
3081 *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3082 *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3083 *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3084 }
3085
3086 mysql_mutex_t rdb_mem_cmp_space_mutex;
3087
3088 std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
3089 rdb_collation_data;
3090 mysql_mutex_t rdb_collation_data_mutex;
3091
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)3092 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3093 return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
3094 !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
3095 }
3096
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)3097 static const Rdb_collation_codec *rdb_init_collation_mapping(
3098 const my_core::CHARSET_INFO *const cs) {
3099 DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
3100 const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3101
3102 if (codec == nullptr && rdb_is_collation_supported(cs)) {
3103 RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3104
3105 codec = rdb_collation_data[cs->number];
3106 if (codec == nullptr) {
3107 Rdb_collation_codec *cur = nullptr;
3108
3109 // Compute reverse mapping for simple collations.
3110 if (rdb_is_collation_supported(cs)) {
3111 cur = new Rdb_collation_codec;
3112 std::map<uchar, std::vector<uchar>> rev_map;
3113 size_t max_conflict_size = 0;
3114 for (int src = 0; src < 256; src++) {
3115 uchar dst = cs->sort_order[src];
3116 rev_map[dst].push_back(src);
3117 max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3118 }
3119 cur->m_dec_idx.resize(max_conflict_size);
3120
3121 for (auto const &p : rev_map) {
3122 uchar dst = p.first;
3123 for (uint idx = 0; idx < p.second.size(); idx++) {
3124 uchar src = p.second[idx];
3125 uchar bits =
3126 my_bit_log2_uint32(my_round_up_to_next_power(p.second.size()));
3127 cur->m_enc_idx[src] = idx;
3128 cur->m_enc_size[src] = bits;
3129 cur->m_dec_size[dst] = bits;
3130 cur->m_dec_idx[idx][dst] = src;
3131 }
3132 }
3133
3134 cur->m_make_unpack_info_func = {Rdb_key_def::make_unpack_simple_varchar,
3135 Rdb_key_def::make_unpack_simple};
3136 cur->m_unpack_func = {Rdb_key_def::unpack_simple_varchar_space_pad,
3137 Rdb_key_def::unpack_simple};
3138 } else {
3139 // Out of luck for now.
3140 }
3141
3142 if (cur != nullptr) {
3143 codec = cur;
3144 cur->m_cs = cs;
3145 rdb_collation_data[cs->number] = cur;
3146 }
3147 }
3148
3149 RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3150 }
3151
3152 return codec;
3153 }
3154
get_segment_size_from_collation(const CHARSET_INFO * const cs)3155 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3156 int ret;
3157 if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
3158 cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
3159 /*
3160 In these collations, a character produces one weight, which is 3 bytes.
3161 Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3162 get 3*3+1=10
3163 */
3164 ret = 10;
3165 } else {
3166 /*
3167 All other collations. There are two classes:
3168 - Unicode-based, except for collations mentioned in the if-condition.
3169 For these all weights are 2 bytes long, a character may produce 0..8
3170 weights.
3171 in any case, 8 bytes of payload in the segment guarantee that the last
3172 space character won't span across segments.
3173
3174 - Collations not based on unicode. These have length(strxfrm(' '))=1,
3175 there nothing to worry about.
3176
3177 In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3178 */
3179 ret = 9;
3180 }
3181 DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
3182 return ret;
3183 }
3184
3185 /*
3186 @brief
3187 Setup packing of index field into its mem-comparable form
3188
3189 @detail
3190 - It is possible produce mem-comparable form for any datatype.
3191 - Some datatypes also allow to unpack the original value from its
3192 mem-comparable form.
3193 = Some of these require extra information to be stored in "unpack_info".
3194 unpack_info is not a part of mem-comparable form, it is only used to
3195 restore the original value
3196
3197 @param
3198 field IN field to be packed/un-packed
3199
3200 @return
3201 TRUE - Field can be read with index-only reads
3202 FALSE - Otherwise
3203 */
3204
setup(const Rdb_key_def * const key_descr,const Field * const field,const uint keynr_arg,const uint key_part_arg,const uint16 key_length)3205 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
3206 const Field *const field, const uint keynr_arg,
3207 const uint key_part_arg,
3208 const uint16 key_length) {
3209 int res = false;
3210 enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
3211
3212 m_keynr = keynr_arg;
3213 m_key_part = key_part_arg;
3214
3215 m_maybe_null = field ? field->real_maybe_null() : false;
3216 m_unpack_func = nullptr;
3217 m_make_unpack_info_func = nullptr;
3218 m_unpack_data_len = 0;
3219 space_xfrm = nullptr; // safety
3220 // whether to use legacy format for varchar
3221 m_use_legacy_varbinary_format = false;
3222 // ha_rocksdb::index_flags() will pass key_descr == null to
3223 // see whether field(column) can be read-only reads through return value,
3224 // but the legacy vs. new varchar format doesn't affect return value.
3225 // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
3226 if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3227 m_use_legacy_varbinary_format = true;
3228 }
3229 /* Calculate image length. By default, is is pack_length() */
3230 m_max_image_len =
3231 field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
3232 m_skip_func = Rdb_key_def::skip_max_length;
3233 m_pack_func = Rdb_key_def::pack_with_make_sort_key;
3234
3235 m_covered = false;
3236
3237 switch (type) {
3238 case MYSQL_TYPE_LONGLONG:
3239 case MYSQL_TYPE_LONG:
3240 case MYSQL_TYPE_INT24:
3241 case MYSQL_TYPE_SHORT:
3242 case MYSQL_TYPE_TINY:
3243 m_unpack_func = Rdb_key_def::unpack_integer;
3244 m_covered = true;
3245 return true;
3246
3247 case MYSQL_TYPE_DOUBLE:
3248 m_unpack_func = Rdb_key_def::unpack_double;
3249 m_covered = true;
3250 return true;
3251
3252 case MYSQL_TYPE_FLOAT:
3253 m_unpack_func = Rdb_key_def::unpack_float;
3254 m_covered = true;
3255 return true;
3256
3257 case MYSQL_TYPE_NEWDECIMAL:
3258 /*
3259 Decimal is packed with Field_new_decimal::make_sort_key, which just
3260 does memcpy.
3261 Unpacking decimal values was supported only after fix for issue#253,
3262 because of that ha_rocksdb::get_storage_type() handles decimal values
3263 in a special way.
3264 */
3265 case MYSQL_TYPE_DATETIME2:
3266 case MYSQL_TYPE_TIMESTAMP2:
3267 /* These are packed with Field_temporal_with_date_and_timef::make_sort_key
3268 */
3269 case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
3270 case MYSQL_TYPE_YEAR: /* YEAR is packed with Field_tiny::make_sort_key */
3271 /* Everything that comes here is packed with just a memcpy(). */
3272 m_unpack_func = Rdb_key_def::unpack_binary_str;
3273 m_covered = true;
3274 return true;
3275
3276 case MYSQL_TYPE_NEWDATE:
3277 /*
3278 This is packed by Field_newdate::make_sort_key. It assumes the data is
3279 3 bytes, and packing is done by swapping the byte order (for both big-
3280 and little-endian)
3281 */
3282 m_unpack_func = Rdb_key_def::unpack_newdate;
3283 m_covered = true;
3284 return true;
3285 case MYSQL_TYPE_TINY_BLOB:
3286 case MYSQL_TYPE_MEDIUM_BLOB:
3287 case MYSQL_TYPE_LONG_BLOB:
3288 case MYSQL_TYPE_BLOB: {
3289 if (key_descr) {
3290 // The my_charset_bin collation is special in that it will consider
3291 // shorter strings sorting as less than longer strings.
3292 //
3293 // See Field_blob::make_sort_key for details.
3294 m_max_image_len =
3295 key_length + (field->charset()->number == COLLATION_BINARY
3296 ? reinterpret_cast<const Field_blob *>(field)
3297 ->pack_length_no_ptr()
3298 : 0);
3299 // Return false because indexes on text/blob will always require
3300 // a prefix. With a prefix, the optimizer will not be able to do an
3301 // index-only scan since there may be content occuring after the prefix
3302 // length.
3303 return false;
3304 }
3305 break;
3306 }
3307 default:
3308 break;
3309 }
3310
3311 m_unpack_info_stores_value = false;
3312 /* Handle [VAR](CHAR|BINARY) */
3313
3314 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3315 /*
3316 For CHAR-based columns, check how strxfrm image will take.
3317 field->field_length = field->char_length() * cs->mbmaxlen.
3318 */
3319 const CHARSET_INFO *cs = field->charset();
3320 m_max_image_len = cs->strnxfrmlen(type == MYSQL_TYPE_STRING ?
3321 field->pack_length() :
3322 field->field_length);
3323 }
3324 const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
3325 const CHARSET_INFO *cs = field->charset();
3326 // max_image_len before chunking is taken into account
3327 const int max_image_len_before_chunks = m_max_image_len;
3328
3329 if (is_varchar) {
3330 // The default for varchar is variable-length, without space-padding for
3331 // comparisons
3332 m_varchar_charset = cs;
3333 m_skip_func = Rdb_key_def::skip_variable_length;
3334 m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3335 if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3336 m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
3337 } else {
3338 // Calculate the maximum size of the short section plus the
3339 // maximum size of the long section
3340 m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
3341 }
3342
3343 const auto field_var = static_cast<const Field_varstring *>(field);
3344 m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
3345 }
3346
3347 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3348 // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
3349 // information about character-based datatypes are compared.
3350 bool use_unknown_collation = false;
3351 DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
3352 use_unknown_collation = true;);
3353
3354 if (cs->number == COLLATION_BINARY) {
3355 // - SQL layer pads BINARY(N) so that it always is N bytes long.
3356 // - For VARBINARY(N), values may have different lengths, so we're using
3357 // variable-length encoding. This is also the only charset where the
3358 // values are not space-padded for comparison.
3359 m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
3360 : Rdb_key_def::unpack_binary_str;
3361 res = true;
3362 } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
3363 // For _bin collations, mem-comparable form of the string is the string
3364 // itself.
3365
3366 if (is_varchar) {
3367 // VARCHARs - are compared as if they were space-padded - but are
3368 // not actually space-padded (reading the value back produces the
3369 // original value, without the padding)
3370 m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
3371 m_skip_func = Rdb_key_def::skip_variable_space_pad;
3372 m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3373 m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
3374 m_segment_size = get_segment_size_from_collation(cs);
3375 m_max_image_len =
3376 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3377 m_segment_size;
3378 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3379 &space_mb_len);
3380 } else {
3381 // SQL layer pads CHAR(N) values to their maximum length.
3382 // We just store that and restore it back.
3383 m_unpack_func = (cs->number == COLLATION_LATIN1_BIN)
3384 ? Rdb_key_def::unpack_binary_str
3385 : Rdb_key_def::unpack_utf8_str;
3386 }
3387 res = true;
3388 } else {
3389 // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
3390
3391 res = true; // index-only scans are possible
3392 m_unpack_data_len = is_varchar ? 0 : field->field_length;
3393 const uint idx = is_varchar ? 0 : 1;
3394 const Rdb_collation_codec *codec = nullptr;
3395
3396 if (is_varchar) {
3397 // VARCHAR requires space-padding for doing comparisons
3398 //
3399 // The check for cs->levels_for_order is to catch
3400 // latin2_czech_cs and cp1250_czech_cs - multi-level collations
3401 // that Variable-Length Space Padded Encoding can't handle.
3402 // It is not expected to work for any other multi-level collations,
3403 // either.
3404 // Currently we handle these collations as NO_PAD, even if they have
3405 // PAD_SPACE attribute.
3406 if (cs->levels_for_order == 1) {
3407 m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3408 m_skip_func = Rdb_key_def::skip_variable_space_pad;
3409 m_segment_size = get_segment_size_from_collation(cs);
3410 m_max_image_len =
3411 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3412 m_segment_size;
3413 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3414 &space_mb_len);
3415 } else {
3416 // NO_LINT_DEBUG
3417 sql_print_warning(
3418 "RocksDB: you're trying to create an index "
3419 "with a multi-level collation %s",
3420 cs->name);
3421 // NO_LINT_DEBUG
3422 sql_print_warning(
3423 "MyRocks will handle this collation internally "
3424 " as if it had a NO_PAD attribute.");
3425 m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3426 m_skip_func = Rdb_key_def::skip_variable_length;
3427 }
3428 }
3429
3430 if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
3431 // The collation allows to store extra information in the unpack_info
3432 // which can be used to restore the original value from the
3433 // mem-comparable form.
3434 m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
3435 m_unpack_func = codec->m_unpack_func[idx];
3436 m_charset_codec = codec;
3437 } else if (use_unknown_collation) {
3438 // We have no clue about how this collation produces mem-comparable
3439 // form. Our way of restoring the original value is to keep a copy of
3440 // the original value in unpack_info.
3441 m_unpack_info_stores_value = true;
3442 m_make_unpack_info_func = is_varchar
3443 ? Rdb_key_def::make_unpack_unknown_varchar
3444 : Rdb_key_def::make_unpack_unknown;
3445 m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
3446 : Rdb_key_def::unpack_unknown;
3447 } else {
3448 // Same as above: we don't know how to restore the value from its
3449 // mem-comparable form.
3450 // Here, we just indicate to the SQL layer we can't do it.
3451 DBUG_ASSERT(m_unpack_func == nullptr);
3452 m_unpack_info_stores_value = false;
3453 res = false; // Indicate that index-only reads are not possible
3454 }
3455 }
3456
3457 // Make an adjustment: if this column is partially covered, tell the SQL
3458 // layer we can't do index-only scans. Later when we perform an index read,
3459 // we'll check on a record-by-record basis if we can do an index-only scan
3460 // or not.
3461 uint field_length;
3462 if (field->table) {
3463 field_length = field->table->field[field->field_index]->field_length;
3464 } else {
3465 field_length = field->field_length;
3466 }
3467
3468 if (field_length != key_length) {
3469 res = false;
3470 // If this index doesn't support covered bitmaps, then we won't know
3471 // during a read if the column is actually covered or not. If so, we need
3472 // to assume the column isn't covered and skip it during unpacking.
3473 //
3474 // If key_descr == NULL, then this is a dummy field and we probably don't
3475 // need to perform this step. However, to preserve the behavior before
3476 // this change, we'll only skip this step if we have an index which
3477 // supports covered bitmaps.
3478 if (!key_descr || !key_descr->use_covered_bitmap_format()) {
3479 m_unpack_func = nullptr;
3480 m_make_unpack_info_func = nullptr;
3481 m_unpack_info_stores_value = true;
3482 }
3483 }
3484 }
3485
3486 m_covered = res;
3487 return res;
3488 }
3489
get_field_in_table(const TABLE * const tbl) const3490 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
3491 return tbl->key_info[m_keynr].key_part[m_key_part].field;
3492 }
3493
fill_hidden_pk_val(uchar ** dst,const longlong hidden_pk_id) const3494 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
3495 const longlong hidden_pk_id) const {
3496 DBUG_ASSERT(m_max_image_len == 8);
3497
3498 String to;
3499 rdb_netstr_append_uint64(&to, hidden_pk_id);
3500 memcpy(*dst, to.ptr(), m_max_image_len);
3501
3502 *dst += m_max_image_len;
3503 }
3504
3505 ///////////////////////////////////////////////////////////////////////////////////////////
3506 // Rdb_ddl_manager
3507 ///////////////////////////////////////////////////////////////////////////////////////////
3508
~Rdb_tbl_def()3509 Rdb_tbl_def::~Rdb_tbl_def() {
3510 auto ddl_manager = rdb_get_ddl_manager();
3511 /* Don't free key definitions */
3512 if (m_key_descr_arr) {
3513 for (uint i = 0; i < m_key_count; i++) {
3514 if (ddl_manager && m_key_descr_arr[i]) {
3515 ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
3516 }
3517
3518 m_key_descr_arr[i] = nullptr;
3519 }
3520
3521 delete[] m_key_descr_arr;
3522 m_key_descr_arr = nullptr;
3523 }
3524 }
3525
3526 /*
3527 Put table definition DDL entry. Actual write is done at
3528 Rdb_dict_manager::commit.
3529
3530 We write
3531 dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
3532
3533 Where key entries are a tuple of
3534 ( cf_id, index_nr )
3535 */
3536
put_dict(Rdb_dict_manager * const dict,rocksdb::WriteBatch * const batch,const rocksdb::Slice & key)3537 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
3538 rocksdb::WriteBatch *const batch,
3539 const rocksdb::Slice &key) {
3540 StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
3541 indexes.alloc(Rdb_key_def::VERSION_SIZE +
3542 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
3543 rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
3544
3545 for (uint i = 0; i < m_key_count; i++) {
3546 const Rdb_key_def &kd = *m_key_descr_arr[i];
3547
3548 uchar flags =
3549 (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
3550 (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
3551
3552 const uint cf_id = kd.get_cf()->GetID();
3553 /*
3554 If cf_id already exists, cf_flags must be the same.
3555 To prevent race condition, reading/modifying/committing CF flags
3556 need to be protected by mutex (dict_manager->lock()).
3557 When RocksDB supports transaction with pessimistic concurrency
3558 control, we can switch to use it and removing mutex.
3559 */
3560 uint existing_cf_flags;
3561 const std::string cf_name = kd.get_cf()->GetName();
3562
3563 if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
3564 // For the purposes of comparison we'll clear the partitioning bit. The
3565 // intent here is to make sure that both partitioned and non-partitioned
3566 // tables can refer to the same CF.
3567 existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3568 flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3569
3570 if (existing_cf_flags != flags) {
3571 my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
3572 existing_cf_flags);
3573 return true;
3574 }
3575 } else {
3576 dict->add_cf_flags(batch, cf_id, flags);
3577 }
3578
3579 rdb_netstr_append_uint32(&indexes, cf_id);
3580
3581 uint32 index_number = kd.get_index_number();
3582 rdb_netstr_append_uint32(&indexes, index_number);
3583
3584 struct Rdb_index_info index_info;
3585 index_info.m_gl_index_id = {cf_id, index_number};
3586 index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
3587 index_info.m_index_type = kd.m_index_type;
3588 index_info.m_kv_version = kd.m_kv_format_version;
3589 index_info.m_index_flags = kd.m_index_flags_bitmap;
3590 index_info.m_ttl_duration = kd.m_ttl_duration;
3591
3592 dict->add_or_update_index_cf_mapping(batch, &index_info);
3593 }
3594
3595 const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
3596
3597 dict->put_key(batch, key, svalue);
3598 return false;
3599 }
3600
get_create_time()3601 time_t Rdb_tbl_def::get_create_time() {
3602 time_t create_time = m_create_time;
3603
3604 if (create_time == CREATE_TIME_UNKNOWN) {
3605 // Read it from the .frm file. It's not a problem if several threads do this
3606 // concurrently
3607 char path[FN_REFLEN];
3608 snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
3609 m_dbname.c_str(), m_tablename.c_str(), reg_ext);
3610 unpack_filename(path,path);
3611 MY_STAT f_stat;
3612 if (my_stat(path, &f_stat, MYF(0)))
3613 create_time = f_stat.st_ctime;
3614 else
3615 create_time = 0; // will be shown as SQL NULL
3616 m_create_time = create_time;
3617 }
3618 return create_time;
3619 }
3620
3621 // Length that each index flag takes inside the record.
3622 // Each index in the array maps to the enum INDEX_FLAG
3623 static const std::array<uint, 1> index_flag_lengths = {
3624 {ROCKSDB_SIZEOF_TTL_RECORD}};
3625
has_index_flag(uint32 index_flags,enum INDEX_FLAG flag)3626 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
3627 return flag & index_flags;
3628 }
3629
calculate_index_flag_offset(uint32 index_flags,enum INDEX_FLAG flag,uint * const length)3630 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
3631 enum INDEX_FLAG flag,
3632 uint *const length) {
3633 DBUG_ASSERT_IMP(flag != MAX_FLAG,
3634 Rdb_key_def::has_index_flag(index_flags, flag));
3635
3636 uint offset = 0;
3637 for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
3638 int mask = 1 << bit;
3639
3640 /* Exit once we've reached the proper flag */
3641 if (flag & mask) {
3642 if (length != nullptr) {
3643 *length = index_flag_lengths[bit];
3644 }
3645 break;
3646 }
3647
3648 if (index_flags & mask) {
3649 offset += index_flag_lengths[bit];
3650 }
3651 }
3652
3653 return offset;
3654 }
3655
write_index_flag_field(Rdb_string_writer * const buf,const uchar * const val,enum INDEX_FLAG flag) const3656 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
3657 const uchar *const val,
3658 enum INDEX_FLAG flag) const {
3659 uint len;
3660 uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
3661 DBUG_ASSERT(offset + len <= buf->get_current_pos());
3662 memcpy(buf->ptr() + offset, val, len);
3663 }
3664
check_if_is_mysql_system_table()3665 void Rdb_tbl_def::check_if_is_mysql_system_table() {
3666 static const char *const system_dbs[] = {
3667 "mysql",
3668 "performance_schema",
3669 "information_schema",
3670 };
3671
3672 m_is_mysql_system_table = false;
3673 for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
3674 if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
3675 m_is_mysql_system_table = true;
3676 break;
3677 }
3678 }
3679 }
3680
check_and_set_read_free_rpl_table()3681 void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
3682 m_is_read_free_rpl_table =
3683 #if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported
3684 rdb_read_free_regex_handler.matches(base_tablename());
3685 #else
3686 false;
3687 #endif
3688 }
3689
set_name(const std::string & name)3690 void Rdb_tbl_def::set_name(const std::string &name) {
3691 int err MY_ATTRIBUTE((__unused__));
3692
3693 m_dbname_tablename = name;
3694 err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
3695 &m_partition);
3696 DBUG_ASSERT(err == 0);
3697
3698 check_if_is_mysql_system_table();
3699 }
3700
get_autoincr_gl_index_id()3701 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
3702 for (uint i = 0; i < m_key_count; i++) {
3703 auto &k = m_key_descr_arr[i];
3704 if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
3705 k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
3706 return k->get_gl_index_id();
3707 }
3708 }
3709
3710 // Every table must have a primary key, even if it's hidden.
3711 abort();
3712 return GL_INDEX_ID();
3713 }
3714
/*
  Remove the entry for `gl_index_id` from the index-number -> key definition
  map. Does nothing if no such entry exists. This function does not take
  m_rwlock itself.
*/
void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
  m_index_num_to_keydef.erase(gl_index_id);
}
3718
add_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3719 void Rdb_ddl_manager::add_uncommitted_keydefs(
3720 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3721 mysql_rwlock_wrlock(&m_rwlock);
3722 for (const auto &index : indexes) {
3723 m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
3724 }
3725 mysql_rwlock_unlock(&m_rwlock);
3726 }
3727
remove_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3728 void Rdb_ddl_manager::remove_uncommitted_keydefs(
3729 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3730 mysql_rwlock_wrlock(&m_rwlock);
3731 for (const auto &index : indexes) {
3732 m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
3733 }
3734 mysql_rwlock_unlock(&m_rwlock);
3735 }
3736
namespace  // anonymous namespace = not visible outside this source file
{
/*
  Table scanner used to cross-check the tables registered in the RocksDB data
  dictionary against the .frm files found in the data directory.
*/
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  // (table name, is_partition) pair describing one expected table.
  using tbl_info_t = std::pair<std::string, bool>;
  // Database name -> set of tables expected in that database.
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  // Tables still waiting to be matched with a .frm file.
  tbl_list_t m_list;

  // Scanner callback: record a table read from the data dictionary.
  int add_table(Rdb_tbl_def *tdef) override;

  // Walk all database directories below `datadir` and match their .frm files
  // against m_list. Returns false if a directory or .frm file could not be
  // read; *has_errors is set when a mismatch is found.
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  // Check all .frm files of one database directory.
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  // Check a single .frm file and remove the matching entry from m_list.
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
}  // anonymous namespace
3756
3757 /*
3758 Get a list of tables that we expect to have .frm files for. This will use the
3759 information just read from the RocksDB data dictionary.
3760 */
add_table(Rdb_tbl_def * tdef)3761 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
3762 DBUG_ASSERT(tdef != nullptr);
3763
3764 /* Add the database/table into the list that are not temp table */
3765 if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
3766 bool is_partition = tdef->base_partition().size() != 0;
3767 m_list[tdef->base_dbname()].insert(
3768 tbl_info_t(tdef->base_tablename(), is_partition));
3769 }
3770
3771 return HA_EXIT_SUCCESS;
3772 }
3773
3774 /*
3775 Access the .frm file for this dbname/tablename and see if it is a RocksDB
3776 table (or partition table).
3777 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)3778 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
3779 const std::string &dbname,
3780 const std::string &tablename,
3781 bool *has_errors) {
3782 /* Check this .frm file to see what engine it uses */
3783 String fullfilename(fullpath.c_str(), &my_charset_bin);
3784 fullfilename.append(FN_DIRSEP);
3785 fullfilename.append(tablename.c_str());
3786 fullfilename.append(".frm");
3787
3788 /*
3789 This function will return the legacy_db_type of the table. Currently
3790 it does not reference the first parameter (THD* thd), but if it ever
3791 did in the future we would need to make a version that does it without
3792 the connection handle as we don't have one here.
3793 */
3794 char eng_type_buf[NAME_CHAR_LEN+1];
3795 LEX_CSTRING eng_type_str = {eng_type_buf, 0};
3796 enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str);
3797 if (type == TABLE_TYPE_UNKNOWN) {
3798 // NO_LINT_DEBUG
3799 sql_print_warning("RocksDB: Failed to open/read .from file: %s",
3800 fullfilename.ptr());
3801 return false;
3802 }
3803
3804 if (type == TABLE_TYPE_NORMAL) {
3805 /* For a RocksDB table do we have a reference in the data dictionary? */
3806 if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
3807 /*
3808 Attempt to remove the table entry from the list of tables. If this
3809 fails then we know we had a .frm file that wasn't registered in RocksDB.
3810 */
3811 tbl_info_t element(tablename, false);
3812 if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
3813 // NO_LINT_DEBUG
3814 sql_print_warning(
3815 "RocksDB: Schema mismatch - "
3816 "A .frm file exists for table %s.%s, "
3817 "but that table is not registered in RocksDB",
3818 dbname.c_str(), tablename.c_str());
3819 *has_errors = true;
3820 }
3821 } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
3822 /*
3823 For partition tables, see if it is in the m_list as a partition,
3824 but don't generate an error if it isn't there - we don't know that the
3825 .frm is for RocksDB.
3826 */
3827 if (m_list.count(dbname) > 0) {
3828 m_list[dbname].erase(tbl_info_t(tablename, true));
3829 }
3830 }
3831 }
3832
3833 return true;
3834 }
3835
3836 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)3837 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
3838 const std::string &dbname,
3839 bool *has_errors) {
3840 bool result = true;
3841 std::string fullpath = datadir + dbname;
3842 struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
3843
3844 /* Access the directory */
3845 if (dir_info == nullptr) {
3846 // NO_LINT_DEBUG
3847 sql_print_warning("RocksDB: Could not open database directory: %s",
3848 fullpath.c_str());
3849 return false;
3850 }
3851
3852 /* Scan through the files in the directory */
3853 struct fileinfo *file_info = dir_info->dir_entry;
3854 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3855 /* Find .frm files that are not temp files (those that contain '#sql') */
3856 const char *ext = strrchr(file_info->name, '.');
3857 if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
3858 strcmp(ext, ".frm") == 0) {
3859 std::string tablename =
3860 std::string(file_info->name, ext - file_info->name);
3861
3862 /* Check to see if the .frm file is from RocksDB */
3863 if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
3864 result = false;
3865 break;
3866 }
3867 }
3868 }
3869
3870 /* Remove any databases who have no more tables listed */
3871 if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
3872 m_list.erase(dbname);
3873 }
3874
3875 /* Release the directory entry */
3876 my_dirend(dir_info);
3877
3878 return result;
3879 }
3880
3881 /*
3882 Scan the datadir for all databases (subdirectories) and get a list of .frm
3883 files they contain
3884 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)3885 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
3886 bool *has_errors) {
3887 bool result = true;
3888 struct st_my_dir *dir_info;
3889 struct fileinfo *file_info;
3890
3891 dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
3892 if (dir_info == nullptr) {
3893 // NO_LINT_DEBUG
3894 sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
3895 return false;
3896 }
3897
3898 file_info = dir_info->dir_entry;
3899 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3900 /* Ignore files/dirs starting with '.' */
3901 if (file_info->name[0] == '.') continue;
3902
3903 /* Ignore all non-directory files */
3904 if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
3905
3906 /* Scan all the .frm files in the directory */
3907 if (!scan_for_frms(datadir, file_info->name, has_errors)) {
3908 result = false;
3909 break;
3910 }
3911 }
3912
3913 /* Release the directory info */
3914 my_dirend(dir_info);
3915
3916 return result;
3917 }
3918
3919 /*
3920 Validate that all auto increment values in the data dictionary are on a
3921 supported version.
3922 */
validate_auto_incr()3923 bool Rdb_ddl_manager::validate_auto_incr() {
3924 std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
3925
3926 uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3927 rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
3928 const rocksdb::Slice auto_incr_entry_slice(
3929 reinterpret_cast<char *>(auto_incr_entry),
3930 Rdb_key_def::INDEX_NUMBER_SIZE);
3931 for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
3932 const rocksdb::Slice key = it->key();
3933 const rocksdb::Slice val = it->value();
3934 GL_INDEX_ID gl_index_id;
3935
3936 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3937 memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
3938 break;
3939 }
3940
3941 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
3942 return false;
3943 }
3944
3945 if (val.size() <= Rdb_key_def::VERSION_SIZE) {
3946 return false;
3947 }
3948
3949 // Check if we have orphaned entries for whatever reason by cross
3950 // referencing ddl entries.
3951 auto ptr = reinterpret_cast<const uchar *>(key.data());
3952 ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
3953 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3954 if (!m_dict->get_index_info(gl_index_id, nullptr)) {
3955 // NO_LINT_DEBUG
3956 sql_print_warning(
3957 "RocksDB: AUTOINC mismatch - "
3958 "Index number (%u, %u) found in AUTOINC "
3959 "but does not exist as a DDL entry",
3960 gl_index_id.cf_id, gl_index_id.index_id);
3961 return false;
3962 }
3963
3964 ptr = reinterpret_cast<const uchar *>(val.data());
3965 const int version = rdb_netbuf_read_uint16(&ptr);
3966 if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
3967 // NO_LINT_DEBUG
3968 sql_print_warning(
3969 "RocksDB: AUTOINC mismatch - "
3970 "Index number (%u, %u) found in AUTOINC "
3971 "is on unsupported version %d",
3972 gl_index_id.cf_id, gl_index_id.index_id, version);
3973 return false;
3974 }
3975 }
3976
3977 if (!it->status().ok()) {
3978 return false;
3979 }
3980
3981 return true;
3982 }
3983
3984 /*
3985 Validate that all the tables in the RocksDB database dictionary match the .frm
3986 files in the datadir
3987 */
validate_schemas(void)3988 bool Rdb_ddl_manager::validate_schemas(void) {
3989 bool has_errors = false;
3990 const std::string datadir = std::string(mysql_real_data_home);
3991 Rdb_validate_tbls table_list;
3992
3993 /* Get the list of tables from the database dictionary */
3994 if (scan_for_tables(&table_list) != 0) {
3995 return false;
3996 }
3997
3998 /* Compare that to the list of actual .frm files */
3999 if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
4000 return false;
4001 }
4002
4003 /*
4004 Any tables left in the tables list are ones that are registered in RocksDB
4005 but don't have .frm files.
4006 */
4007 for (const auto &db : table_list.m_list) {
4008 for (const auto &table : db.second) {
4009 // NO_LINT_DEBUG
4010 sql_print_warning(
4011 "RocksDB: Schema mismatch - "
4012 "Table %s.%s is registered in RocksDB "
4013 "but does not have a .frm file",
4014 db.first.c_str(), table.first.c_str());
4015 has_errors = true;
4016 }
4017 }
4018
4019 return !has_errors;
4020 }
4021
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t validate_tables)4022 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4023 Rdb_cf_manager *const cf_manager,
4024 const uint32_t validate_tables) {
4025 m_dict = dict_arg;
4026 mysql_rwlock_init(0, &m_rwlock);
4027
4028 /* Read the data dictionary and populate the hash */
4029 uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4030 rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4031 const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4032 Rdb_key_def::INDEX_NUMBER_SIZE);
4033
4034 /* Reading data dictionary should always skip bloom filter */
4035 rocksdb::Iterator *it = m_dict->new_iterator();
4036 int i = 0;
4037
4038 uint max_index_id_in_dict = 0;
4039 m_dict->get_max_index_id(&max_index_id_in_dict);
4040
4041 for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4042 const uchar *ptr;
4043 const uchar *ptr_end;
4044 const rocksdb::Slice key = it->key();
4045 const rocksdb::Slice val = it->value();
4046
4047 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4048 memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4049 break;
4050 }
4051
4052 if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4053 // NO_LINT_DEBUG
4054 sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4055 (int)key.size());
4056 return true;
4057 }
4058
4059 Rdb_tbl_def *const tdef =
4060 new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4061
4062 // Now, read the DDLs.
4063 const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4064 if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4065 // NO_LINT_DEBUG
4066 sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4067 tdef->full_tablename().c_str());
4068 return true;
4069 }
4070 tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4071 tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4072
4073 ptr = reinterpret_cast<const uchar *>(val.data());
4074 const int version = rdb_netbuf_read_uint16(&ptr);
4075 if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4076 // NO_LINT_DEBUG
4077 sql_print_error(
4078 "RocksDB: DDL ENTRY Version was not expected."
4079 "Expected: %d, Actual: %d",
4080 Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4081 return true;
4082 }
4083 ptr_end = ptr + real_val_size;
4084 for (uint keyno = 0; ptr < ptr_end; keyno++) {
4085 GL_INDEX_ID gl_index_id;
4086 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4087 uint flags = 0;
4088 struct Rdb_index_info index_info;
4089 if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4090 // NO_LINT_DEBUG
4091 sql_print_error(
4092 "RocksDB: Could not get index information "
4093 "for Index Number (%u,%u), table %s",
4094 gl_index_id.cf_id, gl_index_id.index_id,
4095 tdef->full_tablename().c_str());
4096 return true;
4097 }
4098 if (max_index_id_in_dict < gl_index_id.index_id) {
4099 // NO_LINT_DEBUG
4100 sql_print_error(
4101 "RocksDB: Found max index id %u from data dictionary "
4102 "but also found larger index id %u from dictionary. "
4103 "This should never happen and possibly a bug.",
4104 max_index_id_in_dict, gl_index_id.index_id);
4105 return true;
4106 }
4107 if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4108 // NO_LINT_DEBUG
4109 sql_print_error(
4110 "RocksDB: Could not get Column Family Flags "
4111 "for CF Number %d, table %s",
4112 gl_index_id.cf_id, tdef->full_tablename().c_str());
4113 return true;
4114 }
4115
4116 if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4117 // The per-index cf option is deprecated. Make sure we don't have the
4118 // flag set in any existing database. NO_LINT_DEBUG
4119 // NO_LINT_DEBUG
4120 sql_print_error(
4121 "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4122 "number %d, table %s",
4123 gl_index_id.cf_id, tdef->full_tablename().c_str());
4124 }
4125
4126 rocksdb::ColumnFamilyHandle *const cfh =
4127 cf_manager->get_cf(gl_index_id.cf_id);
4128 DBUG_ASSERT(cfh != nullptr);
4129
4130 uint32 ttl_rec_offset =
4131 Rdb_key_def::has_index_flag(index_info.m_index_flags,
4132 Rdb_key_def::TTL_FLAG)
4133 ? Rdb_key_def::calculate_index_flag_offset(
4134 index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4135 : UINT_MAX;
4136
4137 /*
4138 We can't fully initialize Rdb_key_def object here, because full
4139 initialization requires that there is an open TABLE* where we could
4140 look at Field* objects and set max_length and other attributes
4141 */
4142 tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4143 gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4144 index_info.m_index_type, index_info.m_kv_version,
4145 flags & Rdb_key_def::REVERSE_CF_FLAG,
4146 flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4147 m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4148 ttl_rec_offset, index_info.m_ttl_duration);
4149 }
4150 put(tdef);
4151 i++;
4152 }
4153
4154 /*
4155 If validate_tables is greater than 0 run the validation. Only fail the
4156 initialzation if the setting is 1. If the setting is 2 we continue.
4157 */
4158 if (validate_tables > 0) {
4159 std::string msg;
4160 if (!validate_schemas()) {
4161 msg =
4162 "RocksDB: Problems validating data dictionary "
4163 "against .frm files, exiting";
4164 } else if (!validate_auto_incr()) {
4165 msg =
4166 "RocksDB: Problems validating auto increment values in "
4167 "data dictionary, exiting";
4168 }
4169 if (validate_tables == 1 && !msg.empty()) {
4170 // NO_LINT_DEBUG
4171 sql_print_error("%s", msg.c_str());
4172 return true;
4173 }
4174 }
4175
4176 // index ids used by applications should not conflict with
4177 // data dictionary index ids
4178 if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4179 max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4180 }
4181
4182 m_sequence.init(max_index_id_in_dict + 1);
4183
4184 if (!it->status().ok()) {
4185 rdb_log_status_error(it->status(), "Table_store load error");
4186 return true;
4187 }
4188 delete it;
4189 // NO_LINT_DEBUG
4190 sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4191 i);
4192 return false;
4193 }
4194
find(const std::string & table_name,const bool lock)4195 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4196 const bool lock) {
4197 if (lock) {
4198 mysql_rwlock_rdlock(&m_rwlock);
4199 }
4200
4201 Rdb_tbl_def *rec = nullptr;
4202 const auto it = m_ddl_map.find(table_name);
4203 if (it != m_ddl_map.end()) {
4204 rec = it->second;
4205 }
4206
4207 if (lock) {
4208 mysql_rwlock_unlock(&m_rwlock);
4209 }
4210
4211 return rec;
4212 }
4213
4214 // this is a safe version of the find() function below. It acquires a read
4215 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4216 // are finding it. Copying it into 'ret' increments the count making sure
4217 // that the object will not be discarded until we are finished with it.
safe_find(GL_INDEX_ID gl_index_id)4218 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4219 GL_INDEX_ID gl_index_id) {
4220 std::shared_ptr<const Rdb_key_def> ret(nullptr);
4221
4222 mysql_rwlock_rdlock(&m_rwlock);
4223
4224 auto it = m_index_num_to_keydef.find(gl_index_id);
4225 if (it != m_index_num_to_keydef.end()) {
4226 const auto table_def = find(it->second.first, false);
4227 if (table_def && it->second.second < table_def->m_key_count) {
4228 const auto &kd = table_def->m_key_descr_arr[it->second.second];
4229 if (kd->max_storage_fmt_length() != 0) {
4230 ret = kd;
4231 }
4232 }
4233 } else {
4234 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4235 if (it != m_index_num_to_uncommitted_keydef.end()) {
4236 const auto &kd = it->second;
4237 if (kd->max_storage_fmt_length() != 0) {
4238 ret = kd;
4239 }
4240 }
4241 }
4242
4243 mysql_rwlock_unlock(&m_rwlock);
4244
4245 return ret;
4246 }
4247
4248 // this method assumes at least read-only lock on m_rwlock
find(GL_INDEX_ID gl_index_id)4249 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4250 GL_INDEX_ID gl_index_id) {
4251 auto it = m_index_num_to_keydef.find(gl_index_id);
4252 if (it != m_index_num_to_keydef.end()) {
4253 auto table_def = find(it->second.first, false);
4254 if (table_def) {
4255 if (it->second.second < table_def->m_key_count) {
4256 return table_def->m_key_descr_arr[it->second.second];
4257 }
4258 }
4259 } else {
4260 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4261 if (it != m_index_num_to_uncommitted_keydef.end()) {
4262 return it->second;
4263 }
4264 }
4265
4266 static std::shared_ptr<Rdb_key_def> empty = nullptr;
4267
4268 return empty;
4269 }
4270
4271 // this method returns the name of the table based on an index id. It acquires
4272 // a read lock on m_rwlock.
safe_get_table_name(const GL_INDEX_ID & gl_index_id)4273 const std::string Rdb_ddl_manager::safe_get_table_name(
4274 const GL_INDEX_ID &gl_index_id) {
4275 std::string ret;
4276 mysql_rwlock_rdlock(&m_rwlock);
4277 auto it = m_index_num_to_keydef.find(gl_index_id);
4278 if (it != m_index_num_to_keydef.end()) {
4279 ret = it->second.first;
4280 }
4281 mysql_rwlock_unlock(&m_rwlock);
4282 return ret;
4283 }
4284
set_stats(const std::unordered_map<GL_INDEX_ID,Rdb_index_stats> & stats)4285 void Rdb_ddl_manager::set_stats(
4286 const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4287 mysql_rwlock_wrlock(&m_rwlock);
4288 for (auto src : stats) {
4289 const auto &keydef = find(src.second.m_gl_index_id);
4290 if (keydef) {
4291 keydef->m_stats = src.second;
4292 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4293 }
4294 }
4295 mysql_rwlock_unlock(&m_rwlock);
4296 }
4297
adjust_stats(const std::vector<Rdb_index_stats> & new_data,const std::vector<Rdb_index_stats> & deleted_data)4298 void Rdb_ddl_manager::adjust_stats(
4299 const std::vector<Rdb_index_stats> &new_data,
4300 const std::vector<Rdb_index_stats> &deleted_data) {
4301 mysql_rwlock_wrlock(&m_rwlock);
4302 int i = 0;
4303 for (const auto &data : {new_data, deleted_data}) {
4304 for (const auto &src : data) {
4305 const auto &keydef = find(src.m_gl_index_id);
4306 if (keydef) {
4307 keydef->m_stats.m_distinct_keys_per_prefix.resize(
4308 keydef->get_key_parts());
4309 keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4310 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4311 }
4312 }
4313 i++;
4314 }
4315 const bool should_save_stats = !m_stats2store.empty();
4316 mysql_rwlock_unlock(&m_rwlock);
4317 if (should_save_stats) {
4318 // Queue an async persist_stats(false) call to the background thread.
4319 rdb_queue_save_stats_request();
4320 }
4321 }
4322
persist_stats(const bool sync)4323 void Rdb_ddl_manager::persist_stats(const bool sync) {
4324 mysql_rwlock_wrlock(&m_rwlock);
4325 const auto local_stats2store = std::move(m_stats2store);
4326 m_stats2store.clear();
4327 mysql_rwlock_unlock(&m_rwlock);
4328
4329 // Persist stats
4330 const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4331 std::vector<Rdb_index_stats> stats;
4332 std::transform(local_stats2store.begin(), local_stats2store.end(),
4333 std::back_inserter(stats),
4334 [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4335 return s.second;
4336 });
4337 m_dict->add_stats(wb.get(), stats);
4338 m_dict->commit(wb.get(), sync);
4339 }
4340
4341 /*
4342 Put table definition of `tbl` into the mapping, and also write it to the
4343 on-disk data dictionary.
4344 */
4345
put_and_write(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch)4346 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
4347 rocksdb::WriteBatch *const batch) {
4348 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
4349
4350 buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4351
4352 const std::string &dbname_tablename = tbl->full_tablename();
4353 buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4354
4355 int res;
4356 if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) {
4357 return res;
4358 }
4359 if ((res = put(tbl))) {
4360 return res;
4361 }
4362 return HA_EXIT_SUCCESS;
4363 }
4364
4365 /* Return 0 - ok, other value - error */
4366 /* TODO:
4367 This function modifies m_ddl_map and m_index_num_to_keydef.
4368 However, these changes need to be reversed if dict_manager.commit fails
4369 See the discussion here: https://reviews.facebook.net/D35925#inline-259167
4370 Tracked by https://github.com/facebook/mysql-5.6/issues/33
4371 */
put(Rdb_tbl_def * const tbl,const bool lock)4372 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
4373 Rdb_tbl_def *rec;
4374 const std::string &dbname_tablename = tbl->full_tablename();
4375
4376 if (lock) mysql_rwlock_wrlock(&m_rwlock);
4377
4378 // We have to do this find because 'tbl' is not yet in the list. We need
4379 // to find the one we are replacing ('rec')
4380 rec = find(dbname_tablename, false);
4381 if (rec) {
4382 // Free the old record.
4383 delete rec;
4384 m_ddl_map.erase(dbname_tablename);
4385 }
4386 m_ddl_map.emplace(dbname_tablename, tbl);
4387
4388 for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
4389 m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
4390 std::make_pair(dbname_tablename, keyno);
4391 }
4392 tbl->check_and_set_read_free_rpl_table();
4393
4394 if (lock) mysql_rwlock_unlock(&m_rwlock);
4395 return 0;
4396 }
4397
remove(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch,const bool lock)4398 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
4399 rocksdb::WriteBatch *const batch,
4400 const bool lock) {
4401 if (lock) mysql_rwlock_wrlock(&m_rwlock);
4402
4403 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
4404 key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4405 const std::string &dbname_tablename = tbl->full_tablename();
4406 key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4407
4408 m_dict->delete_key(batch, key_writer.to_slice());
4409
4410 const auto it = m_ddl_map.find(dbname_tablename);
4411 if (it != m_ddl_map.end()) {
4412 // Free Rdb_tbl_def
4413 delete it->second;
4414
4415 m_ddl_map.erase(it);
4416 }
4417
4418 if (lock) mysql_rwlock_unlock(&m_rwlock);
4419 }
4420
rename(const std::string & from,const std::string & to,rocksdb::WriteBatch * const batch)4421 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
4422 rocksdb::WriteBatch *const batch) {
4423 Rdb_tbl_def *rec;
4424 Rdb_tbl_def *new_rec;
4425 bool res = true;
4426 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
4427
4428 mysql_rwlock_wrlock(&m_rwlock);
4429 if (!(rec = find(from, false))) {
4430 mysql_rwlock_unlock(&m_rwlock);
4431 return true;
4432 }
4433
4434 new_rec = new Rdb_tbl_def(to);
4435
4436 new_rec->m_key_count = rec->m_key_count;
4437 new_rec->m_auto_incr_val =
4438 rec->m_auto_incr_val.load(std::memory_order_relaxed);
4439 new_rec->m_key_descr_arr = rec->m_key_descr_arr;
4440
4441 new_rec->m_hidden_pk_val =
4442 rec->m_hidden_pk_val.load(std::memory_order_relaxed);
4443
4444 // so that it's not free'd when deleting the old rec
4445 rec->m_key_descr_arr = nullptr;
4446
4447 // Create a new key
4448 new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4449
4450 const std::string &dbname_tablename = new_rec->full_tablename();
4451 new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4452
4453 // Create a key to add
4454 if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) {
4455 remove(rec, batch, false);
4456 put(new_rec, false);
4457 res = false; // ok
4458 }
4459
4460 mysql_rwlock_unlock(&m_rwlock);
4461 return res;
4462 }
4463
cleanup()4464 void Rdb_ddl_manager::cleanup() {
4465 for (const auto &kv : m_ddl_map) {
4466 delete kv.second;
4467 }
4468 m_ddl_map.clear();
4469
4470 mysql_rwlock_destroy(&m_rwlock);
4471 m_sequence.cleanup();
4472 }
4473
scan_for_tables(Rdb_tables_scanner * const tables_scanner)4474 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
4475 int ret;
4476 Rdb_tbl_def *rec;
4477
4478 DBUG_ASSERT(tables_scanner != nullptr);
4479
4480 mysql_rwlock_rdlock(&m_rwlock);
4481
4482 ret = 0;
4483
4484 for (const auto &kv : m_ddl_map) {
4485 rec = kv.second;
4486 ret = tables_scanner->add_table(rec);
4487 if (ret) break;
4488 }
4489
4490 mysql_rwlock_unlock(&m_rwlock);
4491 return ret;
4492 }
4493
4494 /*
4495 Rdb_binlog_manager class implementation
4496 */
4497
init(Rdb_dict_manager * const dict_arg)4498 bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
4499 DBUG_ASSERT(dict_arg != nullptr);
4500 m_dict = dict_arg;
4501
4502 m_key_writer.reset();
4503 m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
4504 m_key_slice = m_key_writer.to_slice();
4505 return false;
4506 }
4507
cleanup()4508 void Rdb_binlog_manager::cleanup() {}
4509
4510 /**
4511 Set binlog name, pos and optionally gtid into WriteBatch.
4512 This function should be called as part of transaction commit,
4513 since binlog info is set only at transaction commit.
4514 Actual write into RocksDB is not done here, so checking if
4515 write succeeded or not is not possible here.
4516 @param binlog_name Binlog name
4517 @param binlog_pos Binlog pos
4518 @param batch WriteBatch
4519 */
update(const char * const binlog_name,const my_off_t binlog_pos,rocksdb::WriteBatchBase * const batch)4520 void Rdb_binlog_manager::update(const char *const binlog_name,
4521 const my_off_t binlog_pos,
4522 rocksdb::WriteBatchBase *const batch) {
4523 if (binlog_name && binlog_pos) {
4524 // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
4525 const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
4526 Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer;
4527
4528 // store version
4529 value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
4530
4531 // store binlog file name length
4532 DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
4533 const uint16_t binlog_name_len = strlen(binlog_name);
4534 value_writer.write_uint16(binlog_name_len);
4535
4536 // store binlog file name
4537 value_writer.write(binlog_name, binlog_name_len);
4538
4539 // store binlog pos
4540 value_writer.write_uint32(binlog_pos);
4541
4542 #ifdef MARIADB_MERGE_2019
4543 // store binlog gtid length.
4544 // If gtid was not set, store 0 instead
4545 const uint16_t binlog_max_gtid_len =
4546 binlog_max_gtid ? strlen(binlog_max_gtid) : 0;
4547 value_writer.write_uint16(binlog_max_gtid_len);
4548
4549 if (binlog_max_gtid_len > 0) {
4550 // store binlog gtid
4551 value_writer.write(binlog_max_gtid, binlog_max_gtid_len);
4552 }
4553 #endif
4554
4555 m_dict->put_key(batch, m_key_slice, value_writer.to_slice());
4556 }
4557 }
4558
4559 /**
4560 Read binlog committed entry stored in RocksDB, then unpack
4561 @param[OUT] binlog_name Binlog name
4562 @param[OUT] binlog_pos Binlog pos
4563 @param[OUT] binlog_gtid Binlog GTID
4564 @return
4565 true is binlog info was found (valid behavior)
4566 false otherwise
4567 */
read(char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4568 bool Rdb_binlog_manager::read(char *const binlog_name,
4569 my_off_t *const binlog_pos,
4570 char *const binlog_gtid) const {
4571 bool ret = false;
4572 if (binlog_name) {
4573 std::string value;
4574 rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
4575 if (status.ok()) {
4576 if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
4577 binlog_gtid)) {
4578 ret = true;
4579 }
4580 }
4581 }
4582 return ret;
4583 }
4584
4585 /**
4586 Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
4587 @param[IN] value Binlog state info fetched from RocksDB
4588 @param[OUT] binlog_name Binlog name
4589 @param[OUT] binlog_pos Binlog pos
4590 @param[OUT] binlog_gtid Binlog GTID
4591 @return true on error
4592 */
unpack_value(const uchar * const value,size_t value_size_arg,char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4593 bool Rdb_binlog_manager::unpack_value(const uchar *const value,
4594 size_t value_size_arg,
4595 char *const binlog_name,
4596 my_off_t *const binlog_pos,
4597 char *const binlog_gtid) const {
4598 uint pack_len = 0;
4599 intmax_t value_size= value_size_arg;
4600
4601 DBUG_ASSERT(binlog_pos != nullptr);
4602
4603 if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
4604 return true;
4605 // read version
4606 const uint16_t version = rdb_netbuf_to_uint16(value);
4607
4608 pack_len += Rdb_key_def::VERSION_SIZE;
4609 if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true;
4610
4611 if ((value_size -= sizeof(uint16)) < 0)
4612 return true;
4613
4614 // read binlog file name length
4615 const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
4616 pack_len += sizeof(uint16);
4617
4618 if (binlog_name_len >= (FN_REFLEN+1))
4619 return true;
4620
4621 if ((value_size -= binlog_name_len) < 0)
4622 return true;
4623
4624 if (binlog_name_len) {
4625 // read and set binlog name
4626 memcpy(binlog_name, value + pack_len, binlog_name_len);
4627 binlog_name[binlog_name_len] = '\0';
4628 pack_len += binlog_name_len;
4629
4630 if ((value_size -= sizeof(uint32)) < 0)
4631 return true;
4632 // read and set binlog pos
4633 *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
4634 pack_len += sizeof(uint32);
4635
4636 if ((value_size -= sizeof(uint16)) < 0)
4637 return true;
4638 // read gtid length
4639 const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
4640 pack_len += sizeof(uint16);
4641
4642 if (binlog_gtid_len >= GTID_BUF_LEN)
4643 return true;
4644 if ((value_size -= binlog_gtid_len) < 0)
4645 return true;
4646
4647 if (binlog_gtid && binlog_gtid_len > 0) {
4648 // read and set gtid
4649 memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
4650 binlog_gtid[binlog_gtid_len] = '\0';
4651 pack_len += binlog_gtid_len;
4652 }
4653 }
4654 return false;
4655 }
4656
4657 /**
4658 Inserts a row into mysql.slave_gtid_info table. Doing this inside
4659 storage engine is more efficient than inserting/updating through MySQL.
4660
4661 @param[IN] id Primary key of the table.
4662 @param[IN] db Database name. This is column 2 of the table.
4663 @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
4664 @param[IN] write_batch Handle to storage engine writer.
4665 */
update_slave_gtid_info(const uint id,const char * const db,const char * const gtid,rocksdb::WriteBatchBase * const write_batch)4666 void Rdb_binlog_manager::update_slave_gtid_info(
4667 const uint id, const char *const db, const char *const gtid,
4668 rocksdb::WriteBatchBase *const write_batch) {
4669 if (id && db && gtid) {
4670 // Make sure that if the slave_gtid_info table exists we have a
4671 // pointer to it via m_slave_gtid_info_tbl.
4672 if (!m_slave_gtid_info_tbl.load()) {
4673 m_slave_gtid_info_tbl.store(
4674 rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
4675 }
4676 if (!m_slave_gtid_info_tbl.load()) {
4677 // slave_gtid_info table is not present. Simply return.
4678 return;
4679 }
4680 DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
4681
4682 const std::shared_ptr<const Rdb_key_def> &kd =
4683 m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
4684 String value;
4685
4686 // Build key
4687 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer;
4688 key_writer.write_index(kd->get_index_number());
4689 key_writer.write_uint32(id);
4690
4691 // Build value
4692 Rdb_buf_writer<128> value_writer;
4693 DBUG_ASSERT(gtid);
4694 const uint db_len = strlen(db);
4695 const uint gtid_len = strlen(gtid);
4696 // 1 byte used for flags. Empty here.
4697 value_writer.write_byte(0);
4698
4699 // Write column 1.
4700 DBUG_ASSERT(strlen(db) <= 64);
4701 value_writer.write_byte(db_len);
4702 value_writer.write(db, db_len);
4703
4704 // Write column 2.
4705 DBUG_ASSERT(gtid_len <= 56);
4706 value_writer.write_byte(gtid_len);
4707 value_writer.write(gtid, gtid_len);
4708
4709 write_batch->Put(kd->get_cf(), key_writer.to_slice(),
4710 value_writer.to_slice());
4711 }
4712 }
4713
init(rocksdb::TransactionDB * const rdb_dict,Rdb_cf_manager * const cf_manager)4714 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
4715 Rdb_cf_manager *const cf_manager) {
4716 DBUG_ASSERT(rdb_dict != nullptr);
4717 DBUG_ASSERT(cf_manager != nullptr);
4718
4719 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
4720
4721 m_db = rdb_dict;
4722
4723 m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
4724 rocksdb::ColumnFamilyHandle *default_cfh =
4725 cf_manager->get_cf(DEFAULT_CF_NAME);
4726
4727 // System CF and default CF should be initialized
4728 if (m_system_cfh == nullptr || default_cfh == nullptr) {
4729 return HA_EXIT_FAILURE;
4730 }
4731
4732 rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
4733
4734 m_key_slice_max_index_id =
4735 rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
4736 Rdb_key_def::INDEX_NUMBER_SIZE);
4737
4738 resume_drop_indexes();
4739 rollback_ongoing_index_creation();
4740
4741 // Initialize system CF and default CF flags
4742 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
4743 rocksdb::WriteBatch *const batch = wb.get();
4744
4745 add_cf_flags(batch, m_system_cfh->GetID(), 0);
4746 add_cf_flags(batch, default_cfh->GetID(), 0);
4747 commit(batch);
4748
4749 return HA_EXIT_SUCCESS;
4750 }
4751
begin() const4752 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
4753 return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
4754 }
4755
put_key(rocksdb::WriteBatchBase * const batch,const rocksdb::Slice & key,const rocksdb::Slice & value) const4756 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
4757 const rocksdb::Slice &key,
4758 const rocksdb::Slice &value) const {
4759 batch->Put(m_system_cfh, key, value);
4760 }
4761
/*
  Read the value stored under `key` in the system column family.

  @param key    Dictionary key to look up
  @param value  [out] Receives the stored value
  @return Status returned by the database Get() call
*/
rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
                                            std::string *const value) const {
  rocksdb::ReadOptions read_options;
  read_options.total_order_seek = true;

  rocksdb::Status lookup = m_db->Get(read_options, m_system_cfh, key, value);
  return lookup;
}
4768
delete_key(rocksdb::WriteBatchBase * batch,const rocksdb::Slice & key) const4769 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
4770 const rocksdb::Slice &key) const {
4771 batch->Delete(m_system_cfh, key);
4772 }
4773
new_iterator() const4774 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
4775 /* Reading data dictionary should always skip bloom filter */
4776 rocksdb::ReadOptions read_options;
4777 read_options.total_order_seek = true;
4778 return m_db->NewIterator(read_options, m_system_cfh);
4779 }
4780
commit(rocksdb::WriteBatch * const batch,const bool sync) const4781 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
4782 const bool sync) const {
4783 if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
4784 int res = HA_EXIT_SUCCESS;
4785 rocksdb::WriteOptions options;
4786 options.sync = sync;
4787 rocksdb::TransactionDBWriteOptimizations optimize;
4788 optimize.skip_concurrency_control = true;
4789 rocksdb::Status s = m_db->Write(options, optimize, batch);
4790 res = !s.ok(); // we return true when something failed
4791 if (res) {
4792 rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
4793 }
4794 batch->Clear();
4795 return res;
4796 }
4797
dump_index_id(uchar * const netbuf,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id)4798 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
4799 Rdb_key_def::DATA_DICT_TYPE dict_type,
4800 const GL_INDEX_ID &gl_index_id) {
4801 rdb_netbuf_store_uint32(netbuf, dict_type);
4802 rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
4803 gl_index_id.cf_id);
4804 rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
4805 gl_index_id.index_id);
4806 }
4807
delete_with_prefix(rocksdb::WriteBatch * const batch,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id) const4808 void Rdb_dict_manager::delete_with_prefix(
4809 rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
4810 const GL_INDEX_ID &gl_index_id) const {
4811 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4812 dump_index_id(&key_writer, dict_type, gl_index_id);
4813
4814 delete_key(batch, key_writer.to_slice());
4815 }
4816
add_or_update_index_cf_mapping(rocksdb::WriteBatch * batch,struct Rdb_index_info * const index_info) const4817 void Rdb_dict_manager::add_or_update_index_cf_mapping(
4818 rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
4819 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4820 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
4821 index_info->m_gl_index_id);
4822
4823 Rdb_buf_writer<256> value_writer;
4824
4825 value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
4826 value_writer.write_byte(index_info->m_index_type);
4827 value_writer.write_uint16(index_info->m_kv_version);
4828 value_writer.write_uint32(index_info->m_index_flags);
4829 value_writer.write_uint64(index_info->m_ttl_duration);
4830
4831 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4832 }
4833
add_cf_flags(rocksdb::WriteBatch * const batch,const uint32_t cf_id,const uint32_t cf_flags) const4834 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
4835 const uint32_t cf_id,
4836 const uint32_t cf_flags) const {
4837 DBUG_ASSERT(batch != nullptr);
4838
4839 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4840 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4841 key_writer.write_uint32(cf_id);
4842
4843 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
4844 value_writer;
4845 value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
4846 value_writer.write_uint32(cf_flags);
4847
4848 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4849 }
4850
delete_index_info(rocksdb::WriteBatch * batch,const GL_INDEX_ID & gl_index_id) const4851 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
4852 const GL_INDEX_ID &gl_index_id) const {
4853 delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
4854 delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
4855 delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
4856 }
4857
get_index_info(const GL_INDEX_ID & gl_index_id,struct Rdb_index_info * const index_info) const4858 bool Rdb_dict_manager::get_index_info(
4859 const GL_INDEX_ID &gl_index_id,
4860 struct Rdb_index_info *const index_info) const {
4861 if (index_info) {
4862 index_info->m_gl_index_id = gl_index_id;
4863 }
4864
4865 bool found = false;
4866 bool error = false;
4867 std::string value;
4868 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4869 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
4870
4871 const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
4872 if (status.ok()) {
4873 if (!index_info) {
4874 return true;
4875 }
4876
4877 const uchar *const val = (const uchar *)value.c_str();
4878 const uchar *ptr = val;
4879 index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
4880 ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4881
4882 switch (index_info->m_index_dict_version) {
4883 case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
4884 /* Sanity check to prevent reading bogus TTL record. */
4885 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4886 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4887 RDB_SIZEOF_INDEX_FLAGS +
4888 ROCKSDB_SIZEOF_TTL_RECORD) {
4889 error = true;
4890 break;
4891 }
4892 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4893 ptr += RDB_SIZEOF_INDEX_TYPE;
4894 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4895 ptr += RDB_SIZEOF_KV_VERSION;
4896 index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
4897 ptr += RDB_SIZEOF_INDEX_FLAGS;
4898 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4899 found = true;
4900 break;
4901
4902 case Rdb_key_def::INDEX_INFO_VERSION_TTL:
4903 /* Sanity check to prevent reading bogus into TTL record. */
4904 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4905 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4906 ROCKSDB_SIZEOF_TTL_RECORD) {
4907 error = true;
4908 break;
4909 }
4910 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4911 ptr += RDB_SIZEOF_INDEX_TYPE;
4912 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4913 ptr += RDB_SIZEOF_KV_VERSION;
4914 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4915 if ((index_info->m_kv_version ==
4916 Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
4917 index_info->m_ttl_duration > 0) {
4918 index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
4919 }
4920 found = true;
4921 break;
4922
4923 case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
4924 case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
4925 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4926 ptr += RDB_SIZEOF_INDEX_TYPE;
4927 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4928 found = true;
4929 break;
4930
4931 default:
4932 error = true;
4933 break;
4934 }
4935
4936 switch (index_info->m_index_type) {
4937 case Rdb_key_def::INDEX_TYPE_PRIMARY:
4938 case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
4939 error = index_info->m_kv_version >
4940 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
4941 break;
4942 }
4943 case Rdb_key_def::INDEX_TYPE_SECONDARY:
4944 error = index_info->m_kv_version >
4945 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
4946 break;
4947 default:
4948 error = true;
4949 break;
4950 }
4951 }
4952
4953 if (error) {
4954 // NO_LINT_DEBUG
4955 sql_print_error(
4956 "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
4957 "from data dictionary. This should never happen "
4958 "and it may be a bug.",
4959 index_info->m_index_dict_version, index_info->m_index_type,
4960 index_info->m_kv_version, index_info->m_ttl_duration);
4961 abort();
4962 }
4963
4964 return found;
4965 }
4966
get_cf_flags(const uint32_t cf_id,uint32_t * const cf_flags) const4967 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
4968 uint32_t *const cf_flags) const {
4969 DBUG_ASSERT(cf_flags != nullptr);
4970
4971 bool found = false;
4972 std::string value;
4973 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4974
4975 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4976 key_writer.write_uint32(cf_id);
4977
4978 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
4979
4980 if (status.ok()) {
4981 const uchar *val = (const uchar *)value.c_str();
4982 DBUG_ASSERT(val);
4983
4984 const uint16_t version = rdb_netbuf_to_uint16(val);
4985
4986 if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
4987 *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
4988 found = true;
4989 }
4990 }
4991
4992 return found;
4993 }
4994
4995 /*
4996 Returning index ids that were marked as deleted (via DROP TABLE) but
4997 still not removed by drop_index_thread yet, or indexes that are marked as
4998 ongoing creation.
4999 */
get_ongoing_index_operation(std::unordered_set<GL_INDEX_ID> * gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const5000 void Rdb_dict_manager::get_ongoing_index_operation(
5001 std::unordered_set<GL_INDEX_ID> *gl_index_ids,
5002 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5003 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5004 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5005
5006 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5007 index_writer.write_uint32(dd_type);
5008 const rocksdb::Slice index_slice = index_writer.to_slice();
5009
5010 rocksdb::Iterator *it = new_iterator();
5011 for (it->Seek(index_slice); it->Valid(); it->Next()) {
5012 rocksdb::Slice key = it->key();
5013 const uchar *const ptr = (const uchar *)key.data();
5014
5015 /*
5016 Ongoing drop/create index operations require key to be of the form:
5017 dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5018
5019 This may need to be changed in the future if we want to process a new
5020 ddl_type with different format.
5021 */
5022 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5023 rdb_netbuf_to_uint32(ptr) != dd_type) {
5024 break;
5025 }
5026
5027 // We don't check version right now since currently we always store only
5028 // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5029 // If increasing version number, we need to add version check logic here.
5030 GL_INDEX_ID gl_index_id;
5031 gl_index_id.cf_id =
5032 rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5033 gl_index_id.index_id =
5034 rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5035 gl_index_ids->insert(gl_index_id);
5036 }
5037 delete it;
5038 }
5039
5040 /*
5041 Returning true if index_id is create/delete ongoing (undergoing creation or
5042 marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
5043 or not.
5044 */
is_index_operation_ongoing(const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5045 bool Rdb_dict_manager::is_index_operation_ongoing(
5046 const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5047 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5048 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5049
5050 bool found = false;
5051 std::string value;
5052 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5053 dump_index_id(&key_writer, dd_type, gl_index_id);
5054
5055 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5056 if (status.ok()) {
5057 found = true;
5058 }
5059 return found;
5060 }
5061
5062 /*
5063 Adding index_id to data dictionary so that the index id is removed
5064 by drop_index_thread, or to track online index creation.
5065 */
start_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5066 void Rdb_dict_manager::start_ongoing_index_operation(
5067 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5068 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5069 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5070 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5071
5072 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5073 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5074
5075 dump_index_id(&key_writer, dd_type, gl_index_id);
5076
5077 // version as needed
5078 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5079 value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5080 } else {
5081 value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5082 }
5083
5084 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5085 }
5086
5087 /*
5088 Removing index_id from data dictionary to confirm drop_index_thread
5089 completed dropping entire key/values of the index_id
5090 */
end_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5091 void Rdb_dict_manager::end_ongoing_index_operation(
5092 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5093 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5094 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5095 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5096
5097 delete_with_prefix(batch, dd_type, gl_index_id);
5098 }
5099
5100 /*
5101 Returning true if there is no target index ids to be removed
5102 by drop_index_thread
5103 */
is_drop_index_empty() const5104 bool Rdb_dict_manager::is_drop_index_empty() const {
5105 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5106 get_ongoing_drop_indexes(&gl_index_ids);
5107 return gl_index_ids.empty();
5108 }
5109
5110 /*
5111 This function is supposed to be called by DROP TABLE. Logging messages
5112 that dropping indexes started, and adding data dictionary so that
5113 all associated indexes to be removed
5114 */
add_drop_table(std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,rocksdb::WriteBatch * const batch) const5115 void Rdb_dict_manager::add_drop_table(
5116 std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5117 rocksdb::WriteBatch *const batch) const {
5118 std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5119 for (uint32 i = 0; i < n_keys; i++) {
5120 dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5121 }
5122
5123 add_drop_index(dropped_index_ids, batch);
5124 }
5125
5126 /*
5127 Called during inplace index drop operations. Logging messages
5128 that dropping indexes started, and adding data dictionary so that
5129 all associated indexes to be removed
5130 */
add_drop_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5131 void Rdb_dict_manager::add_drop_index(
5132 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5133 rocksdb::WriteBatch *const batch) const {
5134 for (const auto &gl_index_id : gl_index_ids) {
5135 log_start_drop_index(gl_index_id, "Begin");
5136 start_drop_index(batch, gl_index_id);
5137 }
5138 }
5139
5140 /*
5141 Called during inplace index creation operations. Logging messages
5142 that adding indexes started, and updates data dictionary with all associated
5143 indexes to be added.
5144 */
add_create_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5145 void Rdb_dict_manager::add_create_index(
5146 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5147 rocksdb::WriteBatch *const batch) const {
5148 for (const auto &gl_index_id : gl_index_ids) {
5149 // NO_LINT_DEBUG
5150 sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
5151 gl_index_id.cf_id, gl_index_id.index_id);
5152 start_create_index(batch, gl_index_id);
5153 }
5154 }
5155
5156 /*
5157 This function is supposed to be called by drop_index_thread, when it
5158 finished dropping any index, or at the completion of online index creation.
5159 */
finish_indexes_operation(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const5160 void Rdb_dict_manager::finish_indexes_operation(
5161 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5162 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5163 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5164 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5165
5166 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5167 rocksdb::WriteBatch *const batch = wb.get();
5168
5169 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5170 get_ongoing_create_indexes(&incomplete_create_indexes);
5171
5172 for (const auto &gl_index_id : gl_index_ids) {
5173 if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5174 end_ongoing_index_operation(batch, gl_index_id, dd_type);
5175
5176 /*
5177 Remove the corresponding incomplete create indexes from data
5178 dictionary as well
5179 */
5180 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5181 if (incomplete_create_indexes.count(gl_index_id)) {
5182 end_ongoing_index_operation(batch, gl_index_id,
5183 Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5184 }
5185 }
5186 }
5187
5188 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5189 delete_index_info(batch, gl_index_id);
5190 }
5191 }
5192 commit(batch);
5193 }
5194
5195 /*
5196 This function is supposed to be called when initializing
5197 Rdb_dict_manager (at startup). If there is any index ids that are
5198 drop ongoing, printing out messages for diagnostics purposes.
5199 */
resume_drop_indexes() const5200 void Rdb_dict_manager::resume_drop_indexes() const {
5201 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5202 get_ongoing_drop_indexes(&gl_index_ids);
5203
5204 uint max_index_id_in_dict = 0;
5205 get_max_index_id(&max_index_id_in_dict);
5206
5207 for (const auto &gl_index_id : gl_index_ids) {
5208 log_start_drop_index(gl_index_id, "Resume");
5209 if (max_index_id_in_dict < gl_index_id.index_id) {
5210 // NO_LINT_DEBUG
5211 sql_print_error(
5212 "RocksDB: Found max index id %u from data dictionary "
5213 "but also found dropped index id (%u,%u) from drop_index "
5214 "dictionary. This should never happen and is possibly a "
5215 "bug.",
5216 max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5217 abort();
5218 }
5219 }
5220 }
5221
rollback_ongoing_index_creation() const5222 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5223 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5224 rocksdb::WriteBatch *const batch = wb.get();
5225
5226 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5227 get_ongoing_create_indexes(&gl_index_ids);
5228
5229 for (const auto &gl_index_id : gl_index_ids) {
5230 // NO_LINT_DEBUG
5231 sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
5232 gl_index_id.cf_id, gl_index_id.index_id);
5233
5234 start_drop_index(batch, gl_index_id);
5235 }
5236
5237 commit(batch);
5238 }
5239
log_start_drop_table(const std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,const char * const log_action) const5240 void Rdb_dict_manager::log_start_drop_table(
5241 const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5242 const char *const log_action) const {
5243 for (uint32 i = 0; i < n_keys; i++) {
5244 log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5245 }
5246 }
5247
log_start_drop_index(GL_INDEX_ID gl_index_id,const char * log_action) const5248 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5249 const char *log_action) const {
5250 struct Rdb_index_info index_info;
5251 if (!get_index_info(gl_index_id, &index_info)) {
5252 /*
5253 If we don't find the index info, it could be that it's because it was a
5254 partially created index that isn't in the data dictionary yet that needs
5255 to be rolled back.
5256 */
5257 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5258 get_ongoing_create_indexes(&incomplete_create_indexes);
5259
5260 if (!incomplete_create_indexes.count(gl_index_id)) {
5261 /* If it's not a partially created index, something is very wrong. */
5262 // NO_LINT_DEBUG
5263 sql_print_error(
5264 "RocksDB: Failed to get column family info "
5265 "from index id (%u,%u). MyRocks data dictionary may "
5266 "get corrupted.",
5267 gl_index_id.cf_id, gl_index_id.index_id);
5268 if (rocksdb_ignore_datadic_errors)
5269 {
5270 sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, "
5271 "trying to continue");
5272 return;
5273 }
5274 abort();
5275 }
5276 }
5277 }
5278
get_max_index_id(uint32_t * const index_id) const5279 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5280 bool found = false;
5281 std::string value;
5282
5283 const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5284 if (status.ok()) {
5285 const uchar *const val = (const uchar *)value.c_str();
5286 const uint16_t version = rdb_netbuf_to_uint16(val);
5287 if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5288 *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5289 found = true;
5290 }
5291 }
5292 return found;
5293 }
5294
update_max_index_id(rocksdb::WriteBatch * const batch,const uint32_t index_id) const5295 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5296 const uint32_t index_id) const {
5297 DBUG_ASSERT(batch != nullptr);
5298
5299 uint32_t old_index_id = -1;
5300 if (get_max_index_id(&old_index_id)) {
5301 if (old_index_id > index_id) {
5302 // NO_LINT_DEBUG
5303 sql_print_error(
5304 "RocksDB: Found max index id %u from data dictionary "
5305 "but trying to update to older value %u. This should "
5306 "never happen and possibly a bug.",
5307 old_index_id, index_id);
5308 return true;
5309 }
5310 }
5311
5312 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5313 value_writer;
5314 value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5315 value_writer.write_uint32(index_id);
5316
5317 batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5318 return false;
5319 }
5320
add_stats(rocksdb::WriteBatch * const batch,const std::vector<Rdb_index_stats> & stats) const5321 void Rdb_dict_manager::add_stats(
5322 rocksdb::WriteBatch *const batch,
5323 const std::vector<Rdb_index_stats> &stats) const {
5324 DBUG_ASSERT(batch != nullptr);
5325
5326 for (const auto &it : stats) {
5327 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5328 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5329
5330 // IndexStats::materialize takes complete care of serialization including
5331 // storing the version
5332 const auto value =
5333 Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5334
5335 batch->Put(m_system_cfh, key_writer.to_slice(), value);
5336 }
5337 }
5338
get_stats(GL_INDEX_ID gl_index_id) const5339 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5340 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5341 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5342
5343 std::string value;
5344 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5345 if (status.ok()) {
5346 std::vector<Rdb_index_stats> v;
5347 // unmaterialize checks if the version matches
5348 if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5349 return v[0];
5350 }
5351 }
5352
5353 return Rdb_index_stats();
5354 }
5355
put_auto_incr_val(rocksdb::WriteBatchBase * batch,const GL_INDEX_ID & gl_index_id,ulonglong val,bool overwrite) const5356 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5357 rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id,
5358 ulonglong val, bool overwrite) const {
5359 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5360 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5361
5362 // Value is constructed by storing the version and the value.
5363 Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5364 ROCKSDB_SIZEOF_AUTOINC_VALUE>
5365 value_writer;
5366 value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5367 value_writer.write_uint64(val);
5368
5369 if (overwrite) {
5370 return batch->Put(m_system_cfh, key_writer.to_slice(),
5371 value_writer.to_slice());
5372 }
5373 return batch->Merge(m_system_cfh, key_writer.to_slice(),
5374 value_writer.to_slice());
5375 }
5376
get_auto_incr_val(const GL_INDEX_ID & gl_index_id,ulonglong * new_val) const5377 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5378 ulonglong *new_val) const {
5379 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5380 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5381
5382 std::string value;
5383 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5384
5385 if (status.ok()) {
5386 const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5387
5388 if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5389 *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5390 return true;
5391 }
5392 }
5393 return false;
5394 }
5395
get_and_update_next_number(Rdb_dict_manager * const dict)5396 uint Rdb_seq_generator::get_and_update_next_number(
5397 Rdb_dict_manager *const dict) {
5398 DBUG_ASSERT(dict != nullptr);
5399
5400 uint res;
5401 RDB_MUTEX_LOCK_CHECK(m_mutex);
5402
5403 res = m_next_number++;
5404
5405 const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
5406 rocksdb::WriteBatch *const batch = wb.get();
5407
5408 DBUG_ASSERT(batch != nullptr);
5409 dict->update_max_index_id(batch, res);
5410 dict->commit(batch);
5411
5412 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
5413
5414 return res;
5415 }
5416
5417 } // namespace myrocks
5418