1 /*
2 Copyright (c) 2012,2013 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation // gcc: Class implementation
19 #endif
20
21 /* For use of 'PRIu64': */
22 #define __STDC_FORMAT_MACROS
23
24 #include <my_global.h>
25 /* This C++ file's header file */
26 #include "./rdb_datadic.h"
27
28 #include <inttypes.h>
29 /* C++ standard header files */
30 #include <algorithm>
31 #include <array>
32 #include <limits>
33 #include <map>
34 #include <set>
35 #include <string>
36 #include <utility>
37 #include <vector>
38
39 /* MySQL header files */
40 #include "./field.h"
41 #include "./key.h"
42 #include "./m_ctype.h"
43 #include "./my_bit.h"
44 #include "./my_bitmap.h"
45 #include "./sql_table.h"
46
47 /* MyRocks header files */
48 #include "./ha_rocksdb.h"
49 #include "./ha_rocksdb_proto.h"
50 #include "./my_stacktrace.h"
51 #include "./rdb_cf_manager.h"
52 #include "./rdb_psi.h"
53 #include "./rdb_utils.h"
54
55 namespace myrocks {
56
/*
  Forward declaration only; the definition is not part of this section of
  the file.

  NOTE(review): judging by the signature alone, the results for charset `cs`
  are handed back through the `xfrm`, `xfrm_len` and `mb_len` pointers --
  confirm the exact semantics against the definition.
*/
void get_mem_comparable_space(const CHARSET_INFO *cs,
                              const std::vector<uchar> **xfrm, size_t *xfrm_len,
                              size_t *mb_len);
60
61 /*
62 MariaDB's replacement for FB/MySQL Field::check_field_name_match :
63 */
field_check_field_name_match(Field * field,const char * name)64 inline bool field_check_field_name_match(Field *field, const char *name)
65 {
66 return (0 == my_strcasecmp(system_charset_info,
67 field->field_name.str,
68 name));
69 }
70
71
72 /*
73 Decode current key field
74 @param fpi IN data structure contains field metadata
75 @param field IN current field
76 @param reader IN key slice reader
77 @param unp_reader IN unpack information reader
78 @return
79 HA_EXIT_SUCCESS OK
80 other HA_ERR error code
81 */
decode_field(Rdb_field_packing * fpi,Field * field,Rdb_string_reader * reader,const uchar * const default_value,Rdb_string_reader * unpack_reader)82 int Rdb_convert_to_record_key_decoder::decode_field(
83 Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader,
84 const uchar *const default_value, Rdb_string_reader *unpack_reader) {
85 if (fpi->m_maybe_null) {
86 const char *nullp;
87 if (!(nullp = reader->read(1))) {
88 return HA_EXIT_FAILURE;
89 }
90
91 if (*nullp == 0) {
92 /* Set the NULL-bit of this field */
93 field->set_null();
94 /* Also set the field to its default value */
95 memcpy(field->ptr, default_value, field->pack_length());
96 return HA_EXIT_SUCCESS;
97 } else if (*nullp == 1) {
98 field->set_notnull();
99 } else {
100 return HA_EXIT_FAILURE;
101 }
102 }
103
104 return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader);
105 }
106
107 /*
108 Decode current key field
109
110 @param buf OUT the buf starting address
111 @param offset OUT the bytes offset when data is written
112 @param fpi IN data structure contains field metadata
113 @param table IN current table
114 @param field IN current field
115 @param has_unpack_inf IN whether contains unpack inf
116 @param reader IN key slice reader
117 @param unp_reader IN unpack information reader
118 @return
119 HA_EXIT_SUCCESS OK
120 other HA_ERR error code
121 */
decode(uchar * const buf,uint * offset,Rdb_field_packing * fpi,TABLE * table,Field * field,bool has_unpack_info,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)122 int Rdb_convert_to_record_key_decoder::decode(
123 uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table,
124 Field *field, bool has_unpack_info, Rdb_string_reader *reader,
125 Rdb_string_reader *unpack_reader) {
126 DBUG_ASSERT(buf != nullptr);
127 DBUG_ASSERT(offset != nullptr);
128
129 uint field_offset = field->ptr - table->record[0];
130 *offset = field_offset;
131 uint null_offset = field->null_offset();
132 bool maybe_null = field->real_maybe_null();
133
134 field->move_field(buf + field_offset,
135 maybe_null ? buf + null_offset : nullptr, field->null_bit);
136
137 // If we need unpack info, but there is none, tell the unpack function
138 // this by passing unp_reader as nullptr. If we never read unpack_info
139 // during unpacking anyway, then there won't an error.
140 bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
141
142 int res =
143 decode_field(fpi, field, reader, table->s->default_values + field_offset,
144 maybe_missing_unpack ? nullptr : unpack_reader);
145
146 // Restore field->ptr and field->null_ptr
147 field->move_field(table->record[0] + field_offset,
148 maybe_null ? table->record[0] + null_offset : nullptr,
149 field->null_bit);
150 if (res != UNPACK_SUCCESS) {
151 return HA_ERR_ROCKSDB_CORRUPT_DATA;
152 }
153 return HA_EXIT_SUCCESS;
154 }
155
156 /*
157 Skip current key field
158
159 @param fpi IN data structure contains field metadata
160 @param field IN current field
161 @param reader IN key slice reader
162 @param unp_reader IN unpack information reader
163 @return
164 HA_EXIT_SUCCESS OK
165 other HA_ERR error code
166 */
skip(const Rdb_field_packing * fpi,const Field * field,Rdb_string_reader * reader,Rdb_string_reader * unp_reader)167 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
168 const Field *field,
169 Rdb_string_reader *reader,
170 Rdb_string_reader *unp_reader) {
171 /* It is impossible to unpack the column. Skip it. */
172 if (fpi->m_maybe_null) {
173 const char *nullp;
174 if (!(nullp = reader->read(1))) {
175 return HA_ERR_ROCKSDB_CORRUPT_DATA;
176 }
177 if (*nullp == 0) {
178 /* This is a NULL value */
179 return HA_EXIT_SUCCESS;
180 }
181 /* If NULL marker is not '0', it can be only '1' */
182 if (*nullp != 1) {
183 return HA_ERR_ROCKSDB_CORRUPT_DATA;
184 }
185 }
186 if ((fpi->m_skip_func)(fpi, field, reader)) {
187 return HA_ERR_ROCKSDB_CORRUPT_DATA;
188 }
189 // If this is a space padded varchar, we need to skip the indicator
190 // bytes for trailing bytes. They're useless since we can't restore the
191 // field anyway.
192 //
193 // There is a special case for prefixed varchars where we do not
194 // generate unpack info, because we know prefixed varchars cannot be
195 // unpacked. In this case, it is not necessary to skip.
196 if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
197 !fpi->m_unpack_info_stores_value) {
198 unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
199 }
200 return HA_EXIT_SUCCESS;
201 }
202
Rdb_key_field_iterator(const Rdb_key_def * key_def,Rdb_field_packing * pack_info,Rdb_string_reader * reader,Rdb_string_reader * unp_reader,TABLE * table,bool has_unpack_info,const MY_BITMAP * covered_bitmap,uchar * const buf)203 Rdb_key_field_iterator::Rdb_key_field_iterator(
204 const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
205 Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
206 bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
207 m_key_def = key_def;
208 m_pack_info = pack_info;
209 m_iter_index = 0;
210 m_iter_end = key_def->get_key_parts();
211 m_reader = reader;
212 m_unp_reader = unp_reader;
213 m_table = table;
214 m_has_unpack_info = has_unpack_info;
215 m_covered_bitmap = covered_bitmap;
216 m_buf = buf;
217 m_secondary_key =
218 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
219 m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
220 m_is_hidden_pk =
221 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
222 m_curr_bitmap_pos = 0;
223 m_offset = 0;
224 }
225
get_dst() const226 void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; }
227
get_field_index() const228 int Rdb_key_field_iterator::get_field_index() const {
229 DBUG_ASSERT(m_field != nullptr);
230 return m_field->field_index;
231 }
232
get_is_null() const233 bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; }
get_field() const234 Field *Rdb_key_field_iterator::get_field() const {
235 DBUG_ASSERT(m_field != nullptr);
236 return m_field;
237 }
238
has_next()239 bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; }
240
241 /**
242 Iterate each field in the key and decode/skip one by one
243 */
next()244 int Rdb_key_field_iterator::next() {
245 int status = HA_EXIT_SUCCESS;
246 while (m_iter_index < m_iter_end) {
247 int curr_index = m_iter_index++;
248
249 m_fpi = &m_pack_info[curr_index];
250 /*
251 Hidden pk field is packed at the end of the secondary keys, but the SQL
252 layer does not know about it. Skip retrieving field if hidden pk.
253 */
254 if ((m_secondary_key && m_hidden_pk_exists &&
255 curr_index + 1 == m_iter_end) ||
256 m_is_hidden_pk) {
257 DBUG_ASSERT(m_fpi->m_unpack_func);
258 if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) {
259 return HA_ERR_ROCKSDB_CORRUPT_DATA;
260 }
261 return HA_EXIT_SUCCESS;
262 }
263
264 m_field = m_fpi->get_field_in_table(m_table);
265
266 bool covered_column = true;
267 if (m_covered_bitmap != nullptr &&
268 m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) {
269 covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
270 bitmap_is_set(m_covered_bitmap, m_curr_bitmap_pos++);
271 }
272
273 if (m_fpi->m_unpack_func && covered_column) {
274 /* It is possible to unpack this column. Do it. */
275 status = Rdb_convert_to_record_key_decoder::decode(
276 m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info,
277 m_reader, m_unp_reader);
278 if (status) {
279 return status;
280 }
281 break;
282 } else {
283 status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader,
284 m_unp_reader);
285 if (status) {
286 return status;
287 }
288 }
289 }
290 return HA_EXIT_SUCCESS;
291 }
292
/*
  Rdb_key_def class implementation
*/

/*
  Construct a key definition from its attributes.

  Every *_arg / named argument is stored in the member of the same name.
  Members that are filled in by setup() start out empty: m_pk_part_no and
  m_pack_info are nullptr, m_key_parts is 0, m_ttl_column is "", the TTL
  offsets/indexes are UINT_MAX and m_maxlength is 0 (setup() only does its
  work while m_maxlength is 0).

  The constructor also initializes m_mutex, stores the index number in
  m_index_number_storage_form and computes m_total_index_flags_length from
  the index flags bitmap.
*/
Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
                         rocksdb::ColumnFamilyHandle *cf_handle_arg,
                         uint16_t index_dict_version_arg, uchar index_type_arg,
                         uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
                         bool is_per_partition_cf_arg, const char *_name,
                         Rdb_index_stats _stats, uint32 index_flags_bitmap,
                         uint32 ttl_rec_offset, uint64 ttl_duration)
    : m_index_number(indexnr_arg),
      m_cf_handle(cf_handle_arg),
      m_index_dict_version(index_dict_version_arg),
      m_index_type(index_type_arg),
      m_kv_format_version(kv_format_version_arg),
      m_is_reverse_cf(is_reverse_cf_arg),
      m_is_per_partition_cf(is_per_partition_cf_arg),
      m_name(_name),
      m_stats(_stats),
      m_index_flags_bitmap(index_flags_bitmap),
      m_ttl_rec_offset(ttl_rec_offset),
      m_ttl_duration(ttl_duration),
      m_ttl_column(""),
      m_pk_part_no(nullptr),
      m_pack_info(nullptr),
      m_keyno(keyno_arg),
      m_key_parts(0),
      m_ttl_pk_key_part_offset(UINT_MAX),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(nullptr),
      m_maxlength(0)  // means 'not initialized'
{
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  /*
    Primary/secondary indexes whose kv format version is not newer than
    *_FORMAT_VERSION_UPDATE2 are expected to have no index flags.
  */
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT(m_cf_handle != nullptr);
}
337
/*
  Copy constructor.

  Copies the attributes of `k`, creates a fresh mutex for the new object and
  makes private copies of the m_pack_info and m_pk_part_no arrays, so that
  the destructor of each object frees its own memory.

  m_index_dict_version, m_index_type and m_kv_format_version are copied as
  well: the DBUG_ASSERT_IMP checks below read them, and the copy has to
  describe the same kind of index as the original.

  m_ttl_field_index is reset to UINT_MAX rather than copied.
  NOTE(review): m_pk_key_parts (assigned in setup()) is not copied here
  although m_maxlength is, so setup() will not recompute it for the copy --
  confirm whether callers rely on it.
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number),
      m_cf_handle(k.m_cf_handle),
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf),
      m_is_per_partition_cf(k.m_is_per_partition_cf),
      m_name(k.m_name),
      m_stats(k.m_stats),
      m_index_flags_bitmap(k.m_index_flags_bitmap),
      m_ttl_rec_offset(k.m_ttl_rec_offset),
      m_ttl_duration(k.m_ttl_duration),
      m_ttl_column(k.m_ttl_column),
      m_pk_part_no(k.m_pk_part_no),
      m_pack_info(k.m_pack_info),
      m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts),
      m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);

  /*
    The initializer list copied the raw array pointers; replace them with
    private copies of the arrays.
  */
  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
    void *pack_info = my_malloc(size, MYF(0));
    memcpy(pack_info, k.m_pack_info, size);
    m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info);
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
    m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0)));
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
380
~Rdb_key_def()381 Rdb_key_def::~Rdb_key_def() {
382 mysql_mutex_destroy(&m_mutex);
383
384 my_free(m_pk_part_no);
385 m_pk_part_no = nullptr;
386
387 my_free(m_pack_info);
388 m_pack_info = nullptr;
389 }
390
/*
  Fill in the table dependent parts of this key definition: m_name,
  m_pk_key_parts, m_key_parts, m_pk_part_no, m_pack_info, the TTL column
  information, m_stats sizing, m_prefix_extractor and finally m_maxlength.

  The work is done only while m_maxlength is 0. m_maxlength is checked once
  without the mutex and once more after taking it, and it is the last member
  assigned, so a non-zero m_maxlength means the setup has been completed.

  @param tbl      IN table the key belongs to
  @param tbl_def  IN MyRocks table definition (used for TTL column lookup
                     and for the key number of the hidden pk)
*/
void Rdb_key_def::setup(const TABLE *const tbl,
                        const Rdb_tbl_def *const tbl_def) {
  DBUG_ASSERT(tbl != nullptr);
  DBUG_ASSERT(tbl_def != nullptr);

  /*
    Set max_length based on the table.  This can be called concurrently from
    multiple threads, so there is a mutex to protect this code.
  */
  const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
  const bool hidden_pk_exists = table_has_hidden_pk(tbl);
  const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
  if (!m_maxlength) {
    RDB_MUTEX_LOCK_CHECK(m_mutex);
    /* Somebody else finished the setup while we were waiting for the mutex */
    if (m_maxlength != 0) {
      RDB_MUTEX_UNLOCK_CHECK(m_mutex);
      return;
    }

    /*
      key_info: SQL layer description of this key (nullptr for a hidden pk).
      pk_info:  SQL layer description of the primary key (nullptr when the
                table has a hidden pk or this key is not a secondary key).
    */
    KEY *key_info = nullptr;
    KEY *pk_info = nullptr;
    if (!is_hidden_pk) {
      key_info = &tbl->key_info[m_keyno];
      if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
      m_name = std::string(key_info->name.str);
    } else {
      m_name = HIDDEN_PK_NAME;
    }

    if (secondary_key) {
      /* A hidden pk always consists of exactly one key part */
      m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
    } else {
      pk_info = nullptr;
      m_pk_key_parts = 0;
    }

    // "unique" secondary keys support:
    m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;

    if (secondary_key) {
      /*
        In most cases, SQL layer puts PK columns as invisible suffix at the
        end of secondary key. There are cases where this doesn't happen:
        - unique secondary indexes.
        - partitioned tables.

        Internally, we always need PK columns as suffix (and InnoDB does,
        too, if you were wondering).

        The loop below will attempt to put all PK columns at the end of key
        definition.  Columns that are already included in the index (either
        by the user or by "extended keys" feature) are not included for the
        second time.
      */
      m_key_parts += m_pk_key_parts;
    }

    /*
      At this point m_key_parts is an upper bound used for sizing the arrays;
      it is set to the number of entries actually filled in (dst_i) after
      the loop below.
    */
    if (secondary_key) {
      m_pk_part_no = reinterpret_cast<uint *>(
          my_malloc(sizeof(uint) * m_key_parts, MYF(0)));
    } else {
      m_pk_part_no = nullptr;
    }

    const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
    m_pack_info =
        reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));

    /*
      Guaranteed not to error here as checks have been made already during
      table creation.
    */
    Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
                                 &m_ttl_field_index, true);

    size_t max_len = INDEX_NUMBER_SIZE;
    /* Never modified below, so every m_unpack_data_offset is set to 0 */
    int unpack_len = 0;
    /* Tracked below but not read again within this function */
    int max_part_len = 0;
    /* Becomes true once the loop has moved on to the appended PK tail */
    bool simulating_extkey = false;
    /* Index of the next m_pack_info / m_pk_part_no slot to fill */
    uint dst_i = 0;

    uint keyno_to_set = m_keyno;
    uint keypart_to_set = 0;

    if (is_hidden_pk) {
      /* The hidden pk has no Field object; set it up with a nullptr field */
      Field *field = nullptr;
      m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
      m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
      max_len += m_pack_info[dst_i].m_max_image_len;
      max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
      dst_i++;
    } else {
      KEY_PART_INFO *key_part = key_info->key_part;

      /* this loop also loops over the 'extended key' tail */
      for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
        /* key_part is nullptr for the hidden pk appended to a secondary key */
        Field *const field = key_part ? key_part->field : nullptr;

        if (simulating_extkey && !hidden_pk_exists) {
          DBUG_ASSERT(secondary_key);
          /* Check if this field is already present in the key definition */
          bool found = false;
          for (uint j= 0; j < key_info->ext_key_parts; j++) {
            if (field->field_index ==
                key_info->key_part[j].field->field_index &&
                key_part->length == key_info->key_part[j].length) {
              found = true;
              break;
            }
          }

          /* Already part of the key: do not add the PK column again */
          if (found) {
            key_part++;
            continue;
          }
        }

        if (field && field->real_maybe_null()) max_len += 1;  // NULL-byte

        m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
                                 key_part ? key_part->length : 0);
        m_pack_info[dst_i].m_unpack_data_offset = unpack_len;

        if (pk_info) {
          /*
            Record which PK key part (if any) this key part corresponds to;
            -1 means the column is not part of the PK.
          */
          m_pk_part_no[dst_i] = -1;
          for (uint j = 0; j < m_pk_key_parts; j++) {
            if (field->field_index == pk_info->key_part[j].field->field_index) {
              m_pk_part_no[dst_i] = j;
              break;
            }
          }
        } else if (secondary_key && hidden_pk_exists) {
          /*
            The hidden pk can never be part of the sk.  So it is always
            appended to the end of the sk.
          */
          m_pk_part_no[dst_i] = -1;
          if (simulating_extkey) m_pk_part_no[dst_i] = 0;
        }

        max_len += m_pack_info[dst_i].m_max_image_len;

        max_part_len =
            std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);

        /*
          Check key part name here, if it matches the TTL column then we store
          the offset of the TTL key part here.
        */
        if (!m_ttl_column.empty() &&
            field_check_field_name_match(field, m_ttl_column.c_str())) {
          DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
          DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
          DBUG_ASSERT(!field->real_maybe_null());
          m_ttl_pk_key_part_offset = dst_i;
        }

        key_part++;
        /*
          For "unique" secondary indexes, pretend they have
          "index extensions".

          MariaDB also has this property: if an index has a partially-covered
          column like KEY(varchar_col(N)), then the SQL layer will think it is
          not "extended" with PK columns. The code below handles this case,
          also.
        */
        if (secondary_key && src_i+1 == key_info->ext_key_parts) {
          /*
            All key parts known to the SQL layer have been processed; switch
            the source over to the PK columns (or to the hidden pk).
          */
          simulating_extkey = true;
          if (!hidden_pk_exists) {
            keyno_to_set = tbl->s->primary_key;
            key_part = pk_info->key_part;
            /* The loop increment turns this into 0 for the first PK part */
            keypart_to_set = (uint)-1;
          } else {
            keyno_to_set = tbl_def->m_key_count - 1;
            key_part = nullptr;
            keypart_to_set = 0;
          }
        }

        dst_i++;
      }
    }

    /* Number of key parts actually set up (duplicates were left out) */
    m_key_parts = dst_i;

    /* Initialize the memory needed by the stats structure */
    m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());

    /* Cache prefix extractor for bloom filter usage later */
    rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
    m_prefix_extractor = opt.prefix_extractor;

    /*
      This should be the last member variable set before releasing the mutex
      so that other threads can't see the object partially set up.
    */
    m_maxlength = max_len;

    RDB_MUTEX_UNLOCK_CHECK(m_mutex);
  }
}
593
594 /*
595 Determine if the table has TTL enabled by parsing the table comment.
596
597 @param[IN] table_arg
598 @param[IN] tbl_def_arg
599 @param[OUT] ttl_duration Default TTL value parsed from table comment
600 */
extract_ttl_duration(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,uint64 * ttl_duration)601 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
602 const Rdb_tbl_def *const tbl_def_arg,
603 uint64 *ttl_duration) {
604 DBUG_ASSERT(table_arg != nullptr);
605 DBUG_ASSERT(tbl_def_arg != nullptr);
606 DBUG_ASSERT(ttl_duration != nullptr);
607 std::string table_comment(table_arg->s->comment.str,
608 table_arg->s->comment.length);
609
610 bool ttl_duration_per_part_match_found = false;
611 std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
612 table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
613 RDB_TTL_DURATION_QUALIFIER);
614
615 /* If we don't have a ttl duration, nothing to do here. */
616 if (ttl_duration_str.empty()) {
617 return HA_EXIT_SUCCESS;
618 }
619
620 /*
621 Catch errors where a non-integral value was used as ttl duration, strtoull
622 will return 0.
623 */
624 *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
625 if (!*ttl_duration) {
626 my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
627 return HA_EXIT_FAILURE;
628 }
629
630 return HA_EXIT_SUCCESS;
631 }
632
633 /*
634 Determine if the table has TTL enabled by parsing the table comment.
635
636 @param[IN] table_arg
637 @param[IN] tbl_def_arg
638 @param[OUT] ttl_column TTL column in the table
639 @param[IN] skip_checks Skip validation checks (when called in
640 setup())
641 */
extract_ttl_col(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,std::string * ttl_column,uint * ttl_field_index,bool skip_checks)642 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
643 const Rdb_tbl_def *const tbl_def_arg,
644 std::string *ttl_column,
645 uint *ttl_field_index, bool skip_checks) {
646 std::string table_comment(table_arg->s->comment.str,
647 table_arg->s->comment.length);
648 /*
649 Check if there is a TTL column specified. Note that this is not required
650 and if omitted, an 8-byte ttl field will be prepended to each record
651 implicitly.
652 */
653 bool ttl_col_per_part_match_found = false;
654 std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
655 table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
656 RDB_TTL_COL_QUALIFIER);
657
658 if (skip_checks) {
659 for (uint i = 0; i < table_arg->s->fields; i++) {
660 Field *const field = table_arg->field[i];
661 if (field_check_field_name_match(field, ttl_col_str.c_str())) {
662 *ttl_column = ttl_col_str;
663 *ttl_field_index = i;
664 }
665 }
666 return HA_EXIT_SUCCESS;
667 }
668
669 /* Check if TTL column exists in table */
670 if (!ttl_col_str.empty()) {
671 bool found = false;
672 for (uint i = 0; i < table_arg->s->fields; i++) {
673 Field *const field = table_arg->field[i];
674 if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
675 field->real_type() == MYSQL_TYPE_LONGLONG &&
676 field->key_type() == HA_KEYTYPE_ULONGLONG &&
677 !field->real_maybe_null()) {
678 *ttl_column = ttl_col_str;
679 *ttl_field_index = i;
680 found = true;
681 break;
682 }
683 }
684
685 if (!found) {
686 my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
687 return HA_EXIT_FAILURE;
688 }
689 }
690
691 return HA_EXIT_SUCCESS;
692 }
693
gen_qualifier_for_table(const char * const qualifier,const std::string & partition_name)694 const std::string Rdb_key_def::gen_qualifier_for_table(
695 const char *const qualifier, const std::string &partition_name) {
696 bool has_partition = !partition_name.empty();
697 std::string qualifier_str = "";
698
699 if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
700 return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
701 : qualifier_str + RDB_CF_NAME_QUALIFIER +
702 RDB_QUALIFIER_VALUE_SEP;
703 } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
704 return has_partition
705 ? gen_ttl_duration_qualifier_for_partition(partition_name)
706 : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
707 RDB_QUALIFIER_VALUE_SEP;
708 } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
709 return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
710 : qualifier_str + RDB_TTL_COL_QUALIFIER +
711 RDB_QUALIFIER_VALUE_SEP;
712 } else {
713 DBUG_ASSERT(0);
714 }
715
716 return qualifier_str;
717 }
718
719 /*
720 Formats the string and returns the column family name assignment part for a
721 specific partition.
722 */
gen_cf_name_qualifier_for_partition(const std::string & prefix)723 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
724 const std::string &prefix) {
725 DBUG_ASSERT(!prefix.empty());
726
727 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
728 RDB_QUALIFIER_VALUE_SEP;
729 }
730
gen_ttl_duration_qualifier_for_partition(const std::string & prefix)731 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
732 const std::string &prefix) {
733 DBUG_ASSERT(!prefix.empty());
734
735 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
736 RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
737 }
738
gen_ttl_col_qualifier_for_partition(const std::string & prefix)739 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
740 const std::string &prefix) {
741 DBUG_ASSERT(!prefix.empty());
742
743 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
744 RDB_QUALIFIER_VALUE_SEP;
745 }
746
parse_comment_for_qualifier(const std::string & comment,const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,bool * per_part_match_found,const char * const qualifier)747 const std::string Rdb_key_def::parse_comment_for_qualifier(
748 const std::string &comment, const TABLE *const table_arg,
749 const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
750 const char *const qualifier) {
751 DBUG_ASSERT(table_arg != nullptr);
752 DBUG_ASSERT(tbl_def_arg != nullptr);
753 DBUG_ASSERT(per_part_match_found != nullptr);
754 DBUG_ASSERT(qualifier != nullptr);
755
756 std::string empty_result;
757
758 // Flag which marks if partition specific options were found.
759 *per_part_match_found = false;
760
761 if (comment.empty()) {
762 return empty_result;
763 }
764
765 // Let's fetch the comment for a index and check if there's a custom key
766 // name specified for a partition we are handling.
767 std::vector<std::string> v =
768 myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
769
770 std::string search_str = gen_qualifier_for_table(qualifier);
771
772 // If table has partitions then we need to check if user has requested
773 // qualifiers on a per partition basis.
774 //
775 // NOTE: this means if you specify a qualifier for a specific partition it
776 // will take precedence the 'table level' qualifier if one exists.
777 std::string search_str_part;
778 if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
779 std::string partition_name = tbl_def_arg->base_partition();
780 DBUG_ASSERT(!partition_name.empty());
781 search_str_part = gen_qualifier_for_table(qualifier, partition_name);
782 }
783
784 DBUG_ASSERT(!search_str.empty());
785
786 // Basic O(N) search for a matching assignment. At most we expect maybe
787 // ten or so elements here.
788 if (!search_str_part.empty()) {
789 for (const auto &it : v) {
790 if (it.substr(0, search_str_part.length()) == search_str_part) {
791 // We found a prefix match. Try to parse it as an assignment.
792 std::vector<std::string> tokens =
793 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
794
795 // We found a custom qualifier, it was in the form we expected it to be.
796 // Return that instead of whatever we initially wanted to return. In
797 // a case below the `foo` part will be returned to the caller.
798 //
799 // p3_cfname=foo
800 //
801 // If no value was specified then we'll return an empty string which
802 // later gets translated into using a default CF.
803 if (tokens.size() == 2) {
804 *per_part_match_found = true;
805 return tokens[1];
806 } else {
807 return empty_result;
808 }
809 }
810 }
811 }
812
813 // Do this loop again, this time searching for 'table level' qualifiers if we
814 // didn't find any partition level qualifiers above.
815 for (const auto &it : v) {
816 if (it.substr(0, search_str.length()) == search_str) {
817 std::vector<std::string> tokens =
818 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
819 if (tokens.size() == 2) {
820 return tokens[1];
821 } else {
822 return empty_result;
823 }
824 }
825 }
826
827 // If we didn't find any partitioned/non-partitioned qualifiers, return an
828 // empty string.
829 return empty_result;
830 }
831
832 /**
833 Read a memcmp key part from a slice using the passed in reader.
834
835 Returns -1 if field was null, 1 if error, 0 otherwise.
836 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const837 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
838 Rdb_string_reader *reader,
839 const uint part_num) const {
840 /* It is impossible to unpack the column. Skip it. */
841 if (m_pack_info[part_num].m_maybe_null) {
842 const char *nullp;
843 if (!(nullp = reader->read(1))) return 1;
844 if (*nullp == 0) {
845 /* This is a NULL value */
846 return -1;
847 } else {
848 /* If NULL marker is not '0', it can be only '1' */
849 if (*nullp != 1) return 1;
850 }
851 }
852
853 Rdb_field_packing *fpi = &m_pack_info[part_num];
854 DBUG_ASSERT(table_arg->s != nullptr);
855
856 bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
857 (table_arg->s->primary_key == MAX_INDEXES);
858 Field *field = nullptr;
859 if (!is_hidden_pk_part) {
860 field = fpi->get_field_in_table(table_arg);
861 }
862 if ((fpi->m_skip_func)(fpi, field, reader)) {
863 return 1;
864 }
865 return 0;
866 }
867
868 /**
869 Get a mem-comparable form of Primary Key from mem-comparable form of this key
870
871 @param
872 pk_descr Primary Key descriptor
873 key Index tuple from this key in mem-comparable form
874 pk_buffer OUT Put here mem-comparable form of the Primary Key.
875
876 @note
877 It may or may not be possible to restore primary key columns to their
878 mem-comparable form. To handle all cases, this function copies mem-
879 comparable forms directly.
880
881 RocksDB SE supports "Extended keys". This means that PK columns are present
882 at the end of every key. If the key already includes PK columns, then
883 these columns are not present at the end of the key.
884
885 Because of the above, we copy each primary key column.
886
887 @todo
888 If we checked crc32 checksums in this function, we would catch some CRC
889 violations that we currently don't. On the other hand, there is a broader
890 set of queries for which we would check the checksum twice.
891 */
892
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const893 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
894 const Rdb_key_def &pk_descr,
895 const rocksdb::Slice *const key,
896 uchar *const pk_buffer) const {
897 DBUG_ASSERT(table != nullptr);
898 DBUG_ASSERT(key != nullptr);
899 DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
900 DBUG_ASSERT(pk_buffer);
901
902 uint size = 0;
903 uchar *buf = pk_buffer;
904 DBUG_ASSERT(m_pk_key_parts);
905
906 /* Put the PK number */
907 rdb_netbuf_store_index(buf, pk_descr.m_index_number);
908 buf += INDEX_NUMBER_SIZE;
909 size += INDEX_NUMBER_SIZE;
910
911 const char *start_offs[MAX_REF_PARTS];
912 const char *end_offs[MAX_REF_PARTS];
913 int pk_key_part;
914 uint i;
915 Rdb_string_reader reader(key);
916
917 // Skip the index number
918 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
919
920 for (i = 0; i < m_key_parts; i++) {
921 if ((pk_key_part = m_pk_part_no[i]) != -1) {
922 start_offs[pk_key_part] = reader.get_current_ptr();
923 }
924
925 if (read_memcmp_key_part(table, &reader, i) > 0) {
926 return RDB_INVALID_KEY_LEN;
927 }
928
929 if (pk_key_part != -1) {
930 end_offs[pk_key_part] = reader.get_current_ptr();
931 }
932 }
933
934 for (i = 0; i < m_pk_key_parts; i++) {
935 const uint part_size = end_offs[i] - start_offs[i];
936 memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
937 buf += part_size;
938 size += part_size;
939 }
940
941 return size;
942 }
943
944 /**
945 Get a mem-comparable form of Secondary Key from mem-comparable form of this
946 key, without the extended primary key tail.
947
948 @param
949 key Index tuple from this key in mem-comparable form
950 sk_buffer OUT Put here mem-comparable form of the Secondary Key.
951 n_null_fields OUT Put number of null fields contained within sk entry
952 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const953 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
954 const rocksdb::Slice &key,
955 uchar *sk_buffer,
956 uint *n_null_fields) const {
957 DBUG_ASSERT(table != nullptr);
958 DBUG_ASSERT(sk_buffer != nullptr);
959 DBUG_ASSERT(n_null_fields != nullptr);
960 DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
961
962 uchar *buf = sk_buffer;
963
964 int res;
965 Rdb_string_reader reader(&key);
966 const char *start = reader.get_current_ptr();
967
968 // Skip the index number
969 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
970
971 for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
972 if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
973 return RDB_INVALID_KEY_LEN;
974 } else if (res == -1) {
975 (*n_null_fields)++;
976 }
977 }
978
979 uint sk_memcmp_len = reader.get_current_ptr() - start;
980 memcpy(buf, start, sk_memcmp_len);
981 return sk_memcmp_len;
982 }
983
984 /**
985 Convert index tuple into storage (i.e. mem-comparable) format
986
987 @detail
988 Currently this is done by unpacking into table->record[0] and then
989 packing index columns into storage format.
990
991 @param pack_buffer Temporary area for packing varchar columns. Its
992 size is at least max_storage_fmt_length() bytes.
993 */
994
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,const uchar * const key_tuple,const key_part_map & keypart_map) const995 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
996 uchar *const packed_tuple,
997 const uchar *const key_tuple,
998 const key_part_map &keypart_map) const {
999 DBUG_ASSERT(tbl != nullptr);
1000 DBUG_ASSERT(pack_buffer != nullptr);
1001 DBUG_ASSERT(packed_tuple != nullptr);
1002 DBUG_ASSERT(key_tuple != nullptr);
1003
1004 /* We were given a record in KeyTupleFormat. First, save it to record */
1005 const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
1006 key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
1007
1008 uint n_used_parts = my_count_bits(keypart_map);
1009 if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0; // Full key is used
1010
1011 /* Then, convert the record into a mem-comparable form */
1012 return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
1013 false, 0, n_used_parts);
1014 }
1015
1016 /**
1017 @brief
1018 Check if "unpack info" data includes checksum.
1019
1020 @detail
1021 This is used only by CHECK TABLE to count the number of rows that have
1022 checksums.
1023 */
1024
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)1025 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
1026 size_t size = unpack_info.size();
1027 if (size == 0) {
1028 return false;
1029 }
1030 const uchar *ptr = (const uchar *)unpack_info.data();
1031
1032 // Skip unpack info if present.
1033 if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1034 const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1035 SHIP_ASSERT(size >= skip_len);
1036
1037 size -= skip_len;
1038 ptr += skip_len;
1039 }
1040
1041 return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1042 }
1043
1044 /*
1045 @return Number of bytes that were changed
1046 */
successor(uchar * const packed_tuple,const uint len)1047 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1048 DBUG_ASSERT(packed_tuple != nullptr);
1049
1050 int changed = 0;
1051 uchar *p = packed_tuple + len - 1;
1052 for (; p > packed_tuple; p--) {
1053 changed++;
1054 if (*p != uchar(0xFF)) {
1055 *p = *p + 1;
1056 break;
1057 }
1058 *p = '\0';
1059 }
1060 return changed;
1061 }
1062
1063 /*
1064 @return Number of bytes that were changed
1065 */
predecessor(uchar * const packed_tuple,const uint len)1066 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1067 DBUG_ASSERT(packed_tuple != nullptr);
1068
1069 int changed = 0;
1070 uchar *p = packed_tuple + len - 1;
1071 for (; p > packed_tuple; p--) {
1072 changed++;
1073 if (*p != uchar(0x00)) {
1074 *p = *p - 1;
1075 break;
1076 }
1077 *p = 0xFF;
1078 }
1079 return changed;
1080 }
1081
1082 static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
1083 {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
1084 {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
1085
1086 /*
1087 @return The length in bytes of the header specified by the given tag
1088 */
get_unpack_header_size(char tag)1089 size_t Rdb_key_def::get_unpack_header_size(char tag) {
1090 DBUG_ASSERT(is_unpack_data_tag(tag));
1091 return UNPACK_HEADER_SIZES.at(tag);
1092 }
1093
1094 /*
1095 Get a bitmap indicating which varchar columns must be covered for this
1096 lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1097 the lookup is covered. If it can already be determined that the lookup is
1098 not covered, map->bitmap will be set to null.
1099 */
get_lookup_bitmap(const TABLE * table,MY_BITMAP * map) const1100 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1101 DBUG_ASSERT(map->bitmap == nullptr);
1102 bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1103 uint curr_bitmap_pos = 0;
1104
1105 // Indicates which columns in the read set might be covered.
1106 MY_BITMAP maybe_covered_bitmap;
1107 bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1108
1109 for (uint i = 0; i < m_key_parts; i++) {
1110 if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1111 continue;
1112 }
1113
1114 Field *const field = m_pack_info[i].get_field_in_table(table);
1115
1116 // Columns which are always covered are not stored in the covered bitmap so
1117 // we can ignore them here too.
1118 if (m_pack_info[i].m_covered &&
1119 bitmap_is_set(table->read_set, field->field_index)) {
1120 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1121 continue;
1122 }
1123
1124 switch (field->real_type()) {
1125 // This type may be covered depending on the record. If it was requested,
1126 // we require the covered bitmap to have this bit set.
1127 case MYSQL_TYPE_VARCHAR:
1128 if (curr_bitmap_pos < MAX_REF_PARTS) {
1129 if (bitmap_is_set(table->read_set, field->field_index)) {
1130 bitmap_set_bit(map, curr_bitmap_pos);
1131 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1132 }
1133 curr_bitmap_pos++;
1134 } else {
1135 bitmap_free(&maybe_covered_bitmap);
1136 bitmap_free(map);
1137 return;
1138 }
1139 break;
1140 // This column is a type which is never covered. If it was requested, we
1141 // know this lookup will never be covered.
1142 default:
1143 if (bitmap_is_set(table->read_set, field->field_index)) {
1144 bitmap_free(&maybe_covered_bitmap);
1145 bitmap_free(map);
1146 return;
1147 }
1148 break;
1149 }
1150 }
1151
1152 // If there are columns which are not covered in the read set, the lookup
1153 // can't be covered.
1154 if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1155 bitmap_free(map);
1156 }
1157 bitmap_free(&maybe_covered_bitmap);
1158 }
1159
1160 /*
1161 Return true if for this secondary index
1162 - All of the requested columns are in the index
1163 - All values for columns that are prefix-only indexes are shorter or equal
1164 in length to the prefix
1165 */
covers_lookup(const rocksdb::Slice * const unpack_info,const MY_BITMAP * const lookup_bitmap) const1166 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1167 const MY_BITMAP *const lookup_bitmap) const {
1168 DBUG_ASSERT(lookup_bitmap != nullptr);
1169 if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1170 return false;
1171 }
1172
1173 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1174
1175 // Check if this unpack_info has a covered_bitmap
1176 const char *unpack_header = unp_reader.get_current_ptr();
1177 const bool has_covered_unpack_info =
1178 unp_reader.remaining_bytes() &&
1179 unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1180 if (!has_covered_unpack_info ||
1181 !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1182 return false;
1183 }
1184
1185 MY_BITMAP covered_bitmap;
1186 my_bitmap_map covered_bits;
1187 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1188 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1189 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1190 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1191
1192 return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1193 }
1194
1195 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
can_cover_lookup() const1196 bool Rdb_key_def::can_cover_lookup() const {
1197 for (uint i = 0; i < m_key_parts; i++) {
1198 if (!m_pack_info[i].m_covered) return false;
1199 }
1200 return true;
1201 }
1202
pack_field(Field * const field,Rdb_field_packing * pack_info,uchar * tuple,uchar * const packed_tuple,uchar * const pack_buffer,Rdb_string_writer * const unpack_info,uint * const n_null_fields) const1203 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1204 uchar *tuple, uchar *const packed_tuple,
1205 uchar *const pack_buffer,
1206 Rdb_string_writer *const unpack_info,
1207 uint *const n_null_fields) const {
1208 if (field->real_maybe_null()) {
1209 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
1210 if (field->is_real_null()) {
1211 /* NULL value. store '\0' so that it sorts before non-NULL values */
1212 *tuple++ = 0;
1213 /* That's it, don't store anything else */
1214 if (n_null_fields) (*n_null_fields)++;
1215 return tuple;
1216 } else {
1217 /* Not a NULL value. Store '1' */
1218 *tuple++ = 1;
1219 }
1220 }
1221
1222 const bool create_unpack_info =
1223 (unpack_info && // we were requested to generate unpack_info
1224 pack_info->uses_unpack_info()); // and this keypart uses it
1225 Rdb_pack_field_context pack_ctx(unpack_info);
1226
1227 // Set the offset for methods which do not take an offset as an argument
1228 DBUG_ASSERT(
1229 is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1230
1231 (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1232
1233 /* Make "unpack info" to be stored in the value */
1234 if (create_unpack_info) {
1235 (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1236 &pack_ctx);
1237 }
1238
1239 return tuple;
1240 }
1241
1242 /**
1243 Get index columns from the record and pack them into mem-comparable form.
1244
1245 @param
1246 tbl Table we're working on
1247 record IN Record buffer with fields in table->record format
1248 pack_buffer IN Temporary area for packing varchars. The size is
1249 at least max_storage_fmt_length() bytes.
1250 packed_tuple OUT Key in the mem-comparable form
1251 unpack_info OUT Unpack data
1252 unpack_info_len OUT Unpack data length
1253 n_key_parts Number of keyparts to process. 0 means all of them.
1254 n_null_fields OUT Number of key fields with NULL value.
1255 ttl_bytes IN Previous ttl bytes from old record for update case or
1256 current ttl bytes from just packed primary key/value
1257 @detail
1258 Some callers do not need the unpack information, they can pass
1259 unpack_info=nullptr, unpack_info_len=nullptr.
1260
1261 @return
1262 Length of the packed tuple
1263 */
1264
pack_record(const TABLE * const tbl,uchar * const pack_buffer,const uchar * const record,uchar * const packed_tuple,Rdb_string_writer * const unpack_info,const bool should_store_row_debug_checksums,const longlong hidden_pk_id,uint n_key_parts,uint * const n_null_fields,const char * const ttl_bytes) const1265 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
1266 const uchar *const record,
1267 uchar *const packed_tuple,
1268 Rdb_string_writer *const unpack_info,
1269 const bool should_store_row_debug_checksums,
1270 const longlong hidden_pk_id, uint n_key_parts,
1271 uint *const n_null_fields,
1272 const char *const ttl_bytes) const {
1273 DBUG_ASSERT(tbl != nullptr);
1274 DBUG_ASSERT(pack_buffer != nullptr);
1275 DBUG_ASSERT(record != nullptr);
1276 DBUG_ASSERT(packed_tuple != nullptr);
1277 // Checksums for PKs are made when record is packed.
1278 // We should never attempt to make checksum just from PK values
1279 DBUG_ASSERT_IMP(should_store_row_debug_checksums,
1280 (m_index_type == INDEX_TYPE_SECONDARY));
1281
1282 uchar *tuple = packed_tuple;
1283 size_t unpack_start_pos = size_t(-1);
1284 size_t unpack_len_pos = size_t(-1);
1285 size_t covered_bitmap_pos = size_t(-1);
1286 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
1287
1288 rdb_netbuf_store_index(tuple, m_index_number);
1289 tuple += INDEX_NUMBER_SIZE;
1290
1291 // If n_key_parts is 0, it means all columns.
1292 // The following includes the 'extended key' tail.
1293 // The 'extended key' includes primary key. This is done to 'uniqify'
1294 // non-unique indexes
1295 const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
1296
1297 // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
1298 // hidden key part. So we skip it (its always 1 part).
1299 if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
1300 n_key_parts = m_key_parts - 1;
1301 } else if (use_all_columns) {
1302 n_key_parts = m_key_parts;
1303 }
1304
1305 if (n_null_fields) *n_null_fields = 0;
1306
1307 // Check if we need a covered bitmap. If it is certain that all key parts are
1308 // covering, we don't need one.
1309 bool store_covered_bitmap = false;
1310 if (unpack_info && use_covered_bitmap_format()) {
1311 for (uint i = 0; i < n_key_parts; i++) {
1312 if (!m_pack_info[i].m_covered) {
1313 store_covered_bitmap = true;
1314 break;
1315 }
1316 }
1317 }
1318
1319 const char tag =
1320 store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;
1321
1322 if (unpack_info) {
1323 unpack_info->clear();
1324
1325 if (m_index_type == INDEX_TYPE_SECONDARY &&
1326 m_total_index_flags_length > 0) {
1327 // Reserve space for index flag fields
1328 unpack_info->allocate(m_total_index_flags_length);
1329
1330 // Insert TTL timestamp
1331 if (has_ttl() && ttl_bytes) {
1332 write_index_flag_field(unpack_info,
1333 reinterpret_cast<const uchar *>(ttl_bytes),
1334 Rdb_key_def::TTL_FLAG);
1335 }
1336 }
1337
1338 unpack_start_pos = unpack_info->get_current_pos();
1339 unpack_info->write_uint8(tag);
1340 unpack_len_pos = unpack_info->get_current_pos();
1341 // we don't know the total length yet, so write a zero
1342 unpack_info->write_uint16(0);
1343
1344 if (store_covered_bitmap) {
1345 // Reserve two bytes for the covered bitmap. This will store, for key
1346 // parts which are not always covering, whether or not it is covering
1347 // for this record.
1348 covered_bitmap_pos = unpack_info->get_current_pos();
1349 unpack_info->write_uint16(0);
1350 }
1351 }
1352
1353 MY_BITMAP covered_bitmap;
1354 my_bitmap_map covered_bits;
1355 uint curr_bitmap_pos = 0;
1356 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1357
1358 for (uint i = 0; i < n_key_parts; i++) {
1359 // Fill hidden pk id into the last key part for secondary keys for tables
1360 // with no pk
1361 if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
1362 m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
1363 break;
1364 }
1365
1366 Field *const field = m_pack_info[i].get_field_in_table(tbl);
1367 DBUG_ASSERT(field != nullptr);
1368
1369 uint field_offset = field->ptr - tbl->record[0];
1370 uint null_offset = field->null_offset(tbl->record[0]);
1371 bool maybe_null = field->real_maybe_null();
1372
1373 field->move_field(
1374 const_cast<uchar *>(record) + field_offset,
1375 maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
1376 field->null_bit);
1377 // WARNING! Don't return without restoring field->ptr and field->null_ptr
1378
1379 tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
1380 unpack_info, n_null_fields);
1381
1382 // If this key part is a prefix of a VARCHAR field, check if it's covered.
1383 if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
1384 !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
1385 size_t data_length = field->data_length();
1386 uint16 key_length;
1387 if (m_pk_part_no[i] == (uint)-1) {
1388 key_length = tbl->key_info[get_keyno()].key_part[i].length;
1389 } else {
1390 key_length =
1391 tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
1392 }
1393
1394 if (m_pack_info[i].m_unpack_func != nullptr &&
1395 data_length <= key_length) {
1396 bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
1397 }
1398 curr_bitmap_pos++;
1399 }
1400
1401 // Restore field->ptr and field->null_ptr
1402 field->move_field(tbl->record[0] + field_offset,
1403 maybe_null ? tbl->record[0] + null_offset : nullptr,
1404 field->null_bit);
1405 }
1406
1407 if (unpack_info) {
1408 const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
1409 DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());
1410
1411 // Don't store the unpack_info if it has only the header (that is, there's
1412 // no meaningful content).
1413 // Primary Keys are special: for them, store the unpack_info even if it's
1414 // empty (provided m_maybe_unpack_info==true, see
1415 // ha_rocksdb::convert_record_to_storage_format)
1416 if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
1417 if (len == get_unpack_header_size(tag) && !covered_bits) {
1418 unpack_info->truncate(unpack_start_pos);
1419 } else if (store_covered_bitmap) {
1420 unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
1421 }
1422 } else {
1423 unpack_info->write_uint16_at(unpack_len_pos, len);
1424 }
1425
1426 //
1427 // Secondary keys have key and value checksums in the value part
1428 // Primary key is a special case (the value part has non-indexed columns),
1429 // so the checksums are computed and stored by
1430 // ha_rocksdb::convert_record_to_storage_format
1431 //
1432 if (should_store_row_debug_checksums) {
1433 const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple);
1434 const uint32_t val_crc32 =
1435 crc32(0, unpack_info->ptr(), unpack_info->get_current_pos());
1436
1437 unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
1438 unpack_info->write_uint32(key_crc32);
1439 unpack_info->write_uint32(val_crc32);
1440 }
1441 }
1442
1443 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1444
1445 return tuple - packed_tuple;
1446 }
1447
1448 /**
1449 Pack the hidden primary key into mem-comparable form.
1450
1451 @param
1452 tbl Table we're working on
1453 hidden_pk_id IN New value to be packed into key
1454 packed_tuple OUT Key in the mem-comparable form
1455
1456 @return
1457 Length of the packed tuple
1458 */
1459
pack_hidden_pk(const longlong hidden_pk_id,uchar * const packed_tuple) const1460 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1461 uchar *const packed_tuple) const {
1462 DBUG_ASSERT(packed_tuple != nullptr);
1463
1464 uchar *tuple = packed_tuple;
1465 rdb_netbuf_store_index(tuple, m_index_number);
1466 tuple += INDEX_NUMBER_SIZE;
1467 DBUG_ASSERT(m_key_parts == 1);
1468 DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
1469 m_pack_info[0].m_max_image_len));
1470
1471 m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1472
1473 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1474 return tuple - packed_tuple;
1475 }
1476
1477 /*
1478 Function of type rdb_index_field_pack_t
1479 */
1480
pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1481 void Rdb_key_def::pack_with_make_sort_key(
1482 Rdb_field_packing *const fpi, Field *const field,
1483 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1484 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1485 DBUG_ASSERT(fpi != nullptr);
1486 DBUG_ASSERT(field != nullptr);
1487 DBUG_ASSERT(dst != nullptr);
1488 DBUG_ASSERT(*dst != nullptr);
1489
1490 const int max_len = fpi->m_max_image_len;
1491 MY_BITMAP*old_map;
1492
1493 old_map= dbug_tmp_use_all_columns(field->table,
1494 &field->table->read_set);
1495 field->sort_string(*dst, max_len);
1496 dbug_tmp_restore_column_map(&field->table->read_set, old_map);
1497 *dst += max_len;
1498 }
1499
1500 /*
1501 Compares two keys without unpacking
1502
1503 @detail
1504 @return
    0 - Ok. column_index is the index of the first column which is different,
        or the number of key parts if the two keys are equal.
    1 - Data format error.
1508 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const1509 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
1510 const rocksdb::Slice *key2,
1511 std::size_t *const column_index) const {
1512 DBUG_ASSERT(key1 != nullptr);
1513 DBUG_ASSERT(key2 != nullptr);
1514 DBUG_ASSERT(column_index != nullptr);
1515
1516 // the caller should check the return value and
1517 // not rely on column_index being valid
1518 *column_index = 0xbadf00d;
1519
1520 Rdb_string_reader reader1(key1);
1521 Rdb_string_reader reader2(key2);
1522
1523 // Skip the index number
1524 if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1525
1526 if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1527
1528 for (uint i = 0; i < m_key_parts; i++) {
1529 const Rdb_field_packing *const fpi = &m_pack_info[i];
1530 if (fpi->m_maybe_null) {
1531 const auto nullp1 = reader1.read(1);
1532 const auto nullp2 = reader2.read(1);
1533
1534 if (nullp1 == nullptr || nullp2 == nullptr) {
1535 return HA_EXIT_FAILURE;
1536 }
1537
1538 if (*nullp1 != *nullp2) {
1539 *column_index = i;
1540 return HA_EXIT_SUCCESS;
1541 }
1542
1543 if (*nullp1 == 0) {
1544 /* This is a NULL value */
1545 continue;
1546 }
1547 }
1548
1549 const auto before_skip1 = reader1.get_current_ptr();
1550 const auto before_skip2 = reader2.get_current_ptr();
1551 DBUG_ASSERT(fpi->m_skip_func);
1552 if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) {
1553 return HA_EXIT_FAILURE;
1554 }
1555 if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) {
1556 return HA_EXIT_FAILURE;
1557 }
1558 const auto size1 = reader1.get_current_ptr() - before_skip1;
1559 const auto size2 = reader2.get_current_ptr() - before_skip2;
1560 if (size1 != size2) {
1561 *column_index = i;
1562 return HA_EXIT_SUCCESS;
1563 }
1564
1565 if (memcmp(before_skip1, before_skip2, size1) != 0) {
1566 *column_index = i;
1567 return HA_EXIT_SUCCESS;
1568 }
1569 }
1570
1571 *column_index = m_key_parts;
1572 return HA_EXIT_SUCCESS;
1573 }
1574
1575 /*
1576 @brief
1577 Given a zero-padded key, determine its real key length
1578
1579 @detail
1580 Fixed-size skip functions just read.
1581 */
1582
key_length(const TABLE * const table,const rocksdb::Slice & key) const1583 size_t Rdb_key_def::key_length(const TABLE *const table,
1584 const rocksdb::Slice &key) const {
1585 DBUG_ASSERT(table != nullptr);
1586
1587 Rdb_string_reader reader(&key);
1588
1589 if ((!reader.read(INDEX_NUMBER_SIZE))) {
1590 return size_t(-1);
1591 }
1592 for (uint i = 0; i < m_key_parts; i++) {
1593 const Rdb_field_packing *fpi = &m_pack_info[i];
1594 const Field *field = nullptr;
1595 if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) {
1596 field = fpi->get_field_in_table(table);
1597 }
1598 if ((fpi->m_skip_func)(fpi, field, &reader)) {
1599 return size_t(-1);
1600 }
1601 }
1602 return key.size() - reader.remaining_bytes();
1603 }
1604
1605 /*
1606 Take mem-comparable form and unpack_info and unpack it to Table->record
1607
1608 @detail
1609 not all indexes support this
1610
1611 @return
1612 HA_EXIT_SUCCESS OK
1613 other HA_ERR error code
1614 */
1615
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool verify_row_debug_checksums) const1616 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
1617 const rocksdb::Slice *const packed_key,
1618 const rocksdb::Slice *const unpack_info,
1619 const bool verify_row_debug_checksums) const {
1620 Rdb_string_reader reader(packed_key);
1621 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1622
1623 // There is no checksuming data after unpack_info for primary keys, because
1624 // the layout there is different. The checksum is verified in
1625 // ha_rocksdb::convert_record_from_storage_format instead.
1626 DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
1627 !verify_row_debug_checksums);
1628
1629 // Skip the index number
1630 if ((!reader.read(INDEX_NUMBER_SIZE))) {
1631 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1632 }
1633
1634 // For secondary keys, we expect the value field to contain index flags,
1635 // unpack data, and checksum data in that order. One or all can be missing,
1636 // but they cannot be reordered.
1637 if (unp_reader.remaining_bytes()) {
1638 if (m_index_type == INDEX_TYPE_SECONDARY &&
1639 m_total_index_flags_length > 0 &&
1640 !unp_reader.read(m_total_index_flags_length)) {
1641 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1642 }
1643 }
1644
1645 const char *unpack_header = unp_reader.get_current_ptr();
1646 bool has_unpack_info =
1647 unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
1648 if (has_unpack_info) {
1649 if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
1650 return HA_ERR_ROCKSDB_CORRUPT_DATA;
1651 }
1652 }
1653
1654 // Read the covered bitmap
1655 MY_BITMAP covered_bitmap;
1656 my_bitmap_map covered_bits;
1657 bool has_covered_bitmap =
1658 has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
1659 if (has_covered_bitmap) {
1660 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1661 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1662 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1663 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1664 }
1665
1666 int err = HA_EXIT_SUCCESS;
1667
1668
1669 Rdb_key_field_iterator iter(
1670 this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
1671 has_covered_bitmap ? &covered_bitmap : nullptr, buf);
1672 while (iter.has_next()) {
1673 err = iter.next();
1674 if (err) {
1675 return err;
1676 }
1677 }
1678
1679 /*
1680 Check checksum values if present
1681 */
1682 const char *ptr;
1683 if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
1684 if (verify_row_debug_checksums) {
1685 uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
1686 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1687 const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
1688 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1689
1690 const uint32_t computed_key_chksum =
1691 crc32(0, (const uchar *)packed_key->data(), packed_key->size());
1692 const uint32_t computed_val_chksum =
1693 crc32(0, (const uchar *)unpack_info->data(),
1694 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1695
1696 DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
1697 stored_key_chksum++;);
1698
1699 if (stored_key_chksum != computed_key_chksum) {
1700 report_checksum_mismatch(true, packed_key->data(), packed_key->size());
1701 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1702 }
1703
1704 if (stored_val_chksum != computed_val_chksum) {
1705 report_checksum_mismatch(false, unpack_info->data(),
1706 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1707 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1708 }
1709 } else {
1710 /* The checksums are present but we are not checking checksums */
1711 }
1712 }
1713
1714 if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA;
1715
1716 return HA_EXIT_SUCCESS;
1717 }
1718
table_has_hidden_pk(const TABLE * const table)1719 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
1720 return table->s->primary_key == MAX_INDEXES;
1721 }
1722
report_checksum_mismatch(const bool is_key,const char * const data,const size_t data_size) const1723 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
1724 const char *const data,
1725 const size_t data_size) const {
1726 // NO_LINT_DEBUG
1727 sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
1728 is_key ? "key" : "value", get_index_number());
1729
1730 const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
1731 // NO_LINT_DEBUG
1732 sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
1733 (uint64_t)data_size, buf.c_str());
1734
1735 my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1736 }
1737
index_format_min_check(const int pk_min,const int sk_min) const1738 bool Rdb_key_def::index_format_min_check(const int pk_min,
1739 const int sk_min) const {
1740 switch (m_index_type) {
1741 case INDEX_TYPE_PRIMARY:
1742 case INDEX_TYPE_HIDDEN_PRIMARY:
1743 return (m_kv_format_version >= pk_min);
1744 case INDEX_TYPE_SECONDARY:
1745 return (m_kv_format_version >= sk_min);
1746 default:
1747 DBUG_ASSERT(0);
1748 return false;
1749 }
1750 }
1751
1752 ///////////////////////////////////////////////////////////////////////////////////////////
1753 // Rdb_field_packing
1754 ///////////////////////////////////////////////////////////////////////////////////////////
1755
1756 /*
1757 Function of type rdb_index_field_skip_t
1758 */
1759
skip_max_length(const Rdb_field_packing * const fpi,const Field * const field MY_ATTRIBUTE ((__unused__)),Rdb_string_reader * const reader)1760 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
1761 const Field *const field
1762 MY_ATTRIBUTE((__unused__)),
1763 Rdb_string_reader *const reader) {
1764 if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
1765 return HA_EXIT_SUCCESS;
1766 }
1767
1768 /*
1769 (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
1770 split in the middle of an UTF-8 character. See the implementation of
1771 unpack_binary_or_utf8_varchar.
1772 */
/* Size in bytes of one encoded chunk: (RDB_ESCAPE_LENGTH-1) data bytes plus
   one trailing marker byte. */
#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Encoded size of `len` data bytes: ceil(len / (RDB_ESCAPE_LENGTH-1)) chunks
  of RDB_ESCAPE_LENGTH bytes each.

  The argument and the whole expansion are parenthesized so that the macro
  composes correctly with any argument expression and inside any surrounding
  expression (e.g. division by the result).
*/
#define RDB_ENCODED_SIZE(len)                                      \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Encoded size in the legacy format: floor(len / (RDB_LEGACY_ESCAPE_LENGTH-1))
  + 1 chunks, i.e. an extra chunk is needed when len is an exact multiple of
  the per-chunk payload.
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                            \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /                  \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                           \
   RDB_LEGACY_ESCAPE_LENGTH)
1785
1786 /*
1787 Function of type rdb_index_field_skip_t
1788 */
1789
skip_variable_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1790 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
1791 const Field *const field,
1792 Rdb_string_reader *const reader) {
1793 const uchar *ptr;
1794 bool finished = false;
1795
1796 size_t dst_len; /* How much data can be there */
1797 if (field) {
1798 const Field_varstring *const field_var =
1799 static_cast<const Field_varstring *>(field);
1800 dst_len = field_var->pack_length() - field_var->length_bytes;
1801 } else {
1802 dst_len = UINT_MAX;
1803 }
1804
1805 bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
1806
1807 /* Decode the length-emitted encoding here */
1808 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1809 uint used_bytes;
1810
1811 /* See pack_with_varchar_encoding. */
1812 if (use_legacy_format) {
1813 used_bytes = calc_unpack_legacy_variable_format(
1814 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1815 } else {
1816 used_bytes =
1817 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1818 }
1819
1820 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
1821 return HA_EXIT_FAILURE; // Corruption in the data
1822 }
1823
1824 if (finished) {
1825 break;
1826 }
1827
1828 dst_len -= used_bytes;
1829 }
1830
1831 if (!finished) {
1832 return HA_EXIT_FAILURE;
1833 }
1834
1835 return HA_EXIT_SUCCESS;
1836 }
1837
/*
  Values stored in the last byte of every segment of the Variable-Length
  Space-Padded encoding. EQUAL_TO_SPACES marks the final segment; the other
  two mark a segment that is followed by more data (see
  pack_with_varchar_space_pad and skip_variable_space_pad).
*/
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1841
1842 /*
1843 Skip a keypart that uses Variable-Length Space-Padded encoding
1844 */
1845
skip_variable_space_pad(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1846 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
1847 const Field *const field,
1848 Rdb_string_reader *const reader) {
1849 const uchar *ptr;
1850 bool finished = false;
1851
1852 size_t dst_len = UINT_MAX; /* How much data can be there */
1853
1854 if (field) {
1855 const Field_varstring *const field_var =
1856 static_cast<const Field_varstring *>(field);
1857 dst_len = field_var->pack_length() - field_var->length_bytes;
1858 }
1859
1860 /* Decode the length-emitted encoding here */
1861 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1862 // See pack_with_varchar_space_pad
1863 const uchar c = ptr[fpi->m_segment_size - 1];
1864 if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1865 // This is the last segment
1866 finished = true;
1867 break;
1868 } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1869 c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1870 // This is not the last segment
1871 if ((fpi->m_segment_size - 1) > dst_len) {
1872 // The segment is full of data but the table field can't hold that
1873 // much! This must be data corruption.
1874 return HA_EXIT_FAILURE;
1875 }
1876 dst_len -= (fpi->m_segment_size - 1);
1877 } else {
1878 // Encountered a value that's none of the VARCHAR_CMP* constants
1879 // It's data corruption.
1880 return HA_EXIT_FAILURE;
1881 }
1882 }
1883 return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1884 }
1885
1886 /*
1887 Function of type rdb_index_field_unpack_t
1888 */
1889
unpack_integer(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))1890 int Rdb_key_def::unpack_integer(
1891 Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1892 Rdb_string_reader *const reader,
1893 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
1894 const int length = fpi->m_max_image_len;
1895
1896 const uchar *from;
1897 if (!(from = (const uchar *)reader->read(length))) {
1898 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1899 }
1900
1901 #ifdef WORDS_BIGENDIAN
1902 {
1903 if (static_cast<Field_num *>(field)->unsigned_flag) {
1904 to[0] = from[0];
1905 } else {
1906 to[0] = static_cast<char>(from[0] ^ 128); // Reverse the sign bit.
1907 }
1908 memcpy(to + 1, from + 1, length - 1);
1909 }
1910 #else
1911 {
1912 const int sign_byte = from[0];
1913 if (static_cast<Field_num *>(field)->unsigned_flag) {
1914 to[length - 1] = sign_byte;
1915 } else {
1916 to[length - 1] =
1917 static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
1918 }
1919 for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
1920 }
1921 #endif
1922 return UNPACK_SUCCESS;
1923 }
1924
#if !defined(WORDS_BIGENDIAN)
/*
  Copy the 8 bytes at src to dst in the byte order used for a double on this
  little-endian build.
*/
static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
  // A few systems store the most-significant _word_ first on little-endian:
  // reverse the bytes inside each 4-byte word but keep the word order.
  for (int word = 0; word < 8; word += 4) {
    for (int i = 0; i < 4; ++i) {
      dst[word + i] = src[word + 3 - i];
    }
  }
#else
  // Plain reversal of all 8 bytes.
  for (int i = 0; i < 8; ++i) {
    dst[i] = src[7 - i];
  }
#endif
}

/*
  Copy the 4 bytes at src to dst in reverse order.
*/
static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
  for (int i = 0; i < 4; ++i) {
    dst[i] = src[3 - i];
  }
}
#else
/* No swapping is needed on big-endian builds */
#define rdb_swap_double_bytes nullptr
#define rdb_swap_float_bytes nullptr
#endif
1959
/*
  Shared implementation behind unpack_double and unpack_float.

  Reads `size` bytes of mem-comparable image from `reader`, undoes the
  sort-key transformation and stores the resulting `size` bytes in `dst`.

  @param dst           receives the decoded value
  @param reader        source of the mem-comparable image
  @param size          number of bytes to read and write; the little-endian
                       path stages them in an 8-byte buffer
  @param exp_digit     number of exponent bits; selects which bit of the top
                       16 bits is the lowest exponent bit
  @param zero_pattern  `size`-byte image that stands for zero
  @param zero_val      `size` bytes copied to dst when the image equals
                       zero_pattern
  @param swap_func     called as swap_func(dst, tmp) on little-endian builds;
                       asserted to be nullptr on big-endian builds
  @return UNPACK_SUCCESS, or UNPACK_FAILURE if the reader cannot supply
          `size` bytes.
*/
int Rdb_key_def::unpack_floating_point(
    uchar *const dst, Rdb_string_reader *const reader, const size_t size,
    const int exp_digit, const uchar *const zero_pattern,
    const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
  const uchar *const from = (const uchar *)reader->read(size);
  if (from == nullptr) {
    /* Mem-comparable image doesn't have enough bytes */
    return UNPACK_FAILURE;
  }

  /* Check to see if the value is zero */
  if (memcmp(from, zero_pattern, size) == 0) {
    memcpy(dst, zero_val, size);
    return UNPACK_SUCCESS;
  }

#if defined(WORDS_BIGENDIAN)
  // On big-endian, output can go directly into result
  uchar *const tmp = dst;
#else
  // Otherwise use a temporary buffer to make byte-swapping easier later
  uchar tmp[8];
#endif

  memcpy(tmp, from, size);

  if (tmp[0] & 0x80) {
    // If the high bit is set the original value was positive so
    // remove the high bit and subtract one from the exponent.
    // The top two bytes are combined into a 16-bit value so the subtraction
    // can borrow across the byte boundary.
    ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
    exp_part &= 0x7FFF;                             // clear high bit;
    exp_part -= (ushort)1 << (16 - 1 - exp_digit);  // subtract from exponent
    tmp[0] = (uchar)(exp_part >> 8);
    tmp[1] = (uchar)exp_part;
  } else {
    // Otherwise the original value was negative and all bytes have been
    // negated.
    for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
  }

#if !defined(WORDS_BIGENDIAN)
  // On little-endian, swap the bytes around
  swap_func(dst, tmp);
#else
  // tmp aliases dst here, so the result is already in place.
  DBUG_ASSERT(swap_func == nullptr);
#endif

  return UNPACK_SUCCESS;
}
2009
2010 #if !defined(DBL_EXP_DIG)
2011 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
2012 #endif
2013
2014 /*
2015 Function of type rdb_index_field_unpack_t
2016
2017 Unpack a double by doing the reverse action of change_double_for_sort
2018 (sql/filesort.cc). Note that this only works on IEEE values.
2019 Note also that this code assumes that NaN and +/-Infinity are never
2020 allowed in the database.
2021 */
unpack_double(Rdb_field_packing * const fpi MY_ATTRIBUTE ((__unused__)),Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2022 int Rdb_key_def::unpack_double(
2023 Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2024 Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
2025 Rdb_string_reader *const reader,
2026 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2027 static double zero_val = 0.0;
2028 static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2029
2030 return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2031 zero_pattern, (const uchar *)&zero_val,
2032 rdb_swap_double_bytes);
2033 }
2034
2035 #if !defined(FLT_EXP_DIG)
2036 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
2037 #endif
2038
2039 /*
2040 Function of type rdb_index_field_unpack_t
2041
2042 Unpack a float by doing the reverse action of Field_float::make_sort_key
2043 (sql/field.cc). Note that this only works on IEEE values.
2044 Note also that this code assumes that NaN and +/-Infinity are never
2045 allowed in the database.
2046 */
unpack_float(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2047 int Rdb_key_def::unpack_float(
2048 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2049 uchar *const field_ptr, Rdb_string_reader *const reader,
2050 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2051 static float zero_val = 0.0;
2052 static const uchar zero_pattern[4] = {128, 0, 0, 0};
2053
2054 return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2055 zero_pattern, (const uchar *)&zero_val,
2056 rdb_swap_float_bytes);
2057 }
2058
2059 /*
2060 Function of type rdb_index_field_unpack_t used to
2061 Unpack by doing the reverse action to Field_newdate::make_sort_key.
2062 */
2063
unpack_newdate(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2064 int Rdb_key_def::unpack_newdate(
2065 Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2066 uchar *const field_ptr, Rdb_string_reader *const reader,
2067 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2068 const char *from;
2069 DBUG_ASSERT(fpi->m_max_image_len == 3);
2070
2071 if (!(from = reader->read(3))) {
2072 /* Mem-comparable image doesn't have enough bytes */
2073 return UNPACK_FAILURE;
2074 }
2075
2076 field_ptr[0] = from[2];
2077 field_ptr[1] = from[1];
2078 field_ptr[2] = from[0];
2079 return UNPACK_SUCCESS;
2080 }
2081
2082 /*
2083 Function of type rdb_index_field_unpack_t, used to
2084 Unpack the string by copying it over.
2085 This is for BINARY(n) where the value occupies the whole length.
2086 */
2087
unpack_binary_str(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2088 int Rdb_key_def::unpack_binary_str(
2089 Rdb_field_packing *const fpi, Field *const field, uchar *const to,
2090 Rdb_string_reader *const reader,
2091 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2092 const char *from;
2093 if (!(from = reader->read(fpi->m_max_image_len))) {
2094 /* Mem-comparable image doesn't have enough bytes */
2095 return UNPACK_FAILURE;
2096 }
2097
2098 memcpy(to, from, fpi->m_max_image_len);
2099 return UNPACK_SUCCESS;
2100 }
2101
2102 /*
2103 Function of type rdb_index_field_unpack_t.
2104 For UTF-8, we need to convert 2-byte wide-character entities back into
2105 UTF8 sequences.
2106 */
2107
unpack_utf8_str(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2108 int Rdb_key_def::unpack_utf8_str(
2109 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2110 Rdb_string_reader *const reader,
2111 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2112 my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
2113 const uchar *src;
2114 if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2115 /* Mem-comparable image doesn't have enough bytes */
2116 return UNPACK_FAILURE;
2117 }
2118
2119 const uchar *const src_end = src + fpi->m_max_image_len;
2120 uchar *const dst_end = dst + field->pack_length();
2121
2122 while (src < src_end) {
2123 my_wc_t wc = (src[0] << 8) | src[1];
2124 src += 2;
2125 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2126 DBUG_ASSERT(res > 0 && res <= 3);
2127 if (res < 0) return UNPACK_FAILURE;
2128 dst += res;
2129 }
2130
2131 cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
2132 cset->pad_char);
2133 return UNPACK_SUCCESS;
2134 }
2135
2136 /*
2137 This is the original algorithm to encode a variable binary field. It
2138 sets a flag byte every Nth byte. The flag value is (255 - #pad) where
2139 #pad is the number of padding bytes that were needed (0 if all N-1
2140 bytes were used).
2141
2142 If N=8 and the field is:
2143 * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2144 * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2145 And the 4 byte string compares as greater than the 3 byte string
2146
2147 Unfortunately the algorithm has a flaw. If the input is exactly a
2148 multiple of N-1, an extra N bytes are written. Since we usually use
2149 N=9, an 8 byte input will generate 18 bytes of output instead of the
2150 9 bytes of output that is optimal.
2151
2152 See pack_variable_format for the newer algorithm.
2153 */
pack_legacy_variable_format(const uchar * src,size_t src_len,uchar ** dst)2154 void Rdb_key_def::pack_legacy_variable_format(
2155 const uchar *src, // The data to encode
2156 size_t src_len, // The length of the data to encode
2157 uchar **dst) // The location to encode the data
2158 {
2159 size_t copy_len;
2160 size_t padding_bytes;
2161 uchar *ptr = *dst;
2162
2163 do {
2164 copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2165 padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2166 memcpy(ptr, src, copy_len);
2167 ptr += copy_len;
2168 src += copy_len;
2169 // pad with zeros if necessary
2170 if (padding_bytes > 0) {
2171 memset(ptr, 0, padding_bytes);
2172 ptr += padding_bytes;
2173 }
2174
2175 *(ptr++) = 255 - padding_bytes;
2176
2177 src_len -= copy_len;
2178 } while (padding_bytes == 0);
2179
2180 *dst = ptr;
2181 }
2182
2183 /*
2184 This is the new algorithm. Similarly to the legacy format the input
2185 is split up into N-1 bytes and a flag byte is used as the Nth byte
2186 in the output.
2187
2188 - If the previous segment needed any padding the flag is set to the
2189 number of bytes used (0..N-2). 0 is possible in the first segment
2190 if the input is 0 bytes long.
2191 - If no padding was used and there is no more data left in the input
2192 the flag is set to N-1
2193 - If no padding was used and there is still data left in the input the
2194 flag is set to N.
2195
2196 For N=9, the following input values encode to the specified
2197 outout (where 'X' indicates a byte of the original input):
2198 - 0 bytes is encoded as 0 0 0 0 0 0 0 0 0
2199 - 1 byte is encoded as X 0 0 0 0 0 0 0 1
2200 - 2 bytes is encoded as X X 0 0 0 0 0 0 2
2201 - 7 bytes is encoded as X X X X X X X 0 7
2202 - 8 bytes is encoded as X X X X X X X X 8
2203 - 9 bytes is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2204 - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2205 */
pack_variable_format(const uchar * src,size_t src_len,uchar ** dst)2206 void Rdb_key_def::pack_variable_format(
2207 const uchar *src, // The data to encode
2208 size_t src_len, // The length of the data to encode
2209 uchar **dst) // The location to encode the data
2210 {
2211 uchar *ptr = *dst;
2212
2213 for (;;) {
2214 // Figure out how many bytes to copy, copy them and adjust pointers
2215 const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2216 memcpy(ptr, src, copy_len);
2217 ptr += copy_len;
2218 src += copy_len;
2219 src_len -= copy_len;
2220
2221 // Are we at the end of the input?
2222 if (src_len == 0) {
2223 // pad with zeros if necessary;
2224 const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2225 if (padding_bytes > 0) {
2226 memset(ptr, 0, padding_bytes);
2227 ptr += padding_bytes;
2228 }
2229
2230 // Put the flag byte (0 - N-1) in the output
2231 *(ptr++) = (uchar)copy_len;
2232 break;
2233 }
2234
2235 // We have more data - put the flag byte (N) in and continue
2236 *(ptr++) = RDB_ESCAPE_LENGTH;
2237 }
2238
2239 *dst = ptr;
2240 }
2241
2242 /*
2243 Function of type rdb_index_field_pack_t
2244 */
2245
pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))2246 void Rdb_key_def::pack_with_varchar_encoding(
2247 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2248 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2249 const CHARSET_INFO *const charset = field->charset();
2250 Field_varstring *const field_var = (Field_varstring *)field;
2251
2252 const size_t value_length = (field_var->length_bytes == 1)
2253 ? (uint)*field->ptr
2254 : uint2korr(field->ptr);
2255 size_t xfrm_len = charset->coll->strnxfrm(
2256 charset, buf, fpi->m_max_image_len, field_var->char_length(),
2257 field_var->ptr + field_var->length_bytes, value_length, 0);
2258
2259 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2260 if (fpi->m_use_legacy_varbinary_format) {
2261 pack_legacy_variable_format(buf, xfrm_len, dst);
2262 } else {
2263 pack_variable_format(buf, xfrm_len, dst);
2264 }
2265 }
2266
/*
  Compare the string in [buf..buf_end) with a string that is an infinite
  repetition of the bytes in space_xfrm.

  @return the memcmp result of the first piece that differs, or 0 if the
          whole string matches (an empty string matches).

  @note space_xfrm must not be empty: with an empty pattern and a non-empty
        string the scan position would never advance.
*/

static int rdb_compare_string_with_spaces(
    const uchar *buf, const uchar *const buf_end,
    const std::vector<uchar> *const space_xfrm) {
  for (const uchar *cur = buf; cur < buf_end;) {
    // Compare at most one full repetition of the pattern at a time.
    const size_t piece =
        std::min<size_t>(buf_end - cur, space_xfrm->size());
    const int diff = memcmp(cur, space_xfrm->data(), piece);
    if (diff != 0) {
      return diff;
    }
    cur += piece;
  }
  return 0;
}
2283
/*
  Bias added to the trimmed/padded space count before it is written to
  unpack_info, so that the count can be stored as an unsigned number.
  Readers treat stored values up to this offset as "padding to remove" and
  larger values as "spaces to append".
*/
static const int RDB_TRIMMED_CHARS_OFFSET = 8;
2285 /*
2286 Pack the data with Variable-Length Space-Padded Encoding.
2287
2288 The encoding is there to meet two goals:
2289
  Goal#1. Comparison. The SQL standard says

    "If the collation for the comparison has the PAD SPACE characteristic,
    for the purposes of the comparison, the shorter value is effectively
    extended to the length of the longer by concatenation of <space>s on the
    right."
2296
2297 At the moment, all MySQL collations except one have the PAD SPACE
2298 characteristic. The exception is the "binary" collation that is used by
2299 [VAR]BINARY columns. (Note that binary collations for specific charsets,
2300 like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2301 the PAD SPACE characteristic).
2302
2303 Goal#2 is to preserve the number of trailing spaces in the original value.
2304
2305 This is achieved by using the following encoding:
2306 The key part:
2307 - Stores mem-comparable image of the column
2308 - It is stored in chunks of fpi->m_segment_size bytes (*)
2309 = If the remainder of the chunk is not occupied, it is padded with mem-
2310 comparable image of the space character (cs->pad_char to be precise).
2311 - The last byte of the chunk shows how the rest of column's mem-comparable
2312 image would compare to mem-comparable image of the column extended with
2313 spaces. There are three possible values.
2314 - VARCHAR_CMP_LESS_THAN_SPACES,
2315 - VARCHAR_CMP_EQUAL_TO_SPACES
2316 - VARCHAR_CMP_GREATER_THAN_SPACES
2317
2318 VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2319 is spaces, or something that sorts as spaces, so there is no reason to store
2320 it).
2321
2322 Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2323
2324 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ]
2325 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2326 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2327 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2328
2329 As mentioned above, the last chunk is padded with mem-comparable images of
2330 cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2331
2332 fpi->m_segment_size depends on the used collation. It is chosen to be such
2333 that no mem-comparable image of space will ever stretch across the segments
2334 (see get_segment_size_from_collation).
2335
2336 == The value part (aka unpack_info) ==
2337 The value part stores the number of space characters that one needs to add
2338 when unpacking the string.
2339 - If the number is positive, it means add this many spaces at the end
2340 - If the number is negative, it means padding has added extra spaces which
2341 must be removed.
2342
2343 Storage considerations
2344 - depending on column's max size, the number may occupy 1 or 2 bytes
2345 - the number of spaces that need to be removed is not more than
2346 RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2347 then store it as unsigned.
2348
2349 @seealso
2350 unpack_binary_or_utf8_varchar_space_pad
2351 unpack_simple_varchar_space_pad
2352 dummy_make_unpack_info
2353 skip_variable_space_pad
2354 */
2355
void Rdb_key_def::pack_with_varchar_space_pad(
    Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
    Rdb_pack_field_context *const pack_ctx) {
  // Function of type rdb_index_field_pack_t.
  //   fpi      - segment size, space image and unpack_info settings
  //   field    - the varstring field to pack
  //   buf      - scratch buffer that receives the mem-comparable image
  //   dst      - in: where to write the key part; out: advanced past it
  //   pack_ctx - supplies the unpack_info writer (may be NULL)
  Rdb_string_writer *const unpack_info = pack_ctx->writer;
  const CHARSET_INFO *const charset = field->charset();
  const auto field_var = static_cast<Field_varstring *>(field);

  // The value is preceded by its length, stored in one or two bytes.
  const size_t value_length = (field_var->length_bytes == 1)
                                  ? (uint)*field->ptr
                                  : uint2korr(field->ptr);

  // Length of the value without its trailing spaces; only that part is
  // transformed into the mem-comparable image.
  const size_t trimmed_len = charset->cset->lengthsp(
      charset, (const char *)field_var->ptr + field_var->length_bytes,
      value_length);
  const size_t xfrm_len = charset->coll->strnxfrm(
      charset, buf, fpi->m_max_image_len, field_var->char_length(),
      field_var->ptr + field_var->length_bytes, trimmed_len, 0);

  /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
  uchar *const buf_end = buf + xfrm_len;

  size_t encoded_size = 0;
  uchar *ptr = *dst;
  // Padding added to the most recent segment; after the loop this is the
  // padding of the last segment.
  size_t padding_bytes;
  while (true) {
    // Each segment carries m_segment_size - 1 bytes of image plus a marker.
    const size_t copy_len =
        std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
    padding_bytes = fpi->m_segment_size - 1 - copy_len;
    memcpy(ptr, buf, copy_len);
    ptr += copy_len;
    buf += copy_len;

    if (padding_bytes) {
      // The image ran out: fill the rest of the segment with the image of
      // spaces.
      memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
      ptr += padding_bytes;
      *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;  // last segment
    } else {
      // Compare the string suffix with a hypothetical infinite string of
      // spaces. It could be that the first difference is beyond the end of
      // current chunk.
      const int cmp =
          rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);

      if (cmp < 0) {
        *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
      } else if (cmp > 0) {
        *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
      } else {
        // It turns out all the rest are spaces.
        *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
      }
    }
    encoded_size += fpi->m_segment_size;

    // Step over the marker byte; an EQUAL marker ends the encoding.
    if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
  }

  // m_unpack_info_stores_value means unpack_info stores the whole original
  // value. There is no need to store the number of trimmed/padded endspaces
  // in that case.
  if (unpack_info && !fpi->m_unpack_info_stores_value) {
    // (value_length - trimmed_len) is the number of trimmed space *characters*
    // then, padding_bytes is the number of *bytes* added as padding
    // then, we add 8, because we don't store negative values.
    DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
    DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
    const size_t removed_chars =
        RDB_TRIMMED_CHARS_OFFSET +
        (value_length - trimmed_len) / fpi->space_mb_len -
        padding_bytes / fpi->space_xfrm_len;

    if (fpi->m_unpack_info_uses_two_bytes) {
      unpack_info->write_uint16(removed_chars);
    } else {
      DBUG_ASSERT(removed_chars < 0x100);
      unpack_info->write_uint8(removed_chars);
    }
  }

  // Advance the caller's pointer past all the segments written above.
  *dst += encoded_size;
}
2437
2438 /*
2439 Calculate the number of used bytes in the chunk and whether this is the
2440 last chunk in the input. This is based on the old legacy format - see
2441 pack_legacy_variable_format.
2442 */
calc_unpack_legacy_variable_format(uchar flag,bool * done)2443 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2444 uint pad = 255 - flag;
2445 uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2446 if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2447 return (uint)-1;
2448 }
2449
2450 *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2451 return used_bytes;
2452 }
2453
2454 /*
2455 Calculate the number of used bytes in the chunk and whether this is the
2456 last chunk in the input. This is based on the new format - see
2457 pack_variable_format.
2458 */
calc_unpack_variable_format(uchar flag,bool * done)2459 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2460 // Check for invalid flag values
2461 if (flag > RDB_ESCAPE_LENGTH) {
2462 return (uint)-1;
2463 }
2464
2465 // Values from 1 to N-1 indicate this is the last chunk and that is how
2466 // many bytes were used
2467 if (flag < RDB_ESCAPE_LENGTH) {
2468 *done = true;
2469 return flag;
2470 }
2471
2472 // A value of N means we used N-1 bytes and had more to go
2473 *done = false;
2474 return RDB_ESCAPE_LENGTH - 1;
2475 }
2476
2477 /*
2478 Unpack data that has charset information. Each two bytes of the input is
2479 treated as a wide-character and converted to its multibyte equivalent in
2480 the output.
2481 */
unpack_charset(const CHARSET_INFO * cset,const uchar * src,uint src_len,uchar * dst,uint dst_len,uint * used_bytes)2482 static int unpack_charset(
2483 const CHARSET_INFO *cset, // character set information
2484 const uchar *src, // source data to unpack
2485 uint src_len, // length of source data
2486 uchar *dst, // destination of unpacked data
2487 uint dst_len, // length of destination data
2488 uint *used_bytes) // output number of bytes used
2489 {
2490 if (src_len & 1) {
2491 /*
2492 UTF-8 characters are encoded into two-byte entities. There is no way
2493 we can have an odd number of bytes after encoding.
2494 */
2495 return UNPACK_FAILURE;
2496 }
2497
2498 uchar *dst_end = dst + dst_len;
2499 uint used = 0;
2500
2501 for (uint ii = 0; ii < src_len; ii += 2) {
2502 my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2503 int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end);
2504 DBUG_ASSERT(res > 0 && res <= 3);
2505 if (res < 0) {
2506 return UNPACK_FAILURE;
2507 }
2508
2509 used += res;
2510 }
2511
2512 *used_bytes = used;
2513 return UNPACK_SUCCESS;
2514 }
2515
2516 /*
2517 Function of type rdb_index_field_unpack_t
2518 */
2519
unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2520 int Rdb_key_def::unpack_binary_or_utf8_varchar(
2521 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2522 Rdb_string_reader *const reader,
2523 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2524 const uchar *ptr;
2525 size_t len = 0;
2526 bool finished = false;
2527 uchar *d0 = dst;
2528 Field_varstring *const field_var = (Field_varstring *)field;
2529 dst += field_var->length_bytes;
2530 // How much we can unpack
2531 size_t dst_len = field_var->pack_length() - field_var->length_bytes;
2532
2533 bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2534
2535 /* Decode the length-emitted encoding here */
2536 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2537 uint used_bytes;
2538
2539 /* See pack_with_varchar_encoding. */
2540 if (use_legacy_format) {
2541 used_bytes = calc_unpack_legacy_variable_format(
2542 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2543 } else {
2544 used_bytes =
2545 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2546 }
2547
2548 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2549 return UNPACK_FAILURE; // Corruption in the data
2550 }
2551
2552 /*
2553 Now, we need to decode used_bytes of data and append them to the value.
2554 */
2555 if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2556 int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
2557 dst_len, &used_bytes);
2558 if (err != UNPACK_SUCCESS) {
2559 return err;
2560 }
2561 } else {
2562 memcpy(dst, ptr, used_bytes);
2563 }
2564
2565 dst += used_bytes;
2566 dst_len -= used_bytes;
2567 len += used_bytes;
2568
2569 if (finished) {
2570 break;
2571 }
2572 }
2573
2574 if (!finished) {
2575 return UNPACK_FAILURE;
2576 }
2577
2578 /* Save the length */
2579 if (field_var->length_bytes == 1) {
2580 d0[0] = (uchar)len;
2581 } else {
2582 DBUG_ASSERT(field_var->length_bytes == 2);
2583 int2store(d0, len);
2584 }
2585 return UNPACK_SUCCESS;
2586 }
2587
/*
  Unpack a varstring stored in the Variable-Length Space-Padded encoding,
  for the binary and utf8_bin cases.

  The key part is read from `reader` in segments of fpi->m_segment_size
  bytes. `unp_reader` supplies a one- or two-byte number, offset by
  RDB_TRIMMED_CHARS_OFFSET, that says how many padding characters must be
  removed from the last segment or how many spaces must be appended to the
  value.

  @return UNPACK_SUCCESS, or UNPACK_FAILURE on missing unpack info, an
          invalid segment marker, a value that does not fit the field, a
          conversion error or truncated input.

  @seealso
    pack_with_varchar_space_pad - packing function
    unpack_simple_varchar_space_pad - unpacking function for 'simple'
    charsets.
    skip_variable_space_pad - skip function
*/
int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
    Rdb_field_packing *const fpi, Field *const field, uchar *dst,
    Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
  const uchar *ptr;
  size_t len = 0;
  bool finished = false;
  Field_varstring *const field_var = static_cast<Field_varstring *>(field);
  uchar *d0 = dst;  // where the length bytes go
  uchar *dst_end = dst + field_var->pack_length();
  dst += field_var->length_bytes;

  uint space_padding_bytes = 0;
  uint extra_spaces;
  // The read_* calls report failure with a non-zero result.
  if ((fpi->m_unpack_info_uses_two_bytes
           ? unp_reader->read_uint16(&extra_spaces)
           : unp_reader->read_uint8(&extra_spaces))) {
    return UNPACK_FAILURE;
  }

  // Undo the offset: stored values up to the offset mean that padding was
  // added and must be removed, larger values mean that spaces were trimmed
  // and must be appended.
  if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
    space_padding_bytes =
        -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
    extra_spaces = 0;
  } else {
    extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
  }

  // Convert the padding from characters to bytes of mem-comparable image.
  space_padding_bytes *= fpi->space_xfrm_len;

  /* Decode the length-emitted encoding here */
  while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
    const char last_byte = ptr[fpi->m_segment_size - 1];
    size_t used_bytes;
    if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES)  // this is the last segment
    {
      if (space_padding_bytes > (fpi->m_segment_size - 1)) {
        return UNPACK_FAILURE;  // Cannot happen, corrupted data
      }
      // Only the part before the padding belongs to the value.
      used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
      finished = true;
    } else {
      if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
          last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
        return UNPACK_FAILURE;  // Invalid value
      }
      used_bytes = fpi->m_segment_size - 1;
    }

    // Now, need to decode used_bytes of data and append them to the value.
    if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
      if (used_bytes & 1) {
        /*
          UTF-8 characters are encoded into two-byte entities. There is no way
          we can have an odd number of bytes after encoding.
        */
        return UNPACK_FAILURE;
      }

      const uchar *src = ptr;
      const uchar *const src_end = ptr + used_bytes;
      while (src < src_end) {
        // Each entity is a big-endian 2-byte wide character.
        my_wc_t wc = (src[0] << 8) | src[1];
        src += 2;
        const CHARSET_INFO *cset = fpi->m_varchar_charset;
        int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
        DBUG_ASSERT(res <= 3);
        if (res <= 0) return UNPACK_FAILURE;
        dst += res;
        len += res;
      }
    } else {
      if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
      memcpy(dst, ptr, used_bytes);
      dst += used_bytes;
      len += used_bytes;
    }

    if (finished) {
      if (extra_spaces) {
        // Both binary and UTF-8 charset store space as ' ',
        // so the following is ok:
        if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
        memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
        len += extra_spaces;
      }
      break;
    }
  }

  // The reader ran out of data before the last segment was seen.
  if (!finished) return UNPACK_FAILURE;

  /* Save the length */
  if (field_var->length_bytes == 1) {
    d0[0] = (uchar)len;
  } else {
    DBUG_ASSERT(field_var->length_bytes == 2);
    int2store(d0, len);
  }
  return UNPACK_SUCCESS;
}
2695
2696 /////////////////////////////////////////////////////////////////////////
2697
2698 /*
2699 Function of type rdb_make_unpack_info_t
2700 */
2701
make_unpack_unknown(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2702 void Rdb_key_def::make_unpack_unknown(
2703 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2704 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2705 pack_ctx->writer->write(field->ptr, field->pack_length());
2706 }
2707
/*
  Function of type rdb_make_unpack_info_t

  The point of this function is only to indicate that unpack_info is
  available.

  The actual unpack_info data is produced by the function that packs the key,
  that is, pack_with_varchar_space_pad.

  All three arguments are ignored.
*/

void Rdb_key_def::dummy_make_unpack_info(
    const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
    const Field *field MY_ATTRIBUTE((__unused__)),
    Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
  // Do nothing
}
2722
2723 /*
2724 Function of type rdb_index_field_unpack_t
2725 */
2726
unpack_unknown(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2727 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
2728 Field *const field, uchar *const dst,
2729 Rdb_string_reader *const reader,
2730 Rdb_string_reader *const unp_reader) {
2731 const uchar *ptr;
2732 const uint len = fpi->m_unpack_data_len;
2733 // We don't use anything from the key, so skip over it.
2734 if (skip_max_length(fpi, field, reader)) {
2735 return UNPACK_FAILURE;
2736 }
2737
2738 DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
2739
2740 if ((ptr = (const uchar *)unp_reader->read(len))) {
2741 memcpy(dst, ptr, len);
2742 return UNPACK_SUCCESS;
2743 }
2744 return UNPACK_FAILURE;
2745 }
2746
2747 /*
2748 Function of type rdb_make_unpack_info_t
2749 */
2750
make_unpack_unknown_varchar(const Rdb_collation_codec * const codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2751 void Rdb_key_def::make_unpack_unknown_varchar(
2752 const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
2753 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2754 const auto f = static_cast<const Field_varstring *>(field);
2755 uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2756 len += f->length_bytes;
2757 pack_ctx->writer->write(field->ptr, len);
2758 }
2759
2760 /*
2761 Function of type rdb_index_field_unpack_t
2762
2763 @detail
2764 Unpack a key part in an "unknown" collation from its
2765 (mem_comparable_form, unpack_info) form.
2766
2767 "Unknown" means we have no clue about how mem_comparable_form is made from
2768 the original string, so we keep the whole original string in the unpack_info.
2769
2770 @seealso
2771 make_unpack_unknown, unpack_unknown
2772 */
2773
unpack_unknown_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2774 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
2775 Field *const field, uchar *dst,
2776 Rdb_string_reader *const reader,
2777 Rdb_string_reader *const unp_reader) {
2778 const uchar *ptr;
2779 uchar *const d0 = dst;
2780 const auto f = static_cast<Field_varstring *>(field);
2781 dst += f->length_bytes;
2782 const uint len_bytes = f->length_bytes;
2783 // We don't use anything from the key, so skip over it.
2784 if ((fpi->m_skip_func)(fpi, field, reader)) {
2785 return UNPACK_FAILURE;
2786 }
2787
2788 DBUG_ASSERT(len_bytes > 0);
2789 DBUG_ASSERT(unp_reader != nullptr);
2790
2791 if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
2792 memcpy(d0, ptr, len_bytes);
2793 const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
2794 if ((ptr = (const uchar *)unp_reader->read(len))) {
2795 memcpy(dst, ptr, len);
2796 return UNPACK_SUCCESS;
2797 }
2798 }
2799 return UNPACK_FAILURE;
2800 }
2801
2802 /*
2803 Write unpack_data for a "simple" collation
2804 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)2805 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
2806 const Rdb_collation_codec *const codec,
2807 const uchar *const src,
2808 const size_t src_len) {
2809 for (uint i = 0; i < src_len; i++) {
2810 writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
2811 }
2812 }
2813
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len,uchar * const dst)2814 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
2815 const Rdb_collation_codec *const codec,
2816 const uchar *const src, const size_t src_len,
2817 uchar *const dst) {
2818 for (uint i = 0; i < src_len; i++) {
2819 if (codec->m_dec_size[src[i]] > 0) {
2820 uint *ret;
2821 DBUG_ASSERT(reader != nullptr);
2822
2823 if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
2824 return UNPACK_FAILURE;
2825 }
2826 dst[i] = codec->m_dec_idx[*ret][src[i]];
2827 } else {
2828 dst[i] = codec->m_dec_idx[0][src[i]];
2829 }
2830 }
2831
2832 return UNPACK_SUCCESS;
2833 }
2834
2835 /*
2836 Function of type rdb_make_unpack_info_t
2837
2838 @detail
2839 Make unpack_data for VARCHAR(n) in a "simple" charset.
2840 */
2841
make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2842 void Rdb_key_def::make_unpack_simple_varchar(
2843 const Rdb_collation_codec *const codec, const Field *const field,
2844 Rdb_pack_field_context *const pack_ctx) {
2845 const auto f = static_cast<const Field_varstring *>(field);
2846 uchar *const src = f->ptr + f->length_bytes;
2847 const size_t src_len =
2848 f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2849 Rdb_bit_writer bit_writer(pack_ctx->writer);
2850 // The std::min compares characters with bytes, but for simple collations,
2851 // mbmaxlen = 1.
2852 rdb_write_unpack_simple(&bit_writer, codec, src,
2853 std::min((size_t)f->char_length(), src_len));
2854 }
2855
2856 /*
2857 Function of type rdb_index_field_unpack_t
2858
2859 @seealso
2860 pack_with_varchar_space_pad - packing function
2861 unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
2862 */
2863
unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2864 int Rdb_key_def::unpack_simple_varchar_space_pad(
2865 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2866 Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2867 const uchar *ptr;
2868 size_t len = 0;
2869 bool finished = false;
2870 uchar *d0 = dst;
2871 const Field_varstring *const field_var =
2872 static_cast<Field_varstring *>(field);
2873 // For simple collations, char_length is also number of bytes.
2874 DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
2875 uchar *dst_end = dst + field_var->pack_length();
2876 dst += field_var->length_bytes;
2877 Rdb_bit_reader bit_reader(unp_reader);
2878
2879 uint space_padding_bytes = 0;
2880 uint extra_spaces;
2881 DBUG_ASSERT(unp_reader != nullptr);
2882
2883 if ((fpi->m_unpack_info_uses_two_bytes
2884 ? unp_reader->read_uint16(&extra_spaces)
2885 : unp_reader->read_uint8(&extra_spaces))) {
2886 return UNPACK_FAILURE;
2887 }
2888
2889 if (extra_spaces <= 8) {
2890 space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2891 extra_spaces = 0;
2892 } else {
2893 extra_spaces -= 8;
2894 }
2895
2896 space_padding_bytes *= fpi->space_xfrm_len;
2897
2898 /* Decode the length-emitted encoding here */
2899 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2900 const char last_byte =
2901 ptr[fpi->m_segment_size - 1]; // number of padding bytes
2902 size_t used_bytes;
2903 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2904 // this is the last one
2905 if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2906 return UNPACK_FAILURE; // Cannot happen, corrupted data
2907 }
2908 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2909 finished = true;
2910 } else {
2911 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2912 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2913 return UNPACK_FAILURE;
2914 }
2915 used_bytes = fpi->m_segment_size - 1;
2916 }
2917
2918 if (dst + used_bytes > dst_end) {
2919 // The value on disk is longer than the field definition allows?
2920 return UNPACK_FAILURE;
2921 }
2922
2923 uint ret;
2924 if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2925 used_bytes, dst)) != UNPACK_SUCCESS) {
2926 return ret;
2927 }
2928
2929 dst += used_bytes;
2930 len += used_bytes;
2931
2932 if (finished) {
2933 if (extra_spaces) {
2934 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2935 // pad_char has a 1-byte form in all charsets that
2936 // are handled by rdb_init_collation_mapping.
2937 memset(dst, field_var->charset()->pad_char, extra_spaces);
2938 len += extra_spaces;
2939 }
2940 break;
2941 }
2942 }
2943
2944 if (!finished) return UNPACK_FAILURE;
2945
2946 /* Save the length */
2947 if (field_var->length_bytes == 1) {
2948 d0[0] = (uchar)len;
2949 } else {
2950 DBUG_ASSERT(field_var->length_bytes == 2);
2951 int2store(d0, len);
2952 }
2953 return UNPACK_SUCCESS;
2954 }
2955
2956 /*
2957 Function of type rdb_make_unpack_info_t
2958
2959 @detail
2960 Make unpack_data for CHAR(n) value in a "simple" charset.
2961 It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2962
2963 @seealso
2964 The VARCHAR variant is in make_unpack_simple_varchar
2965 */
2966
make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2967 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
2968 const Field *const field,
2969 Rdb_pack_field_context *const pack_ctx) {
2970 const uchar *const src = field->ptr;
2971 Rdb_bit_writer bit_writer(pack_ctx->writer);
2972 rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2973 }
2974
2975 /*
2976 Function of type rdb_index_field_unpack_t
2977 */
2978
unpack_simple(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2979 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
2980 Field *const field MY_ATTRIBUTE((__unused__)),
2981 uchar *const dst,
2982 Rdb_string_reader *const reader,
2983 Rdb_string_reader *const unp_reader) {
2984 const uchar *ptr;
2985 const uint len = fpi->m_max_image_len;
2986 Rdb_bit_reader bit_reader(unp_reader);
2987
2988 if (!(ptr = (const uchar *)reader->read(len))) {
2989 return UNPACK_FAILURE;
2990 }
2991
2992 return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2993 fpi->m_charset_codec, ptr, len, dst);
2994 }
2995
// Minimum size, in bytes, of the sample of strxfrm'ed space characters that is
// kept per charset. See Rdb_charset_space_info::spaces_xfrm
const int RDB_SPACE_XFRM_SIZE = 32;
2998
2999 // A class holding information about how space character is represented in a
3000 // charset.
3001 class Rdb_charset_space_info {
3002 public:
3003 Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3004 Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3005 Rdb_charset_space_info() = default;
3006
3007 // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3008 std::vector<uchar> spaces_xfrm;
3009
3010 // length(strxfrm(' '))
3011 size_t space_xfrm_len;
3012
3013 // length of the space character itself
3014 // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3015 // (length=2)
3016 size_t space_mb_len;
3017 };
3018
// Per-charset space information, indexed by CHARSET_INFO::number. Slots start
// out empty and are filled in on first use by rdb_get_mem_comparable_space().
static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
    rdb_mem_comparable_space;
3021
3022 /*
3023 @brief
3024 For a given charset, get
3025 - strxfrm(' '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
3026 - length of strxfrm(charset, ' ')
3027 - length of the space character in the charset
3028
3029 @param cs IN Charset to get the space for
3030 @param ptr OUT A few space characters
3031 @param len OUT Return length of the space (in bytes)
3032
3033 @detail
3034 It is tempting to pre-generate mem-comparable form of space character for
3035 every charset on server startup.
3036 One can't do that: some charsets are not initialized until somebody
3037 attempts to use them (e.g. create or open a table that has a field that
3038 uses the charset).
3039 */
3040
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)3041 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3042 const std::vector<uchar> **xfrm,
3043 size_t *const xfrm_len,
3044 size_t *const mb_len) {
3045 DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
3046 if (!rdb_mem_comparable_space[cs->number].get()) {
3047 RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3048 if (!rdb_mem_comparable_space[cs->number].get()) {
3049 // Upper bound of how many bytes can be occupied by multi-byte form of a
3050 // character in any charset.
3051 const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3052 DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3053
3054 // multi-byte form of the ' ' (space) character
3055 uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3056
3057 const size_t space_mb_len = cs->cset->wc_mb(
3058 cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3059
3060 // mem-comparable image of the space character
3061 std::array<uchar, 20> space;
3062
3063 const size_t space_len = cs->coll->strnxfrm(
3064 cs, space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3065 Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3066 info->space_xfrm_len = space_len;
3067 info->space_mb_len = space_mb_len;
3068 while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3069 info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3070 space.data() + space_len);
3071 }
3072 rdb_mem_comparable_space[cs->number].reset(info);
3073 }
3074 RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3075 }
3076
3077 *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3078 *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3079 *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3080 }
3081
// Held by rdb_get_mem_comparable_space() while it fills in a slot of
// rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Codecs created by rdb_init_collation_mapping(), indexed by
// CHARSET_INFO::number. A null entry means no codec has been stored for that
// charset.
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
// Held by rdb_init_collation_mapping() while it builds and stores a codec.
mysql_mutex_t rdb_collation_data_mutex;
3087
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)3088 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3089 return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
3090 !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
3091 }
3092
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)3093 static const Rdb_collation_codec *rdb_init_collation_mapping(
3094 const my_core::CHARSET_INFO *const cs) {
3095 DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
3096 const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3097
3098 if (codec == nullptr && rdb_is_collation_supported(cs)) {
3099 RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3100
3101 codec = rdb_collation_data[cs->number];
3102 if (codec == nullptr) {
3103 Rdb_collation_codec *cur = nullptr;
3104
3105 // Compute reverse mapping for simple collations.
3106 if (rdb_is_collation_supported(cs)) {
3107 cur = new Rdb_collation_codec;
3108 std::map<uchar, std::vector<uchar>> rev_map;
3109 size_t max_conflict_size = 0;
3110 for (int src = 0; src < 256; src++) {
3111 uchar dst = cs->sort_order[src];
3112 rev_map[dst].push_back(src);
3113 max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3114 }
3115 cur->m_dec_idx.resize(max_conflict_size);
3116
3117 for (auto const &p : rev_map) {
3118 uchar dst = p.first;
3119 for (uint idx = 0; idx < p.second.size(); idx++) {
3120 uchar src = p.second[idx];
3121 uchar bits =
3122 my_bit_log2(my_round_up_to_next_power(p.second.size()));
3123 cur->m_enc_idx[src] = idx;
3124 cur->m_enc_size[src] = bits;
3125 cur->m_dec_size[dst] = bits;
3126 cur->m_dec_idx[idx][dst] = src;
3127 }
3128 }
3129
3130 cur->m_make_unpack_info_func = {Rdb_key_def::make_unpack_simple_varchar,
3131 Rdb_key_def::make_unpack_simple};
3132 cur->m_unpack_func = {Rdb_key_def::unpack_simple_varchar_space_pad,
3133 Rdb_key_def::unpack_simple};
3134 } else {
3135 // Out of luck for now.
3136 }
3137
3138 if (cur != nullptr) {
3139 codec = cur;
3140 cur->m_cs = cs;
3141 rdb_collation_data[cs->number] = cur;
3142 }
3143 }
3144
3145 RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3146 }
3147
3148 return codec;
3149 }
3150
get_segment_size_from_collation(const CHARSET_INFO * const cs)3151 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3152 int ret;
3153 if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
3154 cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
3155 /*
3156 In these collations, a character produces one weight, which is 3 bytes.
3157 Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3158 get 3*3+1=10
3159 */
3160 ret = 10;
3161 } else {
3162 /*
3163 All other collations. There are two classes:
3164 - Unicode-based, except for collations mentioned in the if-condition.
3165 For these all weights are 2 bytes long, a character may produce 0..8
3166 weights.
3167 in any case, 8 bytes of payload in the segment guarantee that the last
3168 space character won't span across segments.
3169
3170 - Collations not based on unicode. These have length(strxfrm(' '))=1,
3171 there nothing to worry about.
3172
3173 In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3174 */
3175 ret = 9;
3176 }
3177 DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
3178 return ret;
3179 }
3180
/*
  @brief
    Setup packing of index field into its mem-comparable form

  @detail
    - It is possible to produce mem-comparable form for any datatype.
    - Some datatypes also allow to unpack the original value from its
      mem-comparable form.
      = Some of these require extra information to be stored in "unpack_info".
        unpack_info is not a part of mem-comparable form, it is only used to
        restore the original value

    The function chooses m_pack_func, m_skip_func, m_unpack_func and
    m_make_unpack_info_func for the field and computes m_max_image_len.

  @param
    key_descr     IN  the index definition, or nullptr when the caller only
                      wants the return value (see the comment in the body)
    field         IN  field to be packed/un-packed; nullptr is handled as a
                      MYSQL_TYPE_LONGLONG column of
                      ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN bytes
    keynr_arg     IN  stored in m_keynr
    key_part_arg  IN  stored in m_key_part
    key_length    IN  length of the key part; if it differs from the field
                      length the column is treated as only partially covered

  @return
    TRUE  - Field can be read with index-only reads
    FALSE - Otherwise
*/

bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
                              const Field *const field, const uint keynr_arg,
                              const uint key_part_arg,
                              const uint16 key_length) {
  int res = false;
  enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;

  m_keynr = keynr_arg;
  m_key_part = key_part_arg;

  m_maybe_null = field ? field->real_maybe_null() : false;
  m_unpack_func = nullptr;
  m_make_unpack_info_func = nullptr;
  m_unpack_data_len = 0;
  space_xfrm = nullptr;  // safety
  // whether to use legacy format for varchar
  m_use_legacy_varbinary_format = false;
  // ha_rocksdb::index_flags() will pass key_descr == null to
  // see whether field(column) can be read-only reads through return value,
  // but the legacy vs. new varchar format doesn't affect return value.
  // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
  if (!key_descr || key_descr->use_legacy_varbinary_format()) {
    m_use_legacy_varbinary_format = true;
  }
  /* Calculate image length. By default, it is pack_length() */
  m_max_image_len =
      field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
  // Defaults: fixed-length image produced by the field's make_sort_key.
  m_skip_func = Rdb_key_def::skip_max_length;
  m_pack_func = Rdb_key_def::pack_with_make_sort_key;

  m_covered = false;

  // Types that are fully handled here return from inside the switch.
  switch (type) {
    case MYSQL_TYPE_LONGLONG:
    case MYSQL_TYPE_LONG:
    case MYSQL_TYPE_INT24:
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_TINY:
      m_unpack_func = Rdb_key_def::unpack_integer;
      m_covered = true;
      return true;

    case MYSQL_TYPE_DOUBLE:
      m_unpack_func = Rdb_key_def::unpack_double;
      m_covered = true;
      return true;

    case MYSQL_TYPE_FLOAT:
      m_unpack_func = Rdb_key_def::unpack_float;
      m_covered = true;
      return true;

    case MYSQL_TYPE_NEWDECIMAL:
    /*
      Decimal is packed with Field_new_decimal::make_sort_key, which just
      does memcpy.
      Unpacking decimal values was supported only after fix for issue#253,
      because of that ha_rocksdb::get_storage_type() handles decimal values
      in a special way.
    */
    case MYSQL_TYPE_DATETIME2:
    case MYSQL_TYPE_TIMESTAMP2:
    /* These are packed with Field_temporal_with_date_and_timef::make_sort_key
     */
    case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
    case MYSQL_TYPE_YEAR:  /* YEAR is packed with Field_tiny::make_sort_key */
      /* Everything that comes here is packed with just a memcpy(). */
      m_unpack_func = Rdb_key_def::unpack_binary_str;
      m_covered = true;
      return true;

    case MYSQL_TYPE_NEWDATE:
      /*
        This is packed by Field_newdate::make_sort_key. It assumes the data is
        3 bytes, and packing is done by swapping the byte order (for both big-
        and little-endian)
      */
      m_unpack_func = Rdb_key_def::unpack_newdate;
      m_covered = true;
      return true;
    case MYSQL_TYPE_TINY_BLOB:
    case MYSQL_TYPE_MEDIUM_BLOB:
    case MYSQL_TYPE_LONG_BLOB:
    case MYSQL_TYPE_BLOB: {
      if (key_descr) {
        // The my_charset_bin collation is special in that it will consider
        // shorter strings sorting as less than longer strings.
        //
        // See Field_blob::make_sort_key for details.
        m_max_image_len =
            key_length + (field->charset()->number == COLLATION_BINARY
                              ? reinterpret_cast<const Field_blob *>(field)
                                    ->pack_length_no_ptr()
                              : 0);
        // Return false because indexes on text/blob will always require
        // a prefix. With a prefix, the optimizer will not be able to do an
        // index-only scan since there may be content occurring after the
        // prefix length.
        return false;
      }
      break;
    }
    default:
      break;
  }

  m_unpack_info_stores_value = false;
  /* Handle [VAR](CHAR|BINARY) */

  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    /*
      For CHAR-based columns, check how much space the strxfrm image will
      take.
      field->field_length = field->char_length() * cs->mbmaxlen.
    */
    const CHARSET_INFO *cs = field->charset();
    m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
  }
  const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
  // field is not null here: a null field is treated as MYSQL_TYPE_LONGLONG,
  // which has already returned from the switch above.
  const CHARSET_INFO *cs = field->charset();
  // max_image_len before chunking is taken into account
  const int max_image_len_before_chunks = m_max_image_len;

  if (is_varchar) {
    // The default for varchar is variable-length, without space-padding for
    // comparisons
    m_varchar_charset = cs;
    m_skip_func = Rdb_key_def::skip_variable_length;
    m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
    if (!key_descr || key_descr->use_legacy_varbinary_format()) {
      m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
    } else {
      // Calculate the maximum size of the short section plus the
      // maximum size of the long section
      m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
    }

    const auto field_var = static_cast<const Field_varstring *>(field);
    // Decides whether the space-pad unpack functions read a one-byte or a
    // two-byte count from unpack_info.
    // NOTE(review): the 8 presumably mirrors RDB_TRIMMED_CHARS_OFFSET - confirm.
    m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
  }

  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
    // information about how character-based datatypes are compared.
    bool use_unknown_collation = false;
    DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
                    use_unknown_collation = true;);

    if (cs->number == COLLATION_BINARY) {
      // - SQL layer pads BINARY(N) so that it always is N bytes long.
      // - For VARBINARY(N), values may have different lengths, so we're using
      //   variable-length encoding. This is also the only charset where the
      //   values are not space-padded for comparison.
      m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
                                 : Rdb_key_def::unpack_binary_str;
      res = true;
    } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
      // For _bin collations, mem-comparable form of the string is the string
      // itself.

      if (is_varchar) {
        // VARCHARs - are compared as if they were space-padded - but are
        // not actually space-padded (reading the value back produces the
        // original value, without the padding)
        m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
        m_skip_func = Rdb_key_def::skip_variable_space_pad;
        m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
        m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
        m_segment_size = get_segment_size_from_collation(cs);
        // Each segment carries (m_segment_size - 1) bytes of payload.
        m_max_image_len =
            (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
            m_segment_size;
        rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
                                     &space_mb_len);
      } else {
        // SQL layer pads CHAR(N) values to their maximum length.
        // We just store that and restore it back.
        m_unpack_func = (cs->number == COLLATION_LATIN1_BIN)
                            ? Rdb_key_def::unpack_binary_str
                            : Rdb_key_def::unpack_utf8_str;
      }
      res = true;
    } else {
      // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin

      res = true;  // index-only scans are possible
      m_unpack_data_len = is_varchar ? 0 : field->field_length;
      // Selects the VARCHAR (0) or CHAR (1) entry of the codec's function
      // tables.
      const uint idx = is_varchar ? 0 : 1;
      const Rdb_collation_codec *codec = nullptr;

      if (is_varchar) {
        // VARCHAR requires space-padding for doing comparisons
        //
        // The check for cs->levels_for_order is to catch
        // latin2_czech_cs and cp1250_czech_cs - multi-level collations
        // that Variable-Length Space Padded Encoding can't handle.
        // It is not expected to work for any other multi-level collations,
        // either.
        // Currently we handle these collations as NO_PAD, even if they have
        // PAD_SPACE attribute.
        if (cs->levels_for_order == 1) {
          m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
          m_skip_func = Rdb_key_def::skip_variable_space_pad;
          m_segment_size = get_segment_size_from_collation(cs);
          // Each segment carries (m_segment_size - 1) bytes of payload.
          m_max_image_len =
              (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
              m_segment_size;
          rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
                                       &space_mb_len);
        } else {
          // NO_LINT_DEBUG
          sql_print_warning(
              "RocksDB: you're trying to create an index "
              "with a multi-level collation %s",
              cs->name);
          // NO_LINT_DEBUG
          sql_print_warning(
              "MyRocks will handle this collation internally "
              " as if it had a NO_PAD attribute.");
          m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
          m_skip_func = Rdb_key_def::skip_variable_length;
        }
      }

      if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
        // The collation allows to store extra information in the unpack_info
        // which can be used to restore the original value from the
        // mem-comparable form.
        m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
        m_unpack_func = codec->m_unpack_func[idx];
        m_charset_codec = codec;
      } else if (use_unknown_collation) {
        // We have no clue about how this collation produces mem-comparable
        // form. Our way of restoring the original value is to keep a copy of
        // the original value in unpack_info.
        m_unpack_info_stores_value = true;
        m_make_unpack_info_func = is_varchar
                                      ? Rdb_key_def::make_unpack_unknown_varchar
                                      : Rdb_key_def::make_unpack_unknown;
        m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
                                   : Rdb_key_def::unpack_unknown;
      } else {
        // Same as above: we don't know how to restore the value from its
        // mem-comparable form.
        // Here, we just indicate to the SQL layer we can't do it.
        DBUG_ASSERT(m_unpack_func == nullptr);
        m_unpack_info_stores_value = false;
        res = false;  // Indicate that index-only reads are not possible
      }
    }

    // Make an adjustment: if this column is partially covered, tell the SQL
    // layer we can't do index-only scans. Later when we perform an index read,
    // we'll check on a record-by-record basis if we can do an index-only scan
    // or not.
    uint field_length;
    if (field->table) {
      field_length = field->table->field[field->field_index]->field_length;
    } else {
      field_length = field->field_length;
    }

    if (field_length != key_length) {
      res = false;
      // If this index doesn't support covered bitmaps, then we won't know
      // during a read if the column is actually covered or not. If so, we need
      // to assume the column isn't covered and skip it during unpacking.
      //
      // If key_descr == NULL, then this is a dummy field and we probably don't
      // need to perform this step. However, to preserve the behavior before
      // this change, we'll only skip this step if we have an index which
      // supports covered bitmaps.
      if (!key_descr || !key_descr->use_covered_bitmap_format()) {
        m_unpack_func = nullptr;
        m_make_unpack_info_func = nullptr;
        m_unpack_info_stores_value = true;
      }
    }
  }

  m_covered = res;
  return res;
}
3483
get_field_in_table(const TABLE * const tbl) const3484 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
3485 return tbl->key_info[m_keynr].key_part[m_key_part].field;
3486 }
3487
fill_hidden_pk_val(uchar ** dst,const longlong hidden_pk_id) const3488 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
3489 const longlong hidden_pk_id) const {
3490 DBUG_ASSERT(m_max_image_len == 8);
3491
3492 String to;
3493 rdb_netstr_append_uint64(&to, hidden_pk_id);
3494 memcpy(*dst, to.ptr(), m_max_image_len);
3495
3496 *dst += m_max_image_len;
3497 }
3498
3499 ///////////////////////////////////////////////////////////////////////////////////////////
3500 // Rdb_ddl_manager
3501 ///////////////////////////////////////////////////////////////////////////////////////////
3502
~Rdb_tbl_def()3503 Rdb_tbl_def::~Rdb_tbl_def() {
3504 auto ddl_manager = rdb_get_ddl_manager();
3505 /* Don't free key definitions */
3506 if (m_key_descr_arr) {
3507 for (uint i = 0; i < m_key_count; i++) {
3508 if (ddl_manager && m_key_descr_arr[i]) {
3509 ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
3510 }
3511
3512 m_key_descr_arr[i] = nullptr;
3513 }
3514
3515 delete[] m_key_descr_arr;
3516 m_key_descr_arr = nullptr;
3517 }
3518 }
3519
3520 /*
3521 Put table definition DDL entry. Actual write is done at
3522 Rdb_dict_manager::commit.
3523
3524 We write
3525 dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
3526
3527 Where key entries are a tuple of
3528 ( cf_id, index_nr )
3529 */
3530
put_dict(Rdb_dict_manager * const dict,rocksdb::WriteBatch * const batch,const rocksdb::Slice & key)3531 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
3532 rocksdb::WriteBatch *const batch,
3533 const rocksdb::Slice &key) {
3534 StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
3535 indexes.alloc(Rdb_key_def::VERSION_SIZE +
3536 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
3537 rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
3538
3539 for (uint i = 0; i < m_key_count; i++) {
3540 const Rdb_key_def &kd = *m_key_descr_arr[i];
3541
3542 uchar flags =
3543 (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
3544 (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
3545
3546 const uint cf_id = kd.get_cf()->GetID();
3547 /*
3548 If cf_id already exists, cf_flags must be the same.
3549 To prevent race condition, reading/modifying/committing CF flags
3550 need to be protected by mutex (dict_manager->lock()).
3551 When RocksDB supports transaction with pessimistic concurrency
3552 control, we can switch to use it and removing mutex.
3553 */
3554 uint existing_cf_flags;
3555 const std::string cf_name = kd.get_cf()->GetName();
3556
3557 if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
3558 // For the purposes of comparison we'll clear the partitioning bit. The
3559 // intent here is to make sure that both partitioned and non-partitioned
3560 // tables can refer to the same CF.
3561 existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3562 flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3563
3564 if (existing_cf_flags != flags) {
3565 my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
3566 existing_cf_flags);
3567 return true;
3568 }
3569 } else {
3570 dict->add_cf_flags(batch, cf_id, flags);
3571 }
3572
3573 rdb_netstr_append_uint32(&indexes, cf_id);
3574
3575 uint32 index_number = kd.get_index_number();
3576 rdb_netstr_append_uint32(&indexes, index_number);
3577
3578 struct Rdb_index_info index_info;
3579 index_info.m_gl_index_id = {cf_id, index_number};
3580 index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
3581 index_info.m_index_type = kd.m_index_type;
3582 index_info.m_kv_version = kd.m_kv_format_version;
3583 index_info.m_index_flags = kd.m_index_flags_bitmap;
3584 index_info.m_ttl_duration = kd.m_ttl_duration;
3585
3586 dict->add_or_update_index_cf_mapping(batch, &index_info);
3587 }
3588
3589 const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
3590
3591 dict->put_key(batch, key, svalue);
3592 return false;
3593 }
3594
get_create_time()3595 time_t Rdb_tbl_def::get_create_time() {
3596 time_t create_time = m_create_time;
3597
3598 if (create_time == CREATE_TIME_UNKNOWN) {
3599 // Read it from the .frm file. It's not a problem if several threads do this
3600 // concurrently
3601 char path[FN_REFLEN];
3602 snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
3603 m_dbname.c_str(), m_tablename.c_str(), reg_ext);
3604 unpack_filename(path,path);
3605 MY_STAT f_stat;
3606 if (my_stat(path, &f_stat, MYF(0)))
3607 create_time = f_stat.st_ctime;
3608 else
3609 create_time = 0; // will be shown as SQL NULL
3610 m_create_time = create_time;
3611 }
3612 return create_time;
3613 }
3614
// Length, in bytes, that each index flag's field takes inside the record.
// The array is indexed by bit position: element N holds the length for the
// INDEX_FLAG whose value is (1 << N). Only one flag length is defined here.
static const std::array<uint, 1> index_flag_lengths = {
    {ROCKSDB_SIZEOF_TTL_RECORD}};
3619
has_index_flag(uint32 index_flags,enum INDEX_FLAG flag)3620 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
3621 return flag & index_flags;
3622 }
3623
calculate_index_flag_offset(uint32 index_flags,enum INDEX_FLAG flag,uint * const length)3624 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
3625 enum INDEX_FLAG flag,
3626 uint *const length) {
3627 DBUG_ASSERT_IMP(flag != MAX_FLAG,
3628 Rdb_key_def::has_index_flag(index_flags, flag));
3629
3630 uint offset = 0;
3631 for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
3632 int mask = 1 << bit;
3633
3634 /* Exit once we've reached the proper flag */
3635 if (flag & mask) {
3636 if (length != nullptr) {
3637 *length = index_flag_lengths[bit];
3638 }
3639 break;
3640 }
3641
3642 if (index_flags & mask) {
3643 offset += index_flag_lengths[bit];
3644 }
3645 }
3646
3647 return offset;
3648 }
3649
write_index_flag_field(Rdb_string_writer * const buf,const uchar * const val,enum INDEX_FLAG flag) const3650 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
3651 const uchar *const val,
3652 enum INDEX_FLAG flag) const {
3653 uint len;
3654 uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
3655 DBUG_ASSERT(offset + len <= buf->get_current_pos());
3656 memcpy(buf->ptr() + offset, val, len);
3657 }
3658
check_if_is_mysql_system_table()3659 void Rdb_tbl_def::check_if_is_mysql_system_table() {
3660 static const char *const system_dbs[] = {
3661 "mysql",
3662 "performance_schema",
3663 "information_schema",
3664 };
3665
3666 m_is_mysql_system_table = false;
3667 for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
3668 if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
3669 m_is_mysql_system_table = true;
3670 break;
3671 }
3672 }
3673 }
3674
check_and_set_read_free_rpl_table()3675 void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
3676 m_is_read_free_rpl_table =
3677 #if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported
3678 rdb_read_free_regex_handler.matches(base_tablename());
3679 #else
3680 false;
3681 #endif
3682 }
3683
set_name(const std::string & name)3684 void Rdb_tbl_def::set_name(const std::string &name) {
3685 int err MY_ATTRIBUTE((__unused__));
3686
3687 m_dbname_tablename = name;
3688 err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
3689 &m_partition);
3690 DBUG_ASSERT(err == 0);
3691
3692 check_if_is_mysql_system_table();
3693 }
3694
get_autoincr_gl_index_id()3695 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
3696 for (uint i = 0; i < m_key_count; i++) {
3697 auto &k = m_key_descr_arr[i];
3698 if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
3699 k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
3700 return k->get_gl_index_id();
3701 }
3702 }
3703
3704 // Every table must have a primary key, even if it's hidden.
3705 abort();
3706 return GL_INDEX_ID();
3707 }
3708
erase_index_num(const GL_INDEX_ID & gl_index_id)3709 void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
3710 m_index_num_to_keydef.erase(gl_index_id);
3711 }
3712
add_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3713 void Rdb_ddl_manager::add_uncommitted_keydefs(
3714 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3715 mysql_rwlock_wrlock(&m_rwlock);
3716 for (const auto &index : indexes) {
3717 m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
3718 }
3719 mysql_rwlock_unlock(&m_rwlock);
3720 }
3721
remove_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3722 void Rdb_ddl_manager::remove_uncommitted_keydefs(
3723 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3724 mysql_rwlock_wrlock(&m_rwlock);
3725 for (const auto &index : indexes) {
3726 m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
3727 }
3728 mysql_rwlock_unlock(&m_rwlock);
3729 }
3730
namespace // anonymous namespace = not visible outside this source file
{
/*
  Table scanner used to cross-check the tables registered in the RocksDB data
  dictionary against the .frm files found on disk.

  add_table() collects the registered tables; the check/scan methods then
  remove every entry for which a matching .frm file is found, so whatever is
  left in m_list afterwards has no .frm file.
*/
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  /* (table name, is-partition flag) */
  using tbl_info_t = std::pair<std::string, bool>;
  /* database name -> set of tables registered in that database */
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  /* Registered tables for which no .frm file has been seen yet */
  tbl_list_t m_list;

  /* Scanner callback: record one table definition in m_list */
  int add_table(Rdb_tbl_def *tdef) override;

  /* Walk all database directories below datadir */
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  /* Walk the .frm files of a single database directory */
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  /* Inspect one .frm file and reconcile it with m_list */
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
} // anonymous namespace
3750
3751 /*
3752 Get a list of tables that we expect to have .frm files for. This will use the
3753 information just read from the RocksDB data dictionary.
3754 */
add_table(Rdb_tbl_def * tdef)3755 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
3756 DBUG_ASSERT(tdef != nullptr);
3757
3758 /* Add the database/table into the list that are not temp table */
3759 if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
3760 bool is_partition = tdef->base_partition().size() != 0;
3761 m_list[tdef->base_dbname()].insert(
3762 tbl_info_t(tdef->base_tablename(), is_partition));
3763 }
3764
3765 return HA_EXIT_SUCCESS;
3766 }
3767
3768 /*
3769 Access the .frm file for this dbname/tablename and see if it is a RocksDB
3770 table (or partition table).
3771 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)3772 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
3773 const std::string &dbname,
3774 const std::string &tablename,
3775 bool *has_errors) {
3776 /* Check this .frm file to see what engine it uses */
3777 String fullfilename(fullpath.c_str(), &my_charset_bin);
3778 fullfilename.append(FN_DIRSEP);
3779 fullfilename.append(tablename.c_str());
3780 fullfilename.append(".frm");
3781
3782 /*
3783 This function will return the legacy_db_type of the table. Currently
3784 it does not reference the first parameter (THD* thd), but if it ever
3785 did in the future we would need to make a version that does it without
3786 the connection handle as we don't have one here.
3787 */
3788 char eng_type_buf[NAME_CHAR_LEN+1];
3789 LEX_CSTRING eng_type_str = {eng_type_buf, 0};
3790 bool is_sequence;
3791 enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str, &is_sequence);
3792 if (type == TABLE_TYPE_UNKNOWN) {
3793 // NO_LINT_DEBUG
3794 sql_print_warning("RocksDB: Failed to open/read .from file: %s",
3795 fullfilename.ptr());
3796 return false;
3797 }
3798
3799 if (type == TABLE_TYPE_NORMAL) {
3800 /* For a RocksDB table do we have a reference in the data dictionary? */
3801 if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
3802 /*
3803 Attempt to remove the table entry from the list of tables. If this
3804 fails then we know we had a .frm file that wasn't registered in RocksDB.
3805 */
3806 tbl_info_t element(tablename, false);
3807 if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
3808 // NO_LINT_DEBUG
3809 sql_print_warning(
3810 "RocksDB: Schema mismatch - "
3811 "A .frm file exists for table %s.%s, "
3812 "but that table is not registered in RocksDB",
3813 dbname.c_str(), tablename.c_str());
3814 *has_errors = true;
3815 }
3816 } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
3817 /*
3818 For partition tables, see if it is in the m_list as a partition,
3819 but don't generate an error if it isn't there - we don't know that the
3820 .frm is for RocksDB.
3821 */
3822 if (m_list.count(dbname) > 0) {
3823 m_list[dbname].erase(tbl_info_t(tablename, true));
3824 }
3825 }
3826 }
3827
3828 return true;
3829 }
3830
3831 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)3832 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
3833 const std::string &dbname,
3834 bool *has_errors) {
3835 bool result = true;
3836 std::string fullpath = datadir + dbname;
3837 struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
3838
3839 /* Access the directory */
3840 if (dir_info == nullptr) {
3841 // NO_LINT_DEBUG
3842 sql_print_warning("RocksDB: Could not open database directory: %s",
3843 fullpath.c_str());
3844 return false;
3845 }
3846
3847 /* Scan through the files in the directory */
3848 struct fileinfo *file_info = dir_info->dir_entry;
3849 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3850 /* Find .frm files that are not temp files (those that contain '#sql') */
3851 const char *ext = strrchr(file_info->name, '.');
3852 if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
3853 strcmp(ext, ".frm") == 0) {
3854 std::string tablename =
3855 std::string(file_info->name, ext - file_info->name);
3856
3857 /* Check to see if the .frm file is from RocksDB */
3858 if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
3859 result = false;
3860 break;
3861 }
3862 }
3863 }
3864
3865 /* Remove any databases who have no more tables listed */
3866 if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
3867 m_list.erase(dbname);
3868 }
3869
3870 /* Release the directory entry */
3871 my_dirend(dir_info);
3872
3873 return result;
3874 }
3875
3876 /*
3877 Scan the datadir for all databases (subdirectories) and get a list of .frm
3878 files they contain
3879 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)3880 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
3881 bool *has_errors) {
3882 bool result = true;
3883 struct st_my_dir *dir_info;
3884 struct fileinfo *file_info;
3885
3886 dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
3887 if (dir_info == nullptr) {
3888 // NO_LINT_DEBUG
3889 sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
3890 return false;
3891 }
3892
3893 file_info = dir_info->dir_entry;
3894 for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3895 /* Ignore files/dirs starting with '.' */
3896 if (file_info->name[0] == '.') continue;
3897
3898 /* Ignore all non-directory files */
3899 if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
3900
3901 /* Scan all the .frm files in the directory */
3902 if (!scan_for_frms(datadir, file_info->name, has_errors)) {
3903 result = false;
3904 break;
3905 }
3906 }
3907
3908 /* Release the directory info */
3909 my_dirend(dir_info);
3910
3911 return result;
3912 }
3913
3914 /*
3915 Validate that all auto increment values in the data dictionary are on a
3916 supported version.
3917 */
validate_auto_incr()3918 bool Rdb_ddl_manager::validate_auto_incr() {
3919 std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
3920
3921 uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3922 rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
3923 const rocksdb::Slice auto_incr_entry_slice(
3924 reinterpret_cast<char *>(auto_incr_entry),
3925 Rdb_key_def::INDEX_NUMBER_SIZE);
3926 for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
3927 const rocksdb::Slice key = it->key();
3928 const rocksdb::Slice val = it->value();
3929 GL_INDEX_ID gl_index_id;
3930
3931 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3932 memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
3933 break;
3934 }
3935
3936 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
3937 return false;
3938 }
3939
3940 if (val.size() <= Rdb_key_def::VERSION_SIZE) {
3941 return false;
3942 }
3943
3944 // Check if we have orphaned entries for whatever reason by cross
3945 // referencing ddl entries.
3946 auto ptr = reinterpret_cast<const uchar *>(key.data());
3947 ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
3948 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3949 if (!m_dict->get_index_info(gl_index_id, nullptr)) {
3950 // NO_LINT_DEBUG
3951 sql_print_warning(
3952 "RocksDB: AUTOINC mismatch - "
3953 "Index number (%u, %u) found in AUTOINC "
3954 "but does not exist as a DDL entry",
3955 gl_index_id.cf_id, gl_index_id.index_id);
3956 return false;
3957 }
3958
3959 ptr = reinterpret_cast<const uchar *>(val.data());
3960 const int version = rdb_netbuf_read_uint16(&ptr);
3961 if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
3962 // NO_LINT_DEBUG
3963 sql_print_warning(
3964 "RocksDB: AUTOINC mismatch - "
3965 "Index number (%u, %u) found in AUTOINC "
3966 "is on unsupported version %d",
3967 gl_index_id.cf_id, gl_index_id.index_id, version);
3968 return false;
3969 }
3970 }
3971
3972 if (!it->status().ok()) {
3973 return false;
3974 }
3975
3976 return true;
3977 }
3978
3979 /*
3980 Validate that all the tables in the RocksDB database dictionary match the .frm
3981 files in the datadir
3982 */
validate_schemas(void)3983 bool Rdb_ddl_manager::validate_schemas(void) {
3984 bool has_errors = false;
3985 const std::string datadir = std::string(mysql_real_data_home);
3986 Rdb_validate_tbls table_list;
3987
3988 /* Get the list of tables from the database dictionary */
3989 if (scan_for_tables(&table_list) != 0) {
3990 return false;
3991 }
3992
3993 /* Compare that to the list of actual .frm files */
3994 if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
3995 return false;
3996 }
3997
3998 /*
3999 Any tables left in the tables list are ones that are registered in RocksDB
4000 but don't have .frm files.
4001 */
4002 for (const auto &db : table_list.m_list) {
4003 for (const auto &table : db.second) {
4004 // NO_LINT_DEBUG
4005 sql_print_warning(
4006 "RocksDB: Schema mismatch - "
4007 "Table %s.%s is registered in RocksDB "
4008 "but does not have a .frm file",
4009 db.first.c_str(), table.first.c_str());
4010 has_errors = true;
4011 }
4012 }
4013
4014 return !has_errors;
4015 }
4016
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t validate_tables)4017 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4018 Rdb_cf_manager *const cf_manager,
4019 const uint32_t validate_tables) {
4020 m_dict = dict_arg;
4021 mysql_rwlock_init(0, &m_rwlock);
4022
4023 /* Read the data dictionary and populate the hash */
4024 uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4025 rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4026 const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4027 Rdb_key_def::INDEX_NUMBER_SIZE);
4028
4029 /* Reading data dictionary should always skip bloom filter */
4030 rocksdb::Iterator *it = m_dict->new_iterator();
4031 int i = 0;
4032
4033 uint max_index_id_in_dict = 0;
4034 m_dict->get_max_index_id(&max_index_id_in_dict);
4035
4036 for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4037 const uchar *ptr;
4038 const uchar *ptr_end;
4039 const rocksdb::Slice key = it->key();
4040 const rocksdb::Slice val = it->value();
4041
4042 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4043 memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4044 break;
4045 }
4046
4047 if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4048 // NO_LINT_DEBUG
4049 sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4050 (int)key.size());
4051 return true;
4052 }
4053
4054 Rdb_tbl_def *const tdef =
4055 new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4056
4057 // Now, read the DDLs.
4058 const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4059 if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4060 // NO_LINT_DEBUG
4061 sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4062 tdef->full_tablename().c_str());
4063 return true;
4064 }
4065 tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4066 tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4067
4068 ptr = reinterpret_cast<const uchar *>(val.data());
4069 const int version = rdb_netbuf_read_uint16(&ptr);
4070 if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4071 // NO_LINT_DEBUG
4072 sql_print_error(
4073 "RocksDB: DDL ENTRY Version was not expected."
4074 "Expected: %d, Actual: %d",
4075 Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4076 return true;
4077 }
4078 ptr_end = ptr + real_val_size;
4079 for (uint keyno = 0; ptr < ptr_end; keyno++) {
4080 GL_INDEX_ID gl_index_id;
4081 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4082 uint flags = 0;
4083 struct Rdb_index_info index_info;
4084 if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4085 // NO_LINT_DEBUG
4086 sql_print_error(
4087 "RocksDB: Could not get index information "
4088 "for Index Number (%u,%u), table %s",
4089 gl_index_id.cf_id, gl_index_id.index_id,
4090 tdef->full_tablename().c_str());
4091 return true;
4092 }
4093 if (max_index_id_in_dict < gl_index_id.index_id) {
4094 // NO_LINT_DEBUG
4095 sql_print_error(
4096 "RocksDB: Found max index id %u from data dictionary "
4097 "but also found larger index id %u from dictionary. "
4098 "This should never happen and possibly a bug.",
4099 max_index_id_in_dict, gl_index_id.index_id);
4100 return true;
4101 }
4102 if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4103 // NO_LINT_DEBUG
4104 sql_print_error(
4105 "RocksDB: Could not get Column Family Flags "
4106 "for CF Number %d, table %s",
4107 gl_index_id.cf_id, tdef->full_tablename().c_str());
4108 return true;
4109 }
4110
4111 if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4112 // The per-index cf option is deprecated. Make sure we don't have the
4113 // flag set in any existing database. NO_LINT_DEBUG
4114 // NO_LINT_DEBUG
4115 sql_print_error(
4116 "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4117 "number %d, table %s",
4118 gl_index_id.cf_id, tdef->full_tablename().c_str());
4119 }
4120
4121 rocksdb::ColumnFamilyHandle *const cfh =
4122 cf_manager->get_cf(gl_index_id.cf_id);
4123 DBUG_ASSERT(cfh != nullptr);
4124
4125 uint32 ttl_rec_offset =
4126 Rdb_key_def::has_index_flag(index_info.m_index_flags,
4127 Rdb_key_def::TTL_FLAG)
4128 ? Rdb_key_def::calculate_index_flag_offset(
4129 index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4130 : UINT_MAX;
4131
4132 /*
4133 We can't fully initialize Rdb_key_def object here, because full
4134 initialization requires that there is an open TABLE* where we could
4135 look at Field* objects and set max_length and other attributes
4136 */
4137 tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4138 gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4139 index_info.m_index_type, index_info.m_kv_version,
4140 flags & Rdb_key_def::REVERSE_CF_FLAG,
4141 flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4142 m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4143 ttl_rec_offset, index_info.m_ttl_duration);
4144 }
4145 put(tdef);
4146 i++;
4147 }
4148
4149 /*
4150 If validate_tables is greater than 0 run the validation. Only fail the
4151 initialzation if the setting is 1. If the setting is 2 we continue.
4152 */
4153 if (validate_tables > 0) {
4154 std::string msg;
4155 if (!validate_schemas()) {
4156 msg =
4157 "RocksDB: Problems validating data dictionary "
4158 "against .frm files, exiting";
4159 } else if (!validate_auto_incr()) {
4160 msg =
4161 "RocksDB: Problems validating auto increment values in "
4162 "data dictionary, exiting";
4163 }
4164 if (validate_tables == 1 && !msg.empty()) {
4165 // NO_LINT_DEBUG
4166 sql_print_error("%s", msg.c_str());
4167 return true;
4168 }
4169 }
4170
4171 // index ids used by applications should not conflict with
4172 // data dictionary index ids
4173 if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4174 max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4175 }
4176
4177 m_sequence.init(max_index_id_in_dict + 1);
4178
4179 if (!it->status().ok()) {
4180 rdb_log_status_error(it->status(), "Table_store load error");
4181 return true;
4182 }
4183 delete it;
4184 // NO_LINT_DEBUG
4185 sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4186 i);
4187 return false;
4188 }
4189
find(const std::string & table_name,const bool lock)4190 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4191 const bool lock) {
4192 if (lock) {
4193 mysql_rwlock_rdlock(&m_rwlock);
4194 }
4195
4196 Rdb_tbl_def *rec = nullptr;
4197 const auto it = m_ddl_map.find(table_name);
4198 if (it != m_ddl_map.end()) {
4199 rec = it->second;
4200 }
4201
4202 if (lock) {
4203 mysql_rwlock_unlock(&m_rwlock);
4204 }
4205
4206 return rec;
4207 }
4208
4209 // this is a safe version of the find() function below. It acquires a read
4210 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4211 // are finding it. Copying it into 'ret' increments the count making sure
4212 // that the object will not be discarded until we are finished with it.
safe_find(GL_INDEX_ID gl_index_id)4213 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4214 GL_INDEX_ID gl_index_id) {
4215 std::shared_ptr<const Rdb_key_def> ret(nullptr);
4216
4217 mysql_rwlock_rdlock(&m_rwlock);
4218
4219 auto it = m_index_num_to_keydef.find(gl_index_id);
4220 if (it != m_index_num_to_keydef.end()) {
4221 const auto table_def = find(it->second.first, false);
4222 if (table_def && it->second.second < table_def->m_key_count) {
4223 const auto &kd = table_def->m_key_descr_arr[it->second.second];
4224 if (kd->max_storage_fmt_length() != 0) {
4225 ret = kd;
4226 }
4227 }
4228 } else {
4229 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4230 if (it != m_index_num_to_uncommitted_keydef.end()) {
4231 const auto &kd = it->second;
4232 if (kd->max_storage_fmt_length() != 0) {
4233 ret = kd;
4234 }
4235 }
4236 }
4237
4238 mysql_rwlock_unlock(&m_rwlock);
4239
4240 return ret;
4241 }
4242
4243 // this method assumes at least read-only lock on m_rwlock
find(GL_INDEX_ID gl_index_id)4244 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4245 GL_INDEX_ID gl_index_id) {
4246 auto it = m_index_num_to_keydef.find(gl_index_id);
4247 if (it != m_index_num_to_keydef.end()) {
4248 auto table_def = find(it->second.first, false);
4249 if (table_def) {
4250 if (it->second.second < table_def->m_key_count) {
4251 return table_def->m_key_descr_arr[it->second.second];
4252 }
4253 }
4254 } else {
4255 auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4256 if (it != m_index_num_to_uncommitted_keydef.end()) {
4257 return it->second;
4258 }
4259 }
4260
4261 static std::shared_ptr<Rdb_key_def> empty = nullptr;
4262
4263 return empty;
4264 }
4265
4266 // this method returns the name of the table based on an index id. It acquires
4267 // a read lock on m_rwlock.
safe_get_table_name(const GL_INDEX_ID & gl_index_id)4268 const std::string Rdb_ddl_manager::safe_get_table_name(
4269 const GL_INDEX_ID &gl_index_id) {
4270 std::string ret;
4271 mysql_rwlock_rdlock(&m_rwlock);
4272 auto it = m_index_num_to_keydef.find(gl_index_id);
4273 if (it != m_index_num_to_keydef.end()) {
4274 ret = it->second.first;
4275 }
4276 mysql_rwlock_unlock(&m_rwlock);
4277 return ret;
4278 }
4279
set_stats(const std::unordered_map<GL_INDEX_ID,Rdb_index_stats> & stats)4280 void Rdb_ddl_manager::set_stats(
4281 const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4282 mysql_rwlock_wrlock(&m_rwlock);
4283 for (auto src : stats) {
4284 const auto &keydef = find(src.second.m_gl_index_id);
4285 if (keydef) {
4286 keydef->m_stats = src.second;
4287 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4288 }
4289 }
4290 mysql_rwlock_unlock(&m_rwlock);
4291 }
4292
adjust_stats(const std::vector<Rdb_index_stats> & new_data,const std::vector<Rdb_index_stats> & deleted_data)4293 void Rdb_ddl_manager::adjust_stats(
4294 const std::vector<Rdb_index_stats> &new_data,
4295 const std::vector<Rdb_index_stats> &deleted_data) {
4296 mysql_rwlock_wrlock(&m_rwlock);
4297 int i = 0;
4298 for (const auto &data : {new_data, deleted_data}) {
4299 for (const auto &src : data) {
4300 const auto &keydef = find(src.m_gl_index_id);
4301 if (keydef) {
4302 keydef->m_stats.m_distinct_keys_per_prefix.resize(
4303 keydef->get_key_parts());
4304 keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4305 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4306 }
4307 }
4308 i++;
4309 }
4310 const bool should_save_stats = !m_stats2store.empty();
4311 mysql_rwlock_unlock(&m_rwlock);
4312 if (should_save_stats) {
4313 // Queue an async persist_stats(false) call to the background thread.
4314 rdb_queue_save_stats_request();
4315 }
4316 }
4317
persist_stats(const bool sync)4318 void Rdb_ddl_manager::persist_stats(const bool sync) {
4319 mysql_rwlock_wrlock(&m_rwlock);
4320 const auto local_stats2store = std::move(m_stats2store);
4321 m_stats2store.clear();
4322 mysql_rwlock_unlock(&m_rwlock);
4323
4324 // Persist stats
4325 const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4326 std::vector<Rdb_index_stats> stats;
4327 std::transform(local_stats2store.begin(), local_stats2store.end(),
4328 std::back_inserter(stats),
4329 [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4330 return s.second;
4331 });
4332 m_dict->add_stats(wb.get(), stats);
4333 m_dict->commit(wb.get(), sync);
4334 }
4335
4336 /*
4337 Put table definition of `tbl` into the mapping, and also write it to the
4338 on-disk data dictionary.
4339 */
4340
put_and_write(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch)4341 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
4342 rocksdb::WriteBatch *const batch) {
4343 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
4344
4345 buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4346
4347 const std::string &dbname_tablename = tbl->full_tablename();
4348 buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4349
4350 int res;
4351 if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) {
4352 return res;
4353 }
4354 if ((res = put(tbl))) {
4355 return res;
4356 }
4357 return HA_EXIT_SUCCESS;
4358 }
4359
4360 /* Return 0 - ok, other value - error */
4361 /* TODO:
4362 This function modifies m_ddl_map and m_index_num_to_keydef.
4363 However, these changes need to be reversed if dict_manager.commit fails
4364 See the discussion here: https://reviews.facebook.net/D35925#inline-259167
4365 Tracked by https://github.com/facebook/mysql-5.6/issues/33
4366 */
put(Rdb_tbl_def * const tbl,const bool lock)4367 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
4368 Rdb_tbl_def *rec;
4369 const std::string &dbname_tablename = tbl->full_tablename();
4370
4371 if (lock) mysql_rwlock_wrlock(&m_rwlock);
4372
4373 // We have to do this find because 'tbl' is not yet in the list. We need
4374 // to find the one we are replacing ('rec')
4375 rec = find(dbname_tablename, false);
4376 if (rec) {
4377 // Free the old record.
4378 delete rec;
4379 m_ddl_map.erase(dbname_tablename);
4380 }
4381 m_ddl_map.emplace(dbname_tablename, tbl);
4382
4383 for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
4384 m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
4385 std::make_pair(dbname_tablename, keyno);
4386 }
4387 tbl->check_and_set_read_free_rpl_table();
4388
4389 if (lock) mysql_rwlock_unlock(&m_rwlock);
4390 return 0;
4391 }
4392
remove(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch,const bool lock)4393 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
4394 rocksdb::WriteBatch *const batch,
4395 const bool lock) {
4396 if (lock) mysql_rwlock_wrlock(&m_rwlock);
4397
4398 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
4399 key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4400 const std::string &dbname_tablename = tbl->full_tablename();
4401 key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4402
4403 m_dict->delete_key(batch, key_writer.to_slice());
4404
4405 const auto it = m_ddl_map.find(dbname_tablename);
4406 if (it != m_ddl_map.end()) {
4407 // Free Rdb_tbl_def
4408 delete it->second;
4409
4410 m_ddl_map.erase(it);
4411 }
4412
4413 if (lock) mysql_rwlock_unlock(&m_rwlock);
4414 }
4415
rename(const std::string & from,const std::string & to,rocksdb::WriteBatch * const batch)4416 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
4417 rocksdb::WriteBatch *const batch) {
4418 Rdb_tbl_def *rec;
4419 Rdb_tbl_def *new_rec;
4420 bool res = true;
4421 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
4422
4423 mysql_rwlock_wrlock(&m_rwlock);
4424 if (!(rec = find(from, false))) {
4425 mysql_rwlock_unlock(&m_rwlock);
4426 return true;
4427 }
4428
4429 new_rec = new Rdb_tbl_def(to);
4430
4431 new_rec->m_key_count = rec->m_key_count;
4432 new_rec->m_auto_incr_val =
4433 rec->m_auto_incr_val.load(std::memory_order_relaxed);
4434 new_rec->m_key_descr_arr = rec->m_key_descr_arr;
4435
4436 new_rec->m_hidden_pk_val =
4437 rec->m_hidden_pk_val.load(std::memory_order_relaxed);
4438
4439 // so that it's not free'd when deleting the old rec
4440 rec->m_key_descr_arr = nullptr;
4441
4442 // Create a new key
4443 new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4444
4445 const std::string &dbname_tablename = new_rec->full_tablename();
4446 new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4447
4448 // Create a key to add
4449 if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) {
4450 remove(rec, batch, false);
4451 put(new_rec, false);
4452 res = false; // ok
4453 }
4454
4455 mysql_rwlock_unlock(&m_rwlock);
4456 return res;
4457 }
4458
cleanup()4459 void Rdb_ddl_manager::cleanup() {
4460 for (const auto &kv : m_ddl_map) {
4461 delete kv.second;
4462 }
4463 m_ddl_map.clear();
4464
4465 mysql_rwlock_destroy(&m_rwlock);
4466 m_sequence.cleanup();
4467 }
4468
scan_for_tables(Rdb_tables_scanner * const tables_scanner)4469 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
4470 int ret;
4471 Rdb_tbl_def *rec;
4472
4473 DBUG_ASSERT(tables_scanner != nullptr);
4474
4475 mysql_rwlock_rdlock(&m_rwlock);
4476
4477 ret = 0;
4478
4479 for (const auto &kv : m_ddl_map) {
4480 rec = kv.second;
4481 ret = tables_scanner->add_table(rec);
4482 if (ret) break;
4483 }
4484
4485 mysql_rwlock_unlock(&m_rwlock);
4486 return ret;
4487 }
4488
4489 /*
4490 Rdb_binlog_manager class implementation
4491 */
4492
init(Rdb_dict_manager * const dict_arg)4493 bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
4494 DBUG_ASSERT(dict_arg != nullptr);
4495 m_dict = dict_arg;
4496
4497 m_key_writer.reset();
4498 m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
4499 m_key_slice = m_key_writer.to_slice();
4500 return false;
4501 }
4502
/* Tear-down hook of the binlog manager. The body is empty: no work is done. */
void Rdb_binlog_manager::cleanup() {}
4504
4505 /**
4506 Set binlog name, pos and optionally gtid into WriteBatch.
4507 This function should be called as part of transaction commit,
4508 since binlog info is set only at transaction commit.
4509 Actual write into RocksDB is not done here, so checking if
4510 write succeeded or not is not possible here.
4511 @param binlog_name Binlog name
4512 @param binlog_pos Binlog pos
4513 @param batch WriteBatch
4514 */
update(const char * const binlog_name,const my_off_t binlog_pos,rocksdb::WriteBatchBase * const batch)4515 void Rdb_binlog_manager::update(const char *const binlog_name,
4516 const my_off_t binlog_pos,
4517 rocksdb::WriteBatchBase *const batch) {
4518 if (binlog_name && binlog_pos) {
4519 // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
4520 const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
4521 Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer;
4522
4523 // store version
4524 value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
4525
4526 // store binlog file name length
4527 DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
4528 const uint16_t binlog_name_len = strlen(binlog_name);
4529 value_writer.write_uint16(binlog_name_len);
4530
4531 // store binlog file name
4532 value_writer.write(binlog_name, binlog_name_len);
4533
4534 // store binlog pos
4535 value_writer.write_uint32(binlog_pos);
4536
4537 #ifdef MARIADB_MERGE_2019
4538 // store binlog gtid length.
4539 // If gtid was not set, store 0 instead
4540 const uint16_t binlog_max_gtid_len =
4541 binlog_max_gtid ? strlen(binlog_max_gtid) : 0;
4542 value_writer.write_uint16(binlog_max_gtid_len);
4543
4544 if (binlog_max_gtid_len > 0) {
4545 // store binlog gtid
4546 value_writer.write(binlog_max_gtid, binlog_max_gtid_len);
4547 }
4548 #endif
4549
4550 m_dict->put_key(batch, m_key_slice, value_writer.to_slice());
4551 }
4552 }
4553
/**
  Read the committed binlog entry stored in RocksDB, then unpack it.
  @param[OUT] binlog_name  Binlog name
  @param[OUT] binlog_pos   Binlog pos
  @param[OUT] binlog_gtid  Binlog GTID
  @return
    true if binlog info was found (valid behavior)
    false otherwise
*/
read(char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4563 bool Rdb_binlog_manager::read(char *const binlog_name,
4564 my_off_t *const binlog_pos,
4565 char *const binlog_gtid) const {
4566 bool ret = false;
4567 if (binlog_name) {
4568 std::string value;
4569 rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
4570 if (status.ok()) {
4571 if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
4572 binlog_gtid)) {
4573 ret = true;
4574 }
4575 }
4576 }
4577 return ret;
4578 }
4579
4580 /**
4581 Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
4582 @param[IN] value Binlog state info fetched from RocksDB
4583 @param[OUT] binlog_name Binlog name
4584 @param[OUT] binlog_pos Binlog pos
4585 @param[OUT] binlog_gtid Binlog GTID
4586 @return true on error
4587 */
unpack_value(const uchar * const value,size_t value_size_arg,char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4588 bool Rdb_binlog_manager::unpack_value(const uchar *const value,
4589 size_t value_size_arg,
4590 char *const binlog_name,
4591 my_off_t *const binlog_pos,
4592 char *const binlog_gtid) const {
4593 uint pack_len = 0;
4594 intmax_t value_size= value_size_arg;
4595
4596 DBUG_ASSERT(binlog_pos != nullptr);
4597
4598 if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
4599 return true;
4600 // read version
4601 const uint16_t version = rdb_netbuf_to_uint16(value);
4602
4603 pack_len += Rdb_key_def::VERSION_SIZE;
4604 if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true;
4605
4606 if ((value_size -= sizeof(uint16)) < 0)
4607 return true;
4608
4609 // read binlog file name length
4610 const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
4611 pack_len += sizeof(uint16);
4612
4613 if (binlog_name_len >= (FN_REFLEN+1))
4614 return true;
4615
4616 if ((value_size -= binlog_name_len) < 0)
4617 return true;
4618
4619 if (binlog_name_len) {
4620 // read and set binlog name
4621 memcpy(binlog_name, value + pack_len, binlog_name_len);
4622 binlog_name[binlog_name_len] = '\0';
4623 pack_len += binlog_name_len;
4624
4625 if ((value_size -= sizeof(uint32)) < 0)
4626 return true;
4627 // read and set binlog pos
4628 *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
4629 pack_len += sizeof(uint32);
4630
4631 if ((value_size -= sizeof(uint16)) < 0)
4632 return true;
4633 // read gtid length
4634 const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
4635 pack_len += sizeof(uint16);
4636
4637 if (binlog_gtid_len >= GTID_BUF_LEN)
4638 return true;
4639 if ((value_size -= binlog_gtid_len) < 0)
4640 return true;
4641
4642 if (binlog_gtid && binlog_gtid_len > 0) {
4643 // read and set gtid
4644 memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
4645 binlog_gtid[binlog_gtid_len] = '\0';
4646 pack_len += binlog_gtid_len;
4647 }
4648 }
4649 return false;
4650 }
4651
4652 /**
4653 Inserts a row into mysql.slave_gtid_info table. Doing this inside
4654 storage engine is more efficient than inserting/updating through MySQL.
4655
4656 @param[IN] id Primary key of the table.
4657 @param[IN] db Database name. This is column 2 of the table.
4658 @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
4659 @param[IN] write_batch Handle to storage engine writer.
4660 */
update_slave_gtid_info(const uint id,const char * const db,const char * const gtid,rocksdb::WriteBatchBase * const write_batch)4661 void Rdb_binlog_manager::update_slave_gtid_info(
4662 const uint id, const char *const db, const char *const gtid,
4663 rocksdb::WriteBatchBase *const write_batch) {
4664 if (id && db && gtid) {
4665 // Make sure that if the slave_gtid_info table exists we have a
4666 // pointer to it via m_slave_gtid_info_tbl.
4667 if (!m_slave_gtid_info_tbl.load()) {
4668 m_slave_gtid_info_tbl.store(
4669 rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
4670 }
4671 if (!m_slave_gtid_info_tbl.load()) {
4672 // slave_gtid_info table is not present. Simply return.
4673 return;
4674 }
4675 DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
4676
4677 const std::shared_ptr<const Rdb_key_def> &kd =
4678 m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
4679 String value;
4680
4681 // Build key
4682 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer;
4683 key_writer.write_index(kd->get_index_number());
4684 key_writer.write_uint32(id);
4685
4686 // Build value
4687 Rdb_buf_writer<128> value_writer;
4688 DBUG_ASSERT(gtid);
4689 const uint db_len = strlen(db);
4690 const uint gtid_len = strlen(gtid);
4691 // 1 byte used for flags. Empty here.
4692 value_writer.write_byte(0);
4693
4694 // Write column 1.
4695 DBUG_ASSERT(strlen(db) <= 64);
4696 value_writer.write_byte(db_len);
4697 value_writer.write(db, db_len);
4698
4699 // Write column 2.
4700 DBUG_ASSERT(gtid_len <= 56);
4701 value_writer.write_byte(gtid_len);
4702 value_writer.write(gtid, gtid_len);
4703
4704 write_batch->Put(kd->get_cf(), key_writer.to_slice(),
4705 value_writer.to_slice());
4706 }
4707 }
4708
init(rocksdb::TransactionDB * const rdb_dict,Rdb_cf_manager * const cf_manager)4709 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
4710 Rdb_cf_manager *const cf_manager) {
4711 DBUG_ASSERT(rdb_dict != nullptr);
4712 DBUG_ASSERT(cf_manager != nullptr);
4713
4714 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
4715
4716 m_db = rdb_dict;
4717
4718 m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
4719 rocksdb::ColumnFamilyHandle *default_cfh =
4720 cf_manager->get_cf(DEFAULT_CF_NAME);
4721
4722 // System CF and default CF should be initialized
4723 if (m_system_cfh == nullptr || default_cfh == nullptr) {
4724 return HA_EXIT_FAILURE;
4725 }
4726
4727 rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
4728
4729 m_key_slice_max_index_id =
4730 rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
4731 Rdb_key_def::INDEX_NUMBER_SIZE);
4732
4733 resume_drop_indexes();
4734 rollback_ongoing_index_creation();
4735
4736 // Initialize system CF and default CF flags
4737 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
4738 rocksdb::WriteBatch *const batch = wb.get();
4739
4740 add_cf_flags(batch, m_system_cfh->GetID(), 0);
4741 add_cf_flags(batch, default_cfh->GetID(), 0);
4742 commit(batch);
4743
4744 return HA_EXIT_SUCCESS;
4745 }
4746
begin() const4747 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
4748 return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
4749 }
4750
put_key(rocksdb::WriteBatchBase * const batch,const rocksdb::Slice & key,const rocksdb::Slice & value) const4751 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
4752 const rocksdb::Slice &key,
4753 const rocksdb::Slice &value) const {
4754 batch->Put(m_system_cfh, key, value);
4755 }
4756
/*
  Read the value stored under `key` in the system column family into `value`,
  using total-order seek, and return the resulting status.
*/
rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
                                            std::string *const value) const {
  rocksdb::ReadOptions read_opts;
  read_opts.total_order_seek = true;

  rocksdb::Status lookup_status =
      m_db->Get(read_opts, m_system_cfh, key, value);
  return lookup_status;
}
4763
delete_key(rocksdb::WriteBatchBase * batch,const rocksdb::Slice & key) const4764 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
4765 const rocksdb::Slice &key) const {
4766 batch->Delete(m_system_cfh, key);
4767 }
4768
new_iterator() const4769 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
4770 /* Reading data dictionary should always skip bloom filter */
4771 rocksdb::ReadOptions read_options;
4772 read_options.total_order_seek = true;
4773 return m_db->NewIterator(read_options, m_system_cfh);
4774 }
4775
commit(rocksdb::WriteBatch * const batch,const bool sync) const4776 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
4777 const bool sync) const {
4778 if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
4779 int res = HA_EXIT_SUCCESS;
4780 rocksdb::WriteOptions options;
4781 options.sync = sync;
4782 rocksdb::TransactionDBWriteOptimizations optimize;
4783 optimize.skip_concurrency_control = true;
4784 rocksdb::Status s = m_db->Write(options, optimize, batch);
4785 res = !s.ok(); // we return true when something failed
4786 if (res) {
4787 rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
4788 }
4789 batch->Clear();
4790 return res;
4791 }
4792
dump_index_id(uchar * const netbuf,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id)4793 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
4794 Rdb_key_def::DATA_DICT_TYPE dict_type,
4795 const GL_INDEX_ID &gl_index_id) {
4796 rdb_netbuf_store_uint32(netbuf, dict_type);
4797 rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
4798 gl_index_id.cf_id);
4799 rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
4800 gl_index_id.index_id);
4801 }
4802
delete_with_prefix(rocksdb::WriteBatch * const batch,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id) const4803 void Rdb_dict_manager::delete_with_prefix(
4804 rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
4805 const GL_INDEX_ID &gl_index_id) const {
4806 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4807 dump_index_id(&key_writer, dict_type, gl_index_id);
4808
4809 delete_key(batch, key_writer.to_slice());
4810 }
4811
add_or_update_index_cf_mapping(rocksdb::WriteBatch * batch,struct Rdb_index_info * const index_info) const4812 void Rdb_dict_manager::add_or_update_index_cf_mapping(
4813 rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
4814 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4815 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
4816 index_info->m_gl_index_id);
4817
4818 Rdb_buf_writer<256> value_writer;
4819
4820 value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
4821 value_writer.write_byte(index_info->m_index_type);
4822 value_writer.write_uint16(index_info->m_kv_version);
4823 value_writer.write_uint32(index_info->m_index_flags);
4824 value_writer.write_uint64(index_info->m_ttl_duration);
4825
4826 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4827 }
4828
add_cf_flags(rocksdb::WriteBatch * const batch,const uint32_t cf_id,const uint32_t cf_flags) const4829 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
4830 const uint32_t cf_id,
4831 const uint32_t cf_flags) const {
4832 DBUG_ASSERT(batch != nullptr);
4833
4834 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4835 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4836 key_writer.write_uint32(cf_id);
4837
4838 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
4839 value_writer;
4840 value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
4841 value_writer.write_uint32(cf_flags);
4842
4843 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4844 }
4845
delete_index_info(rocksdb::WriteBatch * batch,const GL_INDEX_ID & gl_index_id) const4846 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
4847 const GL_INDEX_ID &gl_index_id) const {
4848 delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
4849 delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
4850 delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
4851 }
4852
get_index_info(const GL_INDEX_ID & gl_index_id,struct Rdb_index_info * const index_info) const4853 bool Rdb_dict_manager::get_index_info(
4854 const GL_INDEX_ID &gl_index_id,
4855 struct Rdb_index_info *const index_info) const {
4856 if (index_info) {
4857 index_info->m_gl_index_id = gl_index_id;
4858 }
4859
4860 bool found = false;
4861 bool error = false;
4862 std::string value;
4863 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4864 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
4865
4866 const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
4867 if (status.ok()) {
4868 if (!index_info) {
4869 return true;
4870 }
4871
4872 const uchar *const val = (const uchar *)value.c_str();
4873 const uchar *ptr = val;
4874 index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
4875 ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4876
4877 switch (index_info->m_index_dict_version) {
4878 case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
4879 /* Sanity check to prevent reading bogus TTL record. */
4880 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4881 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4882 RDB_SIZEOF_INDEX_FLAGS +
4883 ROCKSDB_SIZEOF_TTL_RECORD) {
4884 error = true;
4885 break;
4886 }
4887 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4888 ptr += RDB_SIZEOF_INDEX_TYPE;
4889 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4890 ptr += RDB_SIZEOF_KV_VERSION;
4891 index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
4892 ptr += RDB_SIZEOF_INDEX_FLAGS;
4893 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4894 found = true;
4895 break;
4896
4897 case Rdb_key_def::INDEX_INFO_VERSION_TTL:
4898 /* Sanity check to prevent reading bogus into TTL record. */
4899 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4900 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4901 ROCKSDB_SIZEOF_TTL_RECORD) {
4902 error = true;
4903 break;
4904 }
4905 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4906 ptr += RDB_SIZEOF_INDEX_TYPE;
4907 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4908 ptr += RDB_SIZEOF_KV_VERSION;
4909 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4910 if ((index_info->m_kv_version ==
4911 Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
4912 index_info->m_ttl_duration > 0) {
4913 index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
4914 }
4915 found = true;
4916 break;
4917
4918 case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
4919 case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
4920 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4921 ptr += RDB_SIZEOF_INDEX_TYPE;
4922 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4923 found = true;
4924 break;
4925
4926 default:
4927 error = true;
4928 break;
4929 }
4930
4931 switch (index_info->m_index_type) {
4932 case Rdb_key_def::INDEX_TYPE_PRIMARY:
4933 case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
4934 error = index_info->m_kv_version >
4935 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
4936 break;
4937 }
4938 case Rdb_key_def::INDEX_TYPE_SECONDARY:
4939 error = index_info->m_kv_version >
4940 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
4941 break;
4942 default:
4943 error = true;
4944 break;
4945 }
4946 }
4947
4948 if (error) {
4949 // NO_LINT_DEBUG
4950 sql_print_error(
4951 "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
4952 "from data dictionary. This should never happen "
4953 "and it may be a bug.",
4954 index_info->m_index_dict_version, index_info->m_index_type,
4955 index_info->m_kv_version, index_info->m_ttl_duration);
4956 abort();
4957 }
4958
4959 return found;
4960 }
4961
get_cf_flags(const uint32_t cf_id,uint32_t * const cf_flags) const4962 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
4963 uint32_t *const cf_flags) const {
4964 DBUG_ASSERT(cf_flags != nullptr);
4965
4966 bool found = false;
4967 std::string value;
4968 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4969
4970 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4971 key_writer.write_uint32(cf_id);
4972
4973 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
4974
4975 if (status.ok()) {
4976 const uchar *val = (const uchar *)value.c_str();
4977 DBUG_ASSERT(val);
4978
4979 const uint16_t version = rdb_netbuf_to_uint16(val);
4980
4981 if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
4982 *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
4983 found = true;
4984 }
4985 }
4986
4987 return found;
4988 }
4989
/*
  Returns the ids of indexes that have an ongoing operation of the given
  type: either indexes marked as deleted (via DROP TABLE) but not yet removed
  by drop_index_thread, or indexes whose creation is still in progress.
*/
get_ongoing_index_operation(std::unordered_set<GL_INDEX_ID> * gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const4995 void Rdb_dict_manager::get_ongoing_index_operation(
4996 std::unordered_set<GL_INDEX_ID> *gl_index_ids,
4997 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4998 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
4999 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5000
5001 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5002 index_writer.write_uint32(dd_type);
5003 const rocksdb::Slice index_slice = index_writer.to_slice();
5004
5005 rocksdb::Iterator *it = new_iterator();
5006 for (it->Seek(index_slice); it->Valid(); it->Next()) {
5007 rocksdb::Slice key = it->key();
5008 const uchar *const ptr = (const uchar *)key.data();
5009
5010 /*
5011 Ongoing drop/create index operations require key to be of the form:
5012 dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5013
5014 This may need to be changed in the future if we want to process a new
5015 ddl_type with different format.
5016 */
5017 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5018 rdb_netbuf_to_uint32(ptr) != dd_type) {
5019 break;
5020 }
5021
5022 // We don't check version right now since currently we always store only
5023 // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5024 // If increasing version number, we need to add version check logic here.
5025 GL_INDEX_ID gl_index_id;
5026 gl_index_id.cf_id =
5027 rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5028 gl_index_id.index_id =
5029 rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5030 gl_index_ids->insert(gl_index_id);
5031 }
5032 delete it;
5033 }
5034
/*
  Returns true if the given index has an ongoing operation of the given type,
  i.e. it is undergoing creation, or it was marked as deleted via DROP TABLE
  but drop_index_thread has not wiped it yet. Returns false otherwise.
*/
is_index_operation_ongoing(const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5040 bool Rdb_dict_manager::is_index_operation_ongoing(
5041 const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5042 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5043 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5044
5045 bool found = false;
5046 std::string value;
5047 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5048 dump_index_id(&key_writer, dd_type, gl_index_id);
5049
5050 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5051 if (status.ok()) {
5052 found = true;
5053 }
5054 return found;
5055 }
5056
5057 /*
5058 Adding index_id to data dictionary so that the index id is removed
5059 by drop_index_thread, or to track online index creation.
5060 */
start_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5061 void Rdb_dict_manager::start_ongoing_index_operation(
5062 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5063 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5064 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5065 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5066
5067 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5068 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5069
5070 dump_index_id(&key_writer, dd_type, gl_index_id);
5071
5072 // version as needed
5073 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5074 value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5075 } else {
5076 value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5077 }
5078
5079 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5080 }
5081
5082 /*
5083 Removing index_id from data dictionary to confirm drop_index_thread
5084 completed dropping entire key/values of the index_id
5085 */
end_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5086 void Rdb_dict_manager::end_ongoing_index_operation(
5087 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5088 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5089 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5090 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5091
5092 delete_with_prefix(batch, dd_type, gl_index_id);
5093 }
5094
/*
  Returns true if there are no index ids left to be removed
  by drop_index_thread.
*/
is_drop_index_empty() const5099 bool Rdb_dict_manager::is_drop_index_empty() const {
5100 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5101 get_ongoing_drop_indexes(&gl_index_ids);
5102 return gl_index_ids.empty();
5103 }
5104
/*
  This function is supposed to be called by DROP TABLE. It logs that dropping
  the table's indexes has started and adds entries to the data dictionary so
  that all associated indexes get removed.
*/
add_drop_table(std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,rocksdb::WriteBatch * const batch) const5110 void Rdb_dict_manager::add_drop_table(
5111 std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5112 rocksdb::WriteBatch *const batch) const {
5113 std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5114 for (uint32 i = 0; i < n_keys; i++) {
5115 dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5116 }
5117
5118 add_drop_index(dropped_index_ids, batch);
5119 }
5120
/*
  Called during inplace index drop operations. It logs that dropping the
  indexes has started and adds entries to the data dictionary so that all
  associated indexes get removed.
*/
add_drop_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5126 void Rdb_dict_manager::add_drop_index(
5127 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5128 rocksdb::WriteBatch *const batch) const {
5129 for (const auto &gl_index_id : gl_index_ids) {
5130 log_start_drop_index(gl_index_id, "Begin");
5131 start_drop_index(batch, gl_index_id);
5132 }
5133 }
5134
5135 /*
5136 Called during inplace index creation operations. Logging messages
5137 that adding indexes started, and updates data dictionary with all associated
5138 indexes to be added.
5139 */
add_create_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5140 void Rdb_dict_manager::add_create_index(
5141 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5142 rocksdb::WriteBatch *const batch) const {
5143 for (const auto &gl_index_id : gl_index_ids) {
5144 // NO_LINT_DEBUG
5145 sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
5146 gl_index_id.cf_id, gl_index_id.index_id);
5147 start_create_index(batch, gl_index_id);
5148 }
5149 }
5150
5151 /*
5152 This function is supposed to be called by drop_index_thread, when it
5153 finished dropping any index, or at the completion of online index creation.
5154 */
finish_indexes_operation(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const5155 void Rdb_dict_manager::finish_indexes_operation(
5156 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5157 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5158 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5159 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5160
5161 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5162 rocksdb::WriteBatch *const batch = wb.get();
5163
5164 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5165 get_ongoing_create_indexes(&incomplete_create_indexes);
5166
5167 for (const auto &gl_index_id : gl_index_ids) {
5168 if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5169 end_ongoing_index_operation(batch, gl_index_id, dd_type);
5170
5171 /*
5172 Remove the corresponding incomplete create indexes from data
5173 dictionary as well
5174 */
5175 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5176 if (incomplete_create_indexes.count(gl_index_id)) {
5177 end_ongoing_index_operation(batch, gl_index_id,
5178 Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5179 }
5180 }
5181 }
5182
5183 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5184 delete_index_info(batch, gl_index_id);
5185 }
5186 }
5187 commit(batch);
5188 }
5189
/*
  This function is supposed to be called when initializing
  Rdb_dict_manager (at startup). If any index ids are still marked as
  drop-ongoing, it prints messages for diagnostic purposes.
*/
resume_drop_indexes() const5195 void Rdb_dict_manager::resume_drop_indexes() const {
5196 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5197 get_ongoing_drop_indexes(&gl_index_ids);
5198
5199 uint max_index_id_in_dict = 0;
5200 get_max_index_id(&max_index_id_in_dict);
5201
5202 for (const auto &gl_index_id : gl_index_ids) {
5203 log_start_drop_index(gl_index_id, "Resume");
5204 if (max_index_id_in_dict < gl_index_id.index_id) {
5205 // NO_LINT_DEBUG
5206 sql_print_error(
5207 "RocksDB: Found max index id %u from data dictionary "
5208 "but also found dropped index id (%u,%u) from drop_index "
5209 "dictionary. This should never happen and is possibly a "
5210 "bug.",
5211 max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5212 abort();
5213 }
5214 }
5215 }
5216
rollback_ongoing_index_creation() const5217 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5218 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5219 rocksdb::WriteBatch *const batch = wb.get();
5220
5221 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5222 get_ongoing_create_indexes(&gl_index_ids);
5223
5224 for (const auto &gl_index_id : gl_index_ids) {
5225 // NO_LINT_DEBUG
5226 sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
5227 gl_index_id.cf_id, gl_index_id.index_id);
5228
5229 start_drop_index(batch, gl_index_id);
5230 }
5231
5232 commit(batch);
5233 }
5234
log_start_drop_table(const std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,const char * const log_action) const5235 void Rdb_dict_manager::log_start_drop_table(
5236 const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5237 const char *const log_action) const {
5238 for (uint32 i = 0; i < n_keys; i++) {
5239 log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5240 }
5241 }
5242
log_start_drop_index(GL_INDEX_ID gl_index_id,const char * log_action) const5243 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5244 const char *log_action) const {
5245 struct Rdb_index_info index_info;
5246 if (!get_index_info(gl_index_id, &index_info)) {
5247 /*
5248 If we don't find the index info, it could be that it's because it was a
5249 partially created index that isn't in the data dictionary yet that needs
5250 to be rolled back.
5251 */
5252 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5253 get_ongoing_create_indexes(&incomplete_create_indexes);
5254
5255 if (!incomplete_create_indexes.count(gl_index_id)) {
5256 /* If it's not a partially created index, something is very wrong. */
5257 // NO_LINT_DEBUG
5258 sql_print_error(
5259 "RocksDB: Failed to get column family info "
5260 "from index id (%u,%u). MyRocks data dictionary may "
5261 "get corrupted.",
5262 gl_index_id.cf_id, gl_index_id.index_id);
5263 if (rocksdb_ignore_datadic_errors)
5264 {
5265 sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, "
5266 "trying to continue");
5267 return;
5268 }
5269 abort();
5270 }
5271 }
5272 }
5273
get_max_index_id(uint32_t * const index_id) const5274 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5275 bool found = false;
5276 std::string value;
5277
5278 const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5279 if (status.ok()) {
5280 const uchar *const val = (const uchar *)value.c_str();
5281 const uint16_t version = rdb_netbuf_to_uint16(val);
5282 if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5283 *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5284 found = true;
5285 }
5286 }
5287 return found;
5288 }
5289
update_max_index_id(rocksdb::WriteBatch * const batch,const uint32_t index_id) const5290 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5291 const uint32_t index_id) const {
5292 DBUG_ASSERT(batch != nullptr);
5293
5294 uint32_t old_index_id = -1;
5295 if (get_max_index_id(&old_index_id)) {
5296 if (old_index_id > index_id) {
5297 // NO_LINT_DEBUG
5298 sql_print_error(
5299 "RocksDB: Found max index id %u from data dictionary "
5300 "but trying to update to older value %u. This should "
5301 "never happen and possibly a bug.",
5302 old_index_id, index_id);
5303 return true;
5304 }
5305 }
5306
5307 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5308 value_writer;
5309 value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5310 value_writer.write_uint32(index_id);
5311
5312 batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5313 return false;
5314 }
5315
add_stats(rocksdb::WriteBatch * const batch,const std::vector<Rdb_index_stats> & stats) const5316 void Rdb_dict_manager::add_stats(
5317 rocksdb::WriteBatch *const batch,
5318 const std::vector<Rdb_index_stats> &stats) const {
5319 DBUG_ASSERT(batch != nullptr);
5320
5321 for (const auto &it : stats) {
5322 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5323 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5324
5325 // IndexStats::materialize takes complete care of serialization including
5326 // storing the version
5327 const auto value =
5328 Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5329
5330 batch->Put(m_system_cfh, key_writer.to_slice(), value);
5331 }
5332 }
5333
get_stats(GL_INDEX_ID gl_index_id) const5334 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5335 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5336 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5337
5338 std::string value;
5339 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5340 if (status.ok()) {
5341 std::vector<Rdb_index_stats> v;
5342 // unmaterialize checks if the version matches
5343 if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5344 return v[0];
5345 }
5346 }
5347
5348 return Rdb_index_stats();
5349 }
5350
put_auto_incr_val(rocksdb::WriteBatchBase * batch,const GL_INDEX_ID & gl_index_id,ulonglong val,bool overwrite) const5351 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5352 rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id,
5353 ulonglong val, bool overwrite) const {
5354 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5355 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5356
5357 // Value is constructed by storing the version and the value.
5358 Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5359 ROCKSDB_SIZEOF_AUTOINC_VALUE>
5360 value_writer;
5361 value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5362 value_writer.write_uint64(val);
5363
5364 if (overwrite) {
5365 return batch->Put(m_system_cfh, key_writer.to_slice(),
5366 value_writer.to_slice());
5367 }
5368 return batch->Merge(m_system_cfh, key_writer.to_slice(),
5369 value_writer.to_slice());
5370 }
5371
get_auto_incr_val(const GL_INDEX_ID & gl_index_id,ulonglong * new_val) const5372 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5373 ulonglong *new_val) const {
5374 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5375 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5376
5377 std::string value;
5378 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5379
5380 if (status.ok()) {
5381 const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5382
5383 if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5384 *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5385 return true;
5386 }
5387 }
5388 return false;
5389 }
5390
get_and_update_next_number(Rdb_dict_manager * const dict)5391 uint Rdb_seq_generator::get_and_update_next_number(
5392 Rdb_dict_manager *const dict) {
5393 DBUG_ASSERT(dict != nullptr);
5394
5395 uint res;
5396 RDB_MUTEX_LOCK_CHECK(m_mutex);
5397
5398 res = m_next_number++;
5399
5400 const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
5401 rocksdb::WriteBatch *const batch = wb.get();
5402
5403 DBUG_ASSERT(batch != nullptr);
5404 dict->update_max_index_id(batch, res);
5405 dict->commit(batch);
5406
5407 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
5408
5409 return res;
5410 }
5411
5412 } // namespace myrocks
5413