1 /*
2 Copyright (c) 2012,2013 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation // gcc: Class implementation
19 #endif
20
21 /* This C++ file's header file */
22 #include "./rdb_datadic.h"
23
24 /* C++ standard header files */
25 #include <algorithm>
26 #include <array>
27 #include <limits>
28 #include <map>
29 #include <set>
30 #include <string>
31 #include <utility>
32 #include <vector>
33
34 /* MySQL header files */
35 #include "field.h"
36 #include "key.h"
37 #include "m_ctype.h"
38 #include "my_bit.h"
39 #include "my_bitmap.h"
40 #include "my_stacktrace.h"
41 #include "sql_table.h"
42
43 /* MyRocks header files */
44 #include "./ha_rocksdb.h"
45 #include "./ha_rocksdb_proto.h"
46 #include "./rdb_cf_manager.h"
47 #include "./rdb_psi.h"
48 #include "./rdb_utils.h"
49
50 #include "partitioning/partition_base.h"
51
52 namespace myrocks {
53
/*
  Forward declaration; the definition is not in this part of the file.
  NOTE(review): judging by the parameter names only, this presumably hands
  back (through the out-parameters) the mem-comparable image of the space
  character for charset `cs`, its length and the charset's multi-byte
  length -- confirm against the definition.
*/
void get_mem_comparable_space(const CHARSET_INFO *cs,
                              const std::vector<uchar> **xfrm, size_t *xfrm_len,
                              size_t *mb_len);
57
58 /*
59 Decode current key field
60 @param fpi IN data structure contains field metadata
61 @param field IN current field
62 @param reader IN key slice reader
63 @param unp_reader IN unpack information reader
64 @return
65 HA_EXIT_SUCCESS OK
66 other HA_ERR error code
67 */
decode_field(Rdb_field_packing * fpi,TABLE * table,uchar * buf,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)68 int Rdb_convert_to_record_key_decoder::decode_field(
69 Rdb_field_packing *fpi, TABLE *table, uchar *buf, Rdb_string_reader *reader,
70 Rdb_string_reader *unpack_reader) {
71 if (fpi->m_field_maybe_null) {
72 const char *nullp;
73 if (!(nullp = reader->read(1))) {
74 return HA_EXIT_FAILURE;
75 }
76
77 if (likely(*nullp == 1)) {
78 /* Clear the NULL-bit of this field */
79 buf[fpi->m_field_null_offset] &= (uchar) ~(fpi->m_field_null_bit_mask);
80 } else if (*nullp == 0) {
81 /* Set the NULL-bit of this field */
82 buf[fpi->m_field_null_offset] |= fpi->m_field_null_bit_mask;
83
84 /* Also set the field to its default value */
85 auto default_value = table->s->default_values + fpi->m_field_offset;
86 memcpy(buf + fpi->m_field_offset, default_value,
87 fpi->m_field_pack_length);
88 return HA_EXIT_SUCCESS;
89 } else {
90 return HA_EXIT_FAILURE;
91 }
92 }
93
94 return (fpi->m_unpack_func)(fpi, buf + fpi->m_field_offset, reader,
95 unpack_reader);
96 }
97
98 /*
99 Decode current key field
100
101 @param buf OUT the buf starting address
102 @param offset OUT the bytes offset when data is written
103 @param fpi IN data structure contains field metadata
104 @param table IN current table
105 @param field IN current field
106 @param has_unpack_inf IN whether contains unpack inf
107 @param reader IN key slice reader
108 @param unp_reader IN unpack information reader
109 @return
110 HA_EXIT_SUCCESS OK
111 other HA_ERR error code
112 */
decode(uchar * const buf,Rdb_field_packing * fpi,TABLE * table,bool has_unpack_info,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)113 int Rdb_convert_to_record_key_decoder::decode(
114 uchar *const buf, Rdb_field_packing *fpi, TABLE *table,
115 bool has_unpack_info, Rdb_string_reader *reader,
116 Rdb_string_reader *unpack_reader) {
117 assert(buf != nullptr);
118
119 // If we need unpack info, but there is none, tell the unpack function
120 // this by passing unp_reader as nullptr. If we never read unpack_info
121 // during unpacking anyway, then there won't an error.
122 bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
123
124 int res = decode_field(fpi, table, buf, reader,
125 maybe_missing_unpack ? nullptr : unpack_reader);
126
127 if (res != UNPACK_SUCCESS) {
128 return HA_ERR_ROCKSDB_CORRUPT_DATA;
129 }
130 return HA_EXIT_SUCCESS;
131 }
132
133 /*
134 Skip current key field
135
136 @param fpi IN data structure contains field metadata
137 @param field IN current field
138 @param reader IN key slice reader
139 @param unp_reader IN unpack information reader
140 @return
141 HA_EXIT_SUCCESS OK
142 other HA_ERR error code
143 */
skip(const Rdb_field_packing * fpi,const Field * field,Rdb_string_reader * reader,Rdb_string_reader * unp_reader)144 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
145 const Field *field,
146 Rdb_string_reader *reader,
147 Rdb_string_reader *unp_reader) {
148 /* It is impossible to unpack the column. Skip it. */
149 if (fpi->m_field_maybe_null) {
150 const char *nullp;
151 if (!(nullp = reader->read(1))) {
152 return HA_ERR_ROCKSDB_CORRUPT_DATA;
153 }
154 if (*nullp == 0) {
155 /* This is a NULL value */
156 return HA_EXIT_SUCCESS;
157 }
158 /* If NULL marker is not '0', it can be only '1' */
159 if (*nullp != 1) {
160 return HA_ERR_ROCKSDB_CORRUPT_DATA;
161 }
162 }
163 if ((fpi->m_skip_func)(fpi, reader)) {
164 return HA_ERR_ROCKSDB_CORRUPT_DATA;
165 }
166 // If this is a space padded varchar, we need to skip the indicator
167 // bytes for trailing bytes. They're useless since we can't restore the
168 // field anyway.
169 //
170 // There is a special case for prefixed varchars where we do not
171 // generate unpack info, because we know prefixed varchars cannot be
172 // unpacked. In this case, it is not necessary to skip.
173 if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
174 !fpi->m_unpack_info_stores_value) {
175 unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
176 }
177 return HA_EXIT_SUCCESS;
178 }
179
Rdb_key_field_iterator(const Rdb_key_def * key_def,Rdb_field_packing * pack_info,Rdb_string_reader * reader,Rdb_string_reader * unp_reader,TABLE * table,bool has_unpack_info,const MY_BITMAP * covered_bitmap,uchar * const buf)180 Rdb_key_field_iterator::Rdb_key_field_iterator(
181 const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
182 Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
183 bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
184 m_key_def = key_def;
185 m_fpi = pack_info;
186 m_fpi_end = pack_info + key_def->get_key_parts();
187 m_reader = reader;
188 m_unp_reader = unp_reader;
189 m_table = table;
190 m_has_unpack_info = has_unpack_info;
191 m_covered_bitmap = covered_bitmap;
192 m_buf = buf;
193 m_secondary_key =
194 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
195 m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
196 m_is_hidden_pk =
197 (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
198 m_curr_bitmap_pos = 0;
199 }
200
has_next()201 bool Rdb_key_field_iterator::has_next() { return m_fpi < m_fpi_end; }
202
203 /**
204 Iterate each field in the key and decode/skip one by one
205 */
next()206 int Rdb_key_field_iterator::next() {
207 int status = HA_EXIT_SUCCESS;
208 while (m_fpi < m_fpi_end) {
209 auto fpi = m_fpi++;
210
211 /*
212 Hidden pk field is packed at the end of the secondary keys, but the SQL
213 layer does not know about it. Skip retrieving field if hidden pk.
214 */
215 if ((m_secondary_key && m_hidden_pk_exists && fpi + 1 == m_fpi_end) ||
216 m_is_hidden_pk) {
217 assert(fpi->m_unpack_func);
218 if ((fpi->m_skip_func)(fpi, m_reader)) {
219 return HA_ERR_ROCKSDB_CORRUPT_DATA;
220 }
221 return HA_EXIT_SUCCESS;
222 }
223
224 bool covered_column = true;
225 if (m_covered_bitmap != nullptr &&
226 fpi->m_field_real_type == MYSQL_TYPE_VARCHAR && !fpi->m_covered) {
227 covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
228 bitmap_is_set(m_covered_bitmap, m_curr_bitmap_pos++);
229 }
230
231 if (fpi->m_unpack_func && covered_column) {
232 /* It is possible to unpack this column. Do it. */
233 status = Rdb_convert_to_record_key_decoder::decode(
234 m_buf, fpi, m_table, m_has_unpack_info, m_reader, m_unp_reader);
235 if (status) {
236 return status;
237 }
238 break;
239 } else {
240 auto field = fpi->get_field_in_table(m_table);
241 status = Rdb_convert_to_record_key_decoder::skip(fpi, field, m_reader,
242 m_unp_reader);
243 if (status) {
244 return status;
245 }
246 }
247 }
248 return HA_EXIT_SUCCESS;
249 }
250
251 /*
252 Rdb_key_def class implementation
253 */
/*
  Construct a key definition from its stored attributes.

  The per-key-part arrays (m_pk_part_no, m_pack_info) start out as nullptr
  and m_maxlength as 0; setup() treats a zero m_maxlength as "not set up
  yet" and fills these in.
*/
Rdb_key_def::Rdb_key_def(
    uint indexnr_arg, uint keyno_arg,
    std::shared_ptr<rocksdb::ColumnFamilyHandle> cf_handle_arg,
    uint16_t index_dict_version_arg, uchar index_type_arg,
    uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
    bool is_per_partition_cf_arg, const char *_name, Rdb_index_stats _stats,
    uint32 index_flags_bitmap, uint32 ttl_rec_offset, uint64 ttl_duration)
    : m_index_number(indexnr_arg),
      m_cf_handle(cf_handle_arg),
      m_index_dict_version(index_dict_version_arg),
      m_index_type(index_type_arg),
      m_kv_format_version(kv_format_version_arg),
      m_is_reverse_cf(is_reverse_cf_arg),
      m_is_per_partition_cf(is_per_partition_cf_arg),
      m_name(_name),
      m_stats(_stats),
      m_index_flags_bitmap(index_flags_bitmap),
      m_ttl_rec_offset(ttl_rec_offset),
      m_ttl_duration(ttl_duration),
      m_ttl_column(""),
      m_pk_part_no(nullptr),
      m_pack_info(nullptr),
      m_keyno(keyno_arg),
      m_key_parts(0),
      m_ttl_pk_key_part_offset(UINT_MAX),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(nullptr),
      m_maxlength(0)  // means 'not initialized'
{
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  /* Cache the index number in its storage (network byte order) form */
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  /*
    Indexes at or below the *_FORMAT_VERSION_UPDATE2 kv format versions are
    expected to carry no index flags.
  */
  assert_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                 m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
             m_total_index_flags_length == 0);
  assert_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                 m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
             m_total_index_flags_length == 0);
  assert(m_cf_handle);
}
295
/*
  Copy constructor.

  Scalar attributes are copied from `k`; the per-key-part arrays
  (m_pack_info, m_pk_part_no) are deep-copied into freshly allocated buffers
  so that both objects can be destroyed independently. The mutex is not
  copied; a new one is initialized.

  NOTE(review): m_ttl_field_index is reset to UINT_MAX rather than copied;
  this is kept as it was -- confirm whether that is intentional.
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number),
      m_cf_handle(k.m_cf_handle),
      /*
        The index type and format versions must be copied as well: they are
        read by the assertions below and describe the on-disk format of the
        index. The order mirrors the primary constructor's initializer list.
      */
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf),
      m_is_per_partition_cf(k.m_is_per_partition_cf),
      m_name(k.m_name),
      m_stats(k.m_stats),
      m_index_flags_bitmap(k.m_index_flags_bitmap),
      m_ttl_rec_offset(k.m_ttl_rec_offset),
      m_ttl_duration(k.m_ttl_duration),
      m_ttl_column(k.m_ttl_column),
      m_pk_part_no(nullptr),
      m_pack_info(nullptr),
      m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts),
      m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  assert_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                 m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
             m_total_index_flags_length == 0);
  assert_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                 m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
             m_total_index_flags_length == 0);

  /*
    m_pk_key_parts is only assigned by setup(), which also allocates
    m_pack_info; copy it only from a source that has been set up so that an
    unset value is never read.
  */
  m_pk_key_parts = (k.m_pack_info != nullptr) ? k.m_pk_key_parts : 0;

  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
#ifdef HAVE_PSI_INTERFACE
    void *buf = my_malloc(rdb_datadic_memory_key, size, MYF(0));
#else
    void *buf = my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(0));
#endif
    /*
      Copy every key part, not just the first one: the buffer holds
      k.m_key_parts entries and all of them are used when packing and
      unpacking keys.
      NOTE(review): the destructor only runs ~Rdb_field_packing() on the
      first entry, same as for arrays built by setup(); this assumes the
      type needs no per-element cleanup -- confirm.
    */
    Rdb_field_packing *const pack_info_copy =
        static_cast<Rdb_field_packing *>(buf);
    for (uint i = 0; i < k.m_key_parts; i++) {
      new (&pack_info_copy[i]) Rdb_field_packing(k.m_pack_info[i]);
    }
    m_pack_info = pack_info_copy;
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
#ifdef HAVE_PSI_INTERFACE
    m_pk_part_no =
        static_cast<uint *>(my_malloc(rdb_datadic_memory_key, size, MYF(0)));
#else
    m_pk_part_no =
        static_cast<uint *>(my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(0)));
#endif
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
347
~Rdb_key_def()348 Rdb_key_def::~Rdb_key_def() {
349 mysql_mutex_destroy(&m_mutex);
350
351 my_free(m_pk_part_no);
352 m_pk_part_no = nullptr;
353
354 if (m_pack_info) {
355 m_pack_info->~Rdb_field_packing();
356 my_free(m_pack_info);
357 }
358 m_pack_info = nullptr;
359 }
360
setup(const TABLE * const tbl,const Rdb_tbl_def * const tbl_def)361 void Rdb_key_def::setup(const TABLE *const tbl,
362 const Rdb_tbl_def *const tbl_def) {
363 assert(tbl != nullptr);
364 assert(tbl_def != nullptr);
365
366 /*
367 Set max_length based on the table. This can be called concurrently from
368 multiple threads, so there is a mutex to protect this code.
369 */
370 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
371 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
372 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
373 if (!m_maxlength) {
374 RDB_MUTEX_LOCK_CHECK(m_mutex);
375 if (m_maxlength != 0) {
376 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
377 return;
378 }
379
380 KEY *key_info = nullptr;
381 KEY *pk_info = nullptr;
382 if (!is_hidden_pk) {
383 key_info = &tbl->key_info[m_keyno];
384 if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
385 m_name = std::string(key_info->name);
386 } else {
387 m_name = HIDDEN_PK_NAME;
388 }
389
390 if (secondary_key) {
391 m_pk_key_parts = hidden_pk_exists ? 1 : pk_info->actual_key_parts;
392 } else {
393 pk_info = nullptr;
394 m_pk_key_parts = 0;
395 }
396
397 // "unique" secondary keys support:
398 m_key_parts = is_hidden_pk ? 1 : key_info->actual_key_parts;
399
400 if (secondary_key) {
401 /*
402 In most cases, SQL layer puts PK columns as invisible suffix at the
403 end of secondary key. There are cases where this doesn't happen:
404 - unique secondary indexes.
405 - partitioned tables.
406
407 Internally, we always need PK columns as suffix (and InnoDB does,
408 too, if you were wondering).
409
410 The loop below will attempt to put all PK columns at the end of key
411 definition. Columns that are already included in the index (either
412 by the user or by "extended keys" feature) are not included for the
413 second time.
414 */
415 m_key_parts += m_pk_key_parts;
416 }
417
418 if (secondary_key) {
419 #ifdef HAVE_PSI_INTERFACE
420 m_pk_part_no = static_cast<uint *>(my_malloc(
421 rdb_datadic_memory_key, sizeof(uint) * m_key_parts, MYF(0)));
422 #else
423 m_pk_part_no = static_cast<uint *>(
424 my_malloc(PSI_NOT_INSTRUMENTED, sizeof(uint) * m_key_parts, MYF(0)));
425 #endif
426 } else {
427 m_pk_part_no = nullptr;
428 }
429
430 const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
431 #ifdef HAVE_PSI_INTERFACE
432 void *buf = my_malloc(rdb_datadic_memory_key, size, MYF(0));
433 #else
434 void *buf = my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(0));
435 #endif
436 m_pack_info = new (buf) Rdb_field_packing;
437
438 /*
439 Guaranteed not to error here as checks have been made already during
440 table creation.
441 */
442 Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
443 &m_ttl_field_index, true);
444
445 size_t max_len = INDEX_NUMBER_SIZE;
446 int unpack_len = 0;
447 int max_part_len = 0;
448 bool simulating_extkey = false;
449 uint dst_i = 0;
450
451 uint keyno_to_set = m_keyno;
452 uint keypart_to_set = 0;
453
454 if (is_hidden_pk) {
455 Field *field = nullptr;
456 m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
457 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
458 max_len += m_pack_info[dst_i].m_max_image_len;
459 max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
460 dst_i++;
461 } else {
462 KEY_PART_INFO *key_part = key_info->key_part;
463
464 /* this loop also loops over the 'extended key' tail */
465 for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
466 Field *const field = key_part ? key_part->field : nullptr;
467
468 if (simulating_extkey && !hidden_pk_exists) {
469 assert(secondary_key);
470 /* Check if this field is already present in the key definition */
471 bool found = false;
472 for (uint j = 0; j < key_info->actual_key_parts; j++) {
473 if (field->field_index ==
474 key_info->key_part[j].field->field_index &&
475 key_part->length == key_info->key_part[j].length) {
476 found = true;
477 break;
478 }
479 }
480
481 if (found) {
482 key_part++;
483 continue;
484 }
485 }
486
487 if (field && field->real_maybe_null()) max_len += 1; // NULL-byte
488
489 m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
490 key_part ? key_part->length : 0);
491 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
492
493 if (pk_info) {
494 m_pk_part_no[dst_i] = -1;
495 for (uint j = 0; j < m_pk_key_parts; j++) {
496 if (field->field_index == pk_info->key_part[j].field->field_index) {
497 m_pk_part_no[dst_i] = j;
498 break;
499 }
500 }
501 } else if (secondary_key && hidden_pk_exists) {
502 /*
503 The hidden pk can never be part of the sk. So it is always
504 appended to the end of the sk.
505 */
506 m_pk_part_no[dst_i] = -1;
507 if (simulating_extkey) m_pk_part_no[dst_i] = 0;
508 }
509
510 max_len += m_pack_info[dst_i].m_max_image_len;
511
512 max_part_len =
513 std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
514
515 /*
516 Check key part name here, if it matches the TTL column then we store
517 the offset of the TTL key part here.
518 */
519 if (!m_ttl_column.empty() &&
520 my_strcasecmp(system_charset_info, field->field_name,
521 m_ttl_column.c_str()) == 0) {
522 assert(field->real_type() == MYSQL_TYPE_LONGLONG);
523 assert(field->key_type() == HA_KEYTYPE_ULONGLONG);
524 assert(!field->real_maybe_null());
525 m_ttl_pk_key_part_offset = dst_i;
526 }
527
528 key_part++;
529 /*
530 For "unique" secondary indexes, pretend they have
531 "index extensions"
532 */
533 if (secondary_key && src_i + 1 == key_info->actual_key_parts) {
534 simulating_extkey = true;
535 if (!hidden_pk_exists) {
536 keyno_to_set = tbl->s->primary_key;
537 key_part = pk_info->key_part;
538 keypart_to_set = (uint)-1;
539 } else {
540 keyno_to_set = tbl_def->m_key_count - 1;
541 key_part = nullptr;
542 keypart_to_set = 0;
543 }
544 }
545
546 dst_i++;
547 }
548 }
549
550 m_key_parts = dst_i;
551
552 /* Initialize the memory needed by the stats structure */
553 m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
554
555 /* Cache prefix extractor for bloom filter usage later */
556 rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
557 m_prefix_extractor = opt.prefix_extractor;
558
559 /*
560 This should be the last member variable set before releasing the mutex
561 so that other threads can't see the object partially set up.
562 */
563 m_maxlength = max_len;
564
565 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
566 }
567 }
568
569 /*
570 Determine if the table has TTL enabled by parsing the table comment.
571
572 @param[IN] table_arg
573 @param[IN] tbl_def_arg
574 @param[OUT] ttl_duration Default TTL value parsed from table comment
575 */
extract_ttl_duration(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,uint64 * ttl_duration)576 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
577 const Rdb_tbl_def *const tbl_def_arg,
578 uint64 *ttl_duration) {
579 assert(table_arg != nullptr);
580 assert(tbl_def_arg != nullptr);
581 assert(ttl_duration != nullptr);
582 std::string table_comment(table_arg->s->comment.str,
583 table_arg->s->comment.length);
584
585 bool ttl_duration_per_part_match_found = false;
586 std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
587 table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
588 RDB_TTL_DURATION_QUALIFIER);
589
590 /* If we don't have a ttl duration, nothing to do here. */
591 if (ttl_duration_str.empty()) {
592 return HA_EXIT_SUCCESS;
593 }
594
595 /*
596 Catch errors where a non-integral value was used as ttl duration, strtoull
597 will return 0.
598 */
599 *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
600 if (!*ttl_duration) {
601 my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
602 return HA_EXIT_FAILURE;
603 }
604
605 return HA_EXIT_SUCCESS;
606 }
607
608 /*
609 Determine if the table has TTL enabled by parsing the table comment.
610
611 @param[IN] table_arg
612 @param[IN] tbl_def_arg
613 @param[OUT] ttl_column TTL column in the table
614 @param[IN] skip_checks Skip validation checks (when called in
615 setup())
616 */
extract_ttl_col(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,std::string * ttl_column,uint * ttl_field_index,bool skip_checks)617 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
618 const Rdb_tbl_def *const tbl_def_arg,
619 std::string *ttl_column,
620 uint *ttl_field_index, bool skip_checks) {
621 std::string table_comment(table_arg->s->comment.str,
622 table_arg->s->comment.length);
623 /*
624 Check if there is a TTL column specified. Note that this is not required
625 and if omitted, an 8-byte ttl field will be prepended to each record
626 implicitly.
627 */
628 bool ttl_col_per_part_match_found = false;
629 std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
630 table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
631 RDB_TTL_COL_QUALIFIER);
632
633 if (skip_checks) {
634 for (uint i = 0; i < table_arg->s->fields; i++) {
635 Field *const field = table_arg->field[i];
636 if (my_strcasecmp(system_charset_info, field->field_name,
637 ttl_col_str.c_str()) == 0) {
638 *ttl_column = ttl_col_str;
639 *ttl_field_index = i;
640 }
641 }
642 return HA_EXIT_SUCCESS;
643 }
644
645 /* Check if TTL column exists in table */
646 if (!ttl_col_str.empty()) {
647 bool found = false;
648 for (uint i = 0; i < table_arg->s->fields; i++) {
649 Field *const field = table_arg->field[i];
650 if (my_strcasecmp(system_charset_info, field->field_name,
651 ttl_col_str.c_str()) == 0 &&
652 field->real_type() == MYSQL_TYPE_LONGLONG &&
653 field->key_type() == HA_KEYTYPE_ULONGLONG &&
654 !field->real_maybe_null()) {
655 *ttl_column = ttl_col_str;
656 *ttl_field_index = i;
657 found = true;
658 break;
659 }
660 }
661
662 if (!found) {
663 my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
664 return HA_EXIT_FAILURE;
665 }
666 }
667
668 return HA_EXIT_SUCCESS;
669 }
670
gen_qualifier_for_table(const char * const qualifier,const std::string & partition_name)671 const std::string Rdb_key_def::gen_qualifier_for_table(
672 const char *const qualifier, const std::string &partition_name) {
673 bool has_partition = !partition_name.empty();
674 std::string qualifier_str = "";
675
676 if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
677 return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
678 : qualifier_str + RDB_CF_NAME_QUALIFIER +
679 RDB_QUALIFIER_VALUE_SEP;
680 } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
681 return has_partition
682 ? gen_ttl_duration_qualifier_for_partition(partition_name)
683 : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
684 RDB_QUALIFIER_VALUE_SEP;
685 } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
686 return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
687 : qualifier_str + RDB_TTL_COL_QUALIFIER +
688 RDB_QUALIFIER_VALUE_SEP;
689 } else {
690 assert(0);
691 }
692
693 return qualifier_str;
694 }
695
696 /*
697 Formats the string and returns the column family name assignment part for a
698 specific partition.
699 */
gen_cf_name_qualifier_for_partition(const std::string & prefix)700 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
701 const std::string &prefix) {
702 assert(!prefix.empty());
703
704 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
705 RDB_QUALIFIER_VALUE_SEP;
706 }
707
gen_ttl_duration_qualifier_for_partition(const std::string & prefix)708 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
709 const std::string &prefix) {
710 assert(!prefix.empty());
711
712 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
713 RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
714 }
715
gen_ttl_col_qualifier_for_partition(const std::string & prefix)716 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
717 const std::string &prefix) {
718 assert(!prefix.empty());
719
720 return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
721 RDB_QUALIFIER_VALUE_SEP;
722 }
723
parse_comment_for_qualifier(const std::string & comment,const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,bool * per_part_match_found,const char * const qualifier)724 const std::string Rdb_key_def::parse_comment_for_qualifier(
725 const std::string &comment, const TABLE *const table_arg,
726 const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
727 const char *const qualifier) {
728 assert(table_arg != nullptr);
729 assert(tbl_def_arg != nullptr);
730 assert(per_part_match_found != nullptr);
731 assert(qualifier != nullptr);
732
733 std::string empty_result;
734
735 // Flag which marks if partition specific options were found.
736 *per_part_match_found = false;
737
738 if (comment.empty()) {
739 return empty_result;
740 }
741
742 // Let's fetch the comment for a index and check if there's a custom key
743 // name specified for a partition we are handling.
744 std::vector<std::string> v =
745 myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
746
747 std::string search_str = gen_qualifier_for_table(qualifier);
748
749 // If table has partitions then we need to check if user has requested
750 // qualifiers on a per partition basis.
751 //
752 // NOTE: this means if you specify a qualifier for a specific partition it
753 // will take precedence the 'table level' qualifier if one exists.
754 std::string search_str_part;
755 if (table_arg->part_info != nullptr) {
756 std::string partition_name = tbl_def_arg->base_partition();
757 assert(!partition_name.empty());
758 search_str_part = gen_qualifier_for_table(qualifier, partition_name);
759 }
760
761 assert(!search_str.empty());
762
763 // Basic O(N) search for a matching assignment. At most we expect maybe
764 // ten or so elements here.
765 if (!search_str_part.empty()) {
766 for (const auto &it : v) {
767 if (it.substr(0, search_str_part.length()) == search_str_part) {
768 // We found a prefix match. Try to parse it as an assignment.
769 std::vector<std::string> tokens =
770 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
771
772 // We found a custom qualifier, it was in the form we expected it to be.
773 // Return that instead of whatever we initially wanted to return. In
774 // a case below the `foo` part will be returned to the caller.
775 //
776 // p3_cfname=foo
777 //
778 // If no value was specified then we'll return an empty string which
779 // later gets translated into using a default CF.
780 if (tokens.size() == 2) {
781 *per_part_match_found = true;
782 return tokens[1];
783 } else {
784 return empty_result;
785 }
786 }
787 }
788 }
789
790 // Do this loop again, this time searching for 'table level' qualifiers if we
791 // didn't find any partition level qualifiers above.
792 for (const auto &it : v) {
793 if (it.substr(0, search_str.length()) == search_str) {
794 std::vector<std::string> tokens =
795 myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
796 if (tokens.size() == 2) {
797 return tokens[1];
798 } else {
799 return empty_result;
800 }
801 }
802 }
803
804 // If we didn't find any partitioned/non-partitioned qualifiers, return an
805 // empty string.
806 return empty_result;
807 }
808
809 /**
810 Read a memcmp key part from a slice using the passed in reader.
811
812 Returns -1 if field was null, 1 if error, 0 otherwise.
813 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const814 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
815 Rdb_string_reader *reader,
816 const uint part_num) const {
817 /* It is impossible to unpack the column. Skip it. */
818 if (m_pack_info[part_num].m_field_maybe_null) {
819 const char *nullp;
820 if (!(nullp = reader->read(1))) return 1;
821 if (*nullp == 0) {
822 /* This is a NULL value */
823 return -1;
824 } else {
825 /* If NULL marker is not '0', it can be only '1' */
826 if (*nullp != 1) return 1;
827 }
828 }
829
830 Rdb_field_packing *fpi = &m_pack_info[part_num];
831 assert(table_arg->s != nullptr);
832
833 if ((fpi->m_skip_func)(fpi, reader)) {
834 return 1;
835 }
836 return 0;
837 }
838
839 /**
840 Get a mem-comparable form of Primary Key from mem-comparable form of this key
841
842 @param
843 pk_descr Primary Key descriptor
844 key Index tuple from this key in mem-comparable form
845 pk_buffer OUT Put here mem-comparable form of the Primary Key.
846
847 @note
848 It may or may not be possible to restore primary key columns to their
849 mem-comparable form. To handle all cases, this function copies mem-
850 comparable forms directly.
851
852 RocksDB SE supports "Extended keys". This means that PK columns are present
853 at the end of every key. If the key already includes PK columns, then
854 these columns are not present at the end of the key.
855
856 Because of the above, we copy each primary key column.
857
858 @todo
859 If we checked crc32 checksums in this function, we would catch some CRC
860 violations that we currently don't. On the other hand, there is a broader
861 set of queries for which we would check the checksum twice.
862 */
863
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const864 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
865 const Rdb_key_def &pk_descr,
866 const rocksdb::Slice *const key,
867 uchar *const pk_buffer) const {
868 assert(table != nullptr);
869 assert(key != nullptr);
870 assert(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
871 assert(pk_buffer);
872
873 uint size = 0;
874 uchar *buf = pk_buffer;
875 assert(m_pk_key_parts);
876
877 /* Put the PK number */
878 rdb_netbuf_store_index(buf, pk_descr.m_index_number);
879 buf += INDEX_NUMBER_SIZE;
880 size += INDEX_NUMBER_SIZE;
881
882 const char *start_offs[MAX_REF_PARTS];
883 const char *end_offs[MAX_REF_PARTS];
884 int pk_key_part;
885 uint i;
886 Rdb_string_reader reader(key);
887
888 // Skip the index number
889 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
890
891 for (i = 0; i < m_key_parts; i++) {
892 if ((pk_key_part = m_pk_part_no[i]) != -1) {
893 start_offs[pk_key_part] = reader.get_current_ptr();
894 }
895
896 if (read_memcmp_key_part(table, &reader, i) > 0) {
897 return RDB_INVALID_KEY_LEN;
898 }
899
900 if (pk_key_part != -1) {
901 end_offs[pk_key_part] = reader.get_current_ptr();
902 }
903 }
904
905 for (i = 0; i < m_pk_key_parts; i++) {
906 const uint part_size = end_offs[i] - start_offs[i];
907 memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
908 buf += part_size;
909 size += part_size;
910 }
911
912 return size;
913 }
914
915 /**
916 Get a mem-comparable form of Secondary Key from mem-comparable form of this
917 key, without the extended primary key tail.
918
919 @param
920 key Index tuple from this key in mem-comparable form
921 sk_buffer OUT Put here mem-comparable form of the Secondary Key.
922 n_null_fields OUT Put number of null fields contained within sk entry
923 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const924 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
925 const rocksdb::Slice &key,
926 uchar *sk_buffer,
927 uint *n_null_fields) const {
928 assert(table != nullptr);
929 assert(sk_buffer != nullptr);
930 assert(n_null_fields != nullptr);
931 assert(m_keyno != table->s->primary_key);
932 assert(!table_has_hidden_pk(table));
933
934 uchar *buf = sk_buffer;
935
936 int res;
937 Rdb_string_reader reader(&key);
938 const char *start = reader.get_current_ptr();
939
940 // Skip the index number
941 if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
942
943 for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
944 if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
945 return RDB_INVALID_KEY_LEN;
946 } else if (res == -1) {
947 (*n_null_fields)++;
948 }
949 }
950
951 uint sk_memcmp_len = reader.get_current_ptr() - start;
952 memcpy(buf, start, sk_memcmp_len);
953 return sk_memcmp_len;
954 }
955
956 /**
957 Convert index tuple into storage (i.e. mem-comparable) format
958
959 @detail
960 Currently this is done by unpacking into table->record[0] and then
961 packing index columns into storage format.
962
963 @param pack_buffer Temporary area for packing varchar columns. Its
964 size is at least max_storage_fmt_length() bytes.
965 */
966
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,const uchar * const key_tuple,const key_part_map & keypart_map) const967 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
968 uchar *const packed_tuple,
969 const uchar *const key_tuple,
970 const key_part_map &keypart_map) const {
971 assert(tbl != nullptr);
972 assert(pack_buffer != nullptr);
973 assert(packed_tuple != nullptr);
974 assert(key_tuple != nullptr);
975
976 /* We were given a record in KeyTupleFormat. First, save it to record */
977 const uint key_len = calculate_key_len(tbl, m_keyno, keypart_map);
978 key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
979
980 uint n_used_parts = my_count_bits(keypart_map);
981 if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0; // Full key is used
982
983 /* Then, convert the record into a mem-comparable form */
984 return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
985 false, 0, n_used_parts);
986 }
987
988 /**
989 @brief
990 Check if "unpack info" data includes checksum.
991
992 @detail
993 This is used only by CHECK TABLE to count the number of rows that have
994 checksums.
995 */
996
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)997 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
998 size_t size = unpack_info.size();
999 if (size == 0) {
1000 return false;
1001 }
1002 const uchar *ptr = (const uchar *)unpack_info.data();
1003
1004 // Skip unpack info if present.
1005 if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1006 const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1007 SHIP_ASSERT(size >= skip_len);
1008
1009 size -= skip_len;
1010 ptr += skip_len;
1011 }
1012
1013 return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1014 }
1015
1016 /*
1017 @return Number of bytes that were changed
1018 */
successor(uchar * const packed_tuple,const uint len)1019 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1020 assert(packed_tuple != nullptr);
1021
1022 int changed = 0;
1023 uchar *p = packed_tuple + len - 1;
1024 for (; p > packed_tuple; p--) {
1025 changed++;
1026 if (*p != uchar(0xFF)) {
1027 *p = *p + 1;
1028 break;
1029 }
1030 *p = '\0';
1031 }
1032 return changed;
1033 }
1034
1035 /*
1036 @return Number of bytes that were changed
1037 */
predecessor(uchar * const packed_tuple,const uint len)1038 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1039 assert(packed_tuple != nullptr);
1040
1041 int changed = 0;
1042 uchar *p = packed_tuple + len - 1;
1043 for (; p > packed_tuple; p--) {
1044 changed++;
1045 if (*p != uchar(0x00)) {
1046 *p = *p - 1;
1047 break;
1048 }
1049 *p = 0xFF;
1050 }
1051 return changed;
1052 }
1053
// Header length in bytes for each unpack data tag; looked up by
// Rdb_key_def::get_unpack_header_size().
static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
    {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
    {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
1057
1058 /*
1059 @return The length in bytes of the header specified by the given tag
1060 */
get_unpack_header_size(char tag)1061 size_t Rdb_key_def::get_unpack_header_size(char tag) {
1062 assert(is_unpack_data_tag(tag));
1063 return UNPACK_HEADER_SIZES.at(tag);
1064 }
1065
1066 /*
1067 Get a bitmap indicating which varchar columns must be covered for this
1068 lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1069 the lookup is covered. If it can already be determined that the lookup is
1070 not covered, map->bitmap will be set to null.
1071 */
get_lookup_bitmap(const TABLE * table,MY_BITMAP * map) const1072 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1073 assert(map->bitmap == nullptr);
1074 bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1075 uint curr_bitmap_pos = 0;
1076
1077 // Indicates which columns in the read set might be covered.
1078 MY_BITMAP maybe_covered_bitmap;
1079 bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1080
1081 for (uint i = 0; i < m_key_parts; i++) {
1082 if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1083 continue;
1084 }
1085
1086 Field *const field = m_pack_info[i].get_field_in_table(table);
1087
1088 // Columns which are always covered are not stored in the covered bitmap so
1089 // we can ignore them here too.
1090 if (m_pack_info[i].m_covered &&
1091 bitmap_is_set(table->read_set, field->field_index)) {
1092 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1093 continue;
1094 }
1095
1096 switch (field->real_type()) {
1097 // This type may be covered depending on the record. If it was requested,
1098 // we require the covered bitmap to have this bit set.
1099 case MYSQL_TYPE_VARCHAR:
1100 if (curr_bitmap_pos < MAX_REF_PARTS) {
1101 if (bitmap_is_set(table->read_set, field->field_index)) {
1102 bitmap_set_bit(map, curr_bitmap_pos);
1103 bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1104 }
1105 curr_bitmap_pos++;
1106 } else {
1107 bitmap_free(&maybe_covered_bitmap);
1108 bitmap_free(map);
1109 return;
1110 }
1111 break;
1112 // This column is a type which is never covered. If it was requested, we
1113 // know this lookup will never be covered.
1114 default:
1115 if (bitmap_is_set(table->read_set, field->field_index)) {
1116 bitmap_free(&maybe_covered_bitmap);
1117 bitmap_free(map);
1118 return;
1119 }
1120 break;
1121 }
1122 }
1123
1124 // If there are columns which are not covered in the read set, the lookup
1125 // can't be covered.
1126 if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1127 bitmap_free(map);
1128 }
1129 bitmap_free(&maybe_covered_bitmap);
1130 }
1131
1132 /*
1133 Return true if for this secondary index
1134 - All of the requested columns are in the index
1135 - All values for columns that are prefix-only indexes are shorter or equal
1136 in length to the prefix
1137 */
covers_lookup(const rocksdb::Slice * const unpack_info,const MY_BITMAP * const lookup_bitmap) const1138 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1139 const MY_BITMAP *const lookup_bitmap) const {
1140 assert(lookup_bitmap != nullptr);
1141 if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1142 return false;
1143 }
1144
1145 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1146
1147 // Check if this unpack_info has a covered_bitmap
1148 const char *unpack_header = unp_reader.get_current_ptr();
1149 const bool has_covered_unpack_info =
1150 unp_reader.remaining_bytes() &&
1151 unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1152 if (!has_covered_unpack_info ||
1153 !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1154 return false;
1155 }
1156
1157 MY_BITMAP covered_bitmap;
1158 my_bitmap_map covered_bits;
1159 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1160 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1161 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1162 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1163
1164 return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1165 }
1166
1167 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
can_cover_lookup() const1168 bool Rdb_key_def::can_cover_lookup() const {
1169 for (uint i = 0; i < m_key_parts; i++) {
1170 if (!m_pack_info[i].m_covered) return false;
1171 }
1172 return true;
1173 }
1174
pack_field(Field * const field,Rdb_field_packing * pack_info,uchar * tuple,uchar * const packed_tuple,uchar * const pack_buffer,Rdb_string_writer * const unpack_info,uint * const n_null_fields) const1175 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1176 uchar *tuple, uchar *const packed_tuple,
1177 uchar *const pack_buffer,
1178 Rdb_string_writer *const unpack_info,
1179 uint *const n_null_fields) const {
1180 if (field->real_maybe_null()) {
1181 assert(is_storage_available(tuple - packed_tuple, 1));
1182 if (field->is_real_null()) {
1183 /* NULL value. store '\0' so that it sorts before non-NULL values */
1184 *tuple++ = 0;
1185 /* That's it, don't store anything else */
1186 if (n_null_fields) (*n_null_fields)++;
1187 return tuple;
1188 } else {
1189 /* Not a NULL value. Store '1' */
1190 *tuple++ = 1;
1191 }
1192 }
1193
1194 const bool create_unpack_info =
1195 (unpack_info && // we were requested to generate unpack_info
1196 pack_info->uses_unpack_info()); // and this keypart uses it
1197 Rdb_pack_field_context pack_ctx(unpack_info);
1198
1199 // Set the offset for methods which do not take an offset as an argument
1200 assert(
1201 is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1202
1203 (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1204
1205 /* Make "unpack info" to be stored in the value */
1206 if (create_unpack_info) {
1207 (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1208 &pack_ctx);
1209 }
1210
1211 return tuple;
1212 }
1213
1214 /**
1215 Get index columns from the record and pack them into mem-comparable form.
1216
1217 @param
1218 tbl Table we're working on
1219 record IN Record buffer with fields in table->record format
1220 pack_buffer IN Temporary area for packing varchars. The size is
1221 at least max_storage_fmt_length() bytes.
1222 packed_tuple OUT Key in the mem-comparable form
1223 unpack_info OUT Unpack data
1224 unpack_info_len OUT Unpack data length
1225 n_key_parts Number of keyparts to process. 0 means all of them.
1226 n_null_fields OUT Number of key fields with NULL value.
1227 ttl_bytes IN Previous ttl bytes from old record for update case or
1228 current ttl bytes from just packed primary key/value
1229 @detail
1230 Some callers do not need the unpack information, they can pass
1231 unpack_info=nullptr, unpack_info_len=nullptr.
1232
1233 @return
1234 Length of the packed tuple
1235 */
1236
pack_record(const TABLE * const tbl,uchar * const pack_buffer,const uchar * const record,uchar * const packed_tuple,Rdb_string_writer * const unpack_info,const bool should_store_row_debug_checksums,const longlong hidden_pk_id,uint n_key_parts,uint * const n_null_fields,const char * const ttl_bytes) const1237 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
1238 const uchar *const record,
1239 uchar *const packed_tuple,
1240 Rdb_string_writer *const unpack_info,
1241 const bool should_store_row_debug_checksums,
1242 const longlong hidden_pk_id, uint n_key_parts,
1243 uint *const n_null_fields,
1244 const char *const ttl_bytes) const {
1245 assert(tbl != nullptr);
1246 assert(pack_buffer != nullptr);
1247 assert(record != nullptr);
1248 assert(packed_tuple != nullptr);
1249 // Checksums for PKs are made when record is packed.
1250 // We should never attempt to make checksum just from PK values
1251 assert_IMP(should_store_row_debug_checksums,
1252 (m_index_type == INDEX_TYPE_SECONDARY));
1253
1254 uchar *tuple = packed_tuple;
1255 size_t unpack_start_pos = size_t(-1);
1256 size_t unpack_len_pos = size_t(-1);
1257 size_t covered_bitmap_pos = size_t(-1);
1258 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
1259
1260 rdb_netbuf_store_index(tuple, m_index_number);
1261 tuple += INDEX_NUMBER_SIZE;
1262
1263 // If n_key_parts is 0, it means all columns.
1264 // The following includes the 'extended key' tail.
1265 // The 'extended key' includes primary key. This is done to 'uniqify'
1266 // non-unique indexes
1267 const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
1268
1269 // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
1270 // hidden key part. So we skip it (its always 1 part).
1271 if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
1272 n_key_parts = m_key_parts - 1;
1273 } else if (use_all_columns) {
1274 n_key_parts = m_key_parts;
1275 }
1276
1277 if (n_null_fields) *n_null_fields = 0;
1278
1279 // Check if we need a covered bitmap. If it is certain that all key parts are
1280 // covering, we don't need one.
1281 bool store_covered_bitmap = false;
1282 if (unpack_info && use_covered_bitmap_format()) {
1283 for (uint i = 0; i < n_key_parts; i++) {
1284 if (!m_pack_info[i].m_covered) {
1285 store_covered_bitmap = true;
1286 break;
1287 }
1288 }
1289 }
1290
1291 const char tag =
1292 store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;
1293
1294 if (unpack_info) {
1295 unpack_info->clear();
1296
1297 if (m_index_type == INDEX_TYPE_SECONDARY &&
1298 m_total_index_flags_length > 0) {
1299 // Reserve space for index flag fields
1300 unpack_info->allocate(m_total_index_flags_length);
1301
1302 // Insert TTL timestamp
1303 if (has_ttl() && ttl_bytes) {
1304 write_index_flag_field(unpack_info,
1305 reinterpret_cast<const uchar *>(ttl_bytes),
1306 Rdb_key_def::TTL_FLAG);
1307 }
1308 }
1309
1310 unpack_start_pos = unpack_info->get_current_pos();
1311 unpack_info->write_uint8(tag);
1312 unpack_len_pos = unpack_info->get_current_pos();
1313 // we don't know the total length yet, so write a zero
1314 unpack_info->write_uint16(0);
1315
1316 if (store_covered_bitmap) {
1317 // Reserve two bytes for the covered bitmap. This will store, for key
1318 // parts which are not always covering, whether or not it is covering
1319 // for this record.
1320 covered_bitmap_pos = unpack_info->get_current_pos();
1321 unpack_info->write_uint16(0);
1322 }
1323 }
1324
1325 MY_BITMAP covered_bitmap;
1326 my_bitmap_map covered_bits;
1327 uint curr_bitmap_pos = 0;
1328 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1329
1330 for (uint i = 0; i < n_key_parts; i++) {
1331 // Fill hidden pk id into the last key part for secondary keys for tables
1332 // with no pk
1333 if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
1334 m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
1335 break;
1336 }
1337
1338 Field *const field = m_pack_info[i].get_field_in_table(tbl);
1339 assert(field != nullptr);
1340
1341 uint field_offset = field->ptr - tbl->record[0];
1342 uint null_offset = field->null_offset(tbl->record[0]);
1343 bool maybe_null = field->real_maybe_null();
1344
1345 field->move_field(
1346 const_cast<uchar *>(record) + field_offset,
1347 maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
1348 field->null_bit);
1349 // WARNING! Don't return without restoring field->ptr and field->null_ptr
1350
1351 tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
1352 unpack_info, n_null_fields);
1353
1354 // If this key part is a prefix of a VARCHAR field, check if it's covered.
1355 if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
1356 !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
1357 size_t data_length = field->data_length();
1358 uint16 key_length;
1359 if (m_pk_part_no[i] == (uint)-1) {
1360 key_length = tbl->key_info[get_keyno()].key_part[i].length;
1361 } else {
1362 key_length =
1363 tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
1364 }
1365
1366 if (m_pack_info[i].m_unpack_func != nullptr &&
1367 data_length <= key_length) {
1368 bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
1369 }
1370 curr_bitmap_pos++;
1371 }
1372
1373 // Restore field->ptr and field->null_ptr
1374 field->move_field(tbl->record[0] + field_offset,
1375 maybe_null ? tbl->record[0] + null_offset : nullptr,
1376 field->null_bit);
1377 }
1378
1379 if (unpack_info) {
1380 const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
1381 assert(len <= std::numeric_limits<uint16_t>::max());
1382
1383 // Don't store the unpack_info if it has only the header (that is, there's
1384 // no meaningful content).
1385 // Primary Keys are special: for them, store the unpack_info even if it's
1386 // empty (provided m_maybe_unpack_info==true, see
1387 // ha_rocksdb::convert_record_to_storage_format)
1388 if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
1389 if (len == get_unpack_header_size(tag) && !covered_bits) {
1390 unpack_info->truncate(unpack_start_pos);
1391 } else if (store_covered_bitmap) {
1392 unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
1393 }
1394 } else {
1395 unpack_info->write_uint16_at(unpack_len_pos, len);
1396 }
1397
1398 //
1399 // Secondary keys have key and value checksums in the value part
1400 // Primary key is a special case (the value part has non-indexed columns),
1401 // so the checksums are computed and stored by
1402 // ha_rocksdb::convert_record_to_storage_format
1403 //
1404 if (should_store_row_debug_checksums) {
1405 const ha_checksum key_crc32 =
1406 my_checksum(0, packed_tuple, tuple - packed_tuple);
1407 const ha_checksum val_crc32 =
1408 my_checksum(0, unpack_info->ptr(), unpack_info->get_current_pos());
1409
1410 unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
1411 unpack_info->write_uint32(key_crc32);
1412 unpack_info->write_uint32(val_crc32);
1413 }
1414 }
1415
1416 assert(is_storage_available(tuple - packed_tuple, 0));
1417
1418 return tuple - packed_tuple;
1419 }
1420
1421 /**
1422 Pack the hidden primary key into mem-comparable form.
1423
1424 @param
1425 tbl Table we're working on
1426 hidden_pk_id IN New value to be packed into key
1427 packed_tuple OUT Key in the mem-comparable form
1428
1429 @return
1430 Length of the packed tuple
1431 */
1432
pack_hidden_pk(const longlong hidden_pk_id,uchar * const packed_tuple) const1433 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1434 uchar *const packed_tuple) const {
1435 assert(packed_tuple != nullptr);
1436
1437 uchar *tuple = packed_tuple;
1438 rdb_netbuf_store_index(tuple, m_index_number);
1439 tuple += INDEX_NUMBER_SIZE;
1440 assert(m_key_parts == 1);
1441 assert(is_storage_available(tuple - packed_tuple,
1442 m_pack_info[0].m_max_image_len));
1443
1444 m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1445
1446 assert(is_storage_available(tuple - packed_tuple, 0));
1447 return tuple - packed_tuple;
1448 }
1449
1450 /**
1451 Function of type rdb_index_field_pack_t
1452
1453 The following code (Rdb_key_def::pack_* and dependent functions) is pulled
1454 directly from ./sql/field.cc from all of the various
  Field_*::make_sort_key() functions. The results of these functions within
  the server code were never intended to be persisted and as such the encoding
1457 and comparison can change over time without any notice. To protect us from
1458 such an event as well as to ensure binary upgrade compatibility, we have
1459 copied that code here so that it is entirely within our control.
1460 */
1461
1462 #if !defined(DBL_EXP_DIG)
1463 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
1464 #endif
1465
change_double_for_sort(double nr,uchar * to)1466 static void change_double_for_sort(double nr, uchar *to) {
1467 uchar *tmp = to;
1468 if (nr == 0.0) { /* Change to zero string */
1469 tmp[0] = (uchar)128;
1470 memset(tmp + 1, 0, sizeof(nr) - 1);
1471 } else {
1472 #ifdef WORDS_BIGENDIAN
1473 memcpy(tmp, &nr, sizeof(nr));
1474 #else
1475 {
1476 uchar *ptr = (uchar *)&nr;
1477 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1478 tmp[0] = ptr[3];
1479 tmp[1] = ptr[2];
1480 tmp[2] = ptr[1];
1481 tmp[3] = ptr[0];
1482 tmp[4] = ptr[7];
1483 tmp[5] = ptr[6];
1484 tmp[6] = ptr[5];
1485 tmp[7] = ptr[4];
1486 #else
1487 tmp[0] = ptr[7];
1488 tmp[1] = ptr[6];
1489 tmp[2] = ptr[5];
1490 tmp[3] = ptr[4];
1491 tmp[4] = ptr[3];
1492 tmp[5] = ptr[2];
1493 tmp[6] = ptr[1];
1494 tmp[7] = ptr[0];
1495 #endif
1496 }
1497 #endif
1498 if (tmp[0] & 128) /* Negative */
1499 { /* make complement */
1500 uint i;
1501 for (i = 0; i < sizeof(nr); i++) tmp[i] = tmp[i] ^ (uchar)255;
1502 } else { /* Set high and move exponent one up */
1503 ushort exp_part =
1504 (((ushort)tmp[0] << 8) | (ushort)tmp[1] | (ushort)32768);
1505 exp_part += (ushort)1 << (16 - 1 - DBL_EXP_DIG);
1506 tmp[0] = (uchar)(exp_part >> 8);
1507 tmp[1] = (uchar)exp_part;
1508 }
1509 }
1510 }
1511
1512 /**
1513 Copies an integer value to a format comparable with memcmp(). The
1514 format is characterized by the following:
1515
1516 - The sign bit goes first and is unset for negative values.
1517 - The representation is big endian.
1518
1519 The function template can be instantiated to copy from little or
1520 big endian values.
1521
1522 @tparam Is_big_endian True if the source integer is big endian.
1523
1524 @param to Where to write the integer.
1525 @param to_length Size in bytes of the destination buffer.
1526 @param from Where to read the integer.
1527 @param from_length Size in bytes of the source integer
1528 @param is_unsigned True if the source integer is an unsigned value.
1529 */
1530 template <bool Is_big_endian>
copy_integer(uchar * to,size_t to_length,const uchar * from,size_t from_length,bool is_unsigned)1531 void copy_integer(uchar *to, size_t to_length, const uchar *from,
1532 size_t from_length, bool is_unsigned) {
1533 if (Is_big_endian) {
1534 if (is_unsigned)
1535 to[0] = from[0];
1536 else
1537 to[0] = (char)(from[0] ^ 128); // Reverse the sign bit.
1538 memcpy(to + 1, from + 1, to_length - 1);
1539 } else {
1540 const int sign_byte = from[from_length - 1];
1541 if (is_unsigned)
1542 to[0] = sign_byte;
1543 else
1544 to[0] = static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
1545 for (size_t i = 1, j = from_length - 2; i < to_length; ++i, --j)
1546 to[i] = from[j];
1547 }
1548 }
1549
pack_tiny(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1550 void Rdb_key_def::pack_tiny(
1551 Rdb_field_packing *const fpi, Field *const field,
1552 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1553 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1554 assert(fpi != nullptr);
1555 assert(field != nullptr);
1556 assert(dst != nullptr);
1557 assert(*dst != nullptr);
1558 assert(field->real_type() == MYSQL_TYPE_TINY);
1559
1560 const size_t length = fpi->m_max_image_len;
1561 const uchar *ptr = field->ptr;
1562 const bool unsigned_flag =
1563 dynamic_cast<Field_num *const>(field)->unsigned_flag;
1564 uchar *to = *dst;
1565
1566 assert(length >= 1);
1567 if (unsigned_flag)
1568 *to = *ptr;
1569 else
1570 to[0] = (char)(ptr[0] ^ (uchar)128); /* Reverse signbit */
1571
1572 *dst += length;
1573 }
1574
pack_short(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1575 void Rdb_key_def::pack_short(
1576 Rdb_field_packing *const fpi, Field *const field,
1577 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1578 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1579 assert(fpi != nullptr);
1580 assert(field != nullptr);
1581 assert(dst != nullptr);
1582 assert(*dst != nullptr);
1583 assert(field->real_type() == MYSQL_TYPE_SHORT);
1584
1585 const size_t length = fpi->m_max_image_len;
1586 const uchar *ptr = field->ptr;
1587 const bool unsigned_flag =
1588 dynamic_cast<Field_num *const>(field)->unsigned_flag;
1589 uchar *to = *dst;
1590
1591 assert(length >= 2);
1592 #ifdef WORDS_BIGENDIAN
1593 if (!field->table->s->db_low_byte_first) {
1594 if (unsigned_flag)
1595 to[0] = ptr[0];
1596 else
1597 to[0] = (char)(ptr[0] ^ 128); /* Revers signbit */
1598 to[1] = ptr[1];
1599 } else
1600 #endif
1601 {
1602 if (unsigned_flag)
1603 to[0] = ptr[1];
1604 else
1605 to[0] = (char)(ptr[1] ^ 128); /* Revers signbit */
1606 to[1] = ptr[0];
1607 }
1608
1609 *dst += length;
1610 }
1611
pack_medium(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1612 void Rdb_key_def::pack_medium(
1613 Rdb_field_packing *const fpi, Field *const field,
1614 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1615 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1616 assert(fpi != nullptr);
1617 assert(field != nullptr);
1618 assert(dst != nullptr);
1619 assert(*dst != nullptr);
1620 assert(field->real_type() == MYSQL_TYPE_INT24);
1621
1622 const size_t length = fpi->m_max_image_len;
1623 const uchar *ptr = field->ptr;
1624 const bool unsigned_flag =
1625 dynamic_cast<Field_num *const>(field)->unsigned_flag;
1626 uchar *to = *dst;
1627
1628 assert(length >= 3);
1629 if (unsigned_flag)
1630 to[0] = ptr[2];
1631 else
1632 to[0] = (uchar)(ptr[2] ^ 128); /* Revers signbit */
1633 to[1] = ptr[1];
1634 to[2] = ptr[0];
1635
1636 *dst += length;
1637 }
1638
pack_long(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1639 void Rdb_key_def::pack_long(
1640 Rdb_field_packing *const fpi, Field *const field,
1641 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1642 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1643 assert(fpi != nullptr);
1644 assert(field != nullptr);
1645 assert(dst != nullptr);
1646 assert(*dst != nullptr);
1647 assert(field->real_type() == MYSQL_TYPE_LONG);
1648
1649 const size_t length = fpi->m_max_image_len;
1650 const uchar *ptr = field->ptr;
1651 const bool unsigned_flag =
1652 dynamic_cast<Field_num *const>(field)->unsigned_flag;
1653 uchar *to = *dst;
1654
1655 assert(length >= 4);
1656 #ifdef WORDS_BIGENDIAN
1657 if (!field->table->s->db_low_byte_first) {
1658 if (unsigned_flag)
1659 to[0] = ptr[0];
1660 else
1661 dst[0] = (char)(ptr[0] ^ 128); /* Revers signbit */
1662 to[1] = ptr[1];
1663 to[2] = ptr[2];
1664 to[3] = ptr[3];
1665 } else
1666 #endif
1667 {
1668 if (unsigned_flag)
1669 to[0] = ptr[3];
1670 else
1671 to[0] = (char)(ptr[3] ^ 128); /* Revers signbit */
1672 to[1] = ptr[2];
1673 to[2] = ptr[1];
1674 to[3] = ptr[0];
1675 }
1676
1677 *dst += length;
1678 }
1679
pack_longlong(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1680 void Rdb_key_def::pack_longlong(
1681 Rdb_field_packing *const fpi, Field *const field,
1682 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1683 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1684 assert(fpi != nullptr);
1685 assert(field != nullptr);
1686 assert(dst != nullptr);
1687 assert(*dst != nullptr);
1688 assert(field->real_type() == MYSQL_TYPE_LONGLONG);
1689
1690 static const int PACK_LENGTH = 8;
1691 const size_t length = fpi->m_max_image_len;
1692 const uchar *ptr = field->ptr;
1693 const bool unsigned_flag =
1694 dynamic_cast<Field_num *const>(field)->unsigned_flag;
1695 uchar *to = *dst;
1696
1697 const size_t from_length = PACK_LENGTH;
1698 const size_t to_length = from_length > length ? from_length : length;
1699 #ifdef WORDS_BIGENDIAN
1700 if (field->table == NULL || !field->table->s->db_low_byte_first)
1701 copy_integer<true>(to, to_length, ptr, from_length, unsigned_flag);
1702 else
1703 #endif
1704 copy_integer<false>(to, to_length, ptr, from_length, unsigned_flag);
1705
1706 *dst += length;
1707 }
1708
pack_double(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1709 void Rdb_key_def::pack_double(
1710 Rdb_field_packing *const fpi, Field *const field,
1711 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1712 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1713 assert(fpi != nullptr);
1714 assert(field != nullptr);
1715 assert(dst != nullptr);
1716 assert(*dst != nullptr);
1717 assert(field->real_type() == MYSQL_TYPE_DOUBLE);
1718
1719 const size_t length = fpi->m_max_image_len;
1720 const uchar *ptr = field->ptr;
1721 uchar *to = *dst;
1722
1723 double nr;
1724 #ifdef WORDS_BIGENDIAN
1725 if (field->table->s->db_low_byte_first) {
1726 float8get(&nr, ptr);
1727 } else
1728 #endif
1729 doubleget(&nr, ptr);
1730 if (length < 8) {
1731 uchar buff[8];
1732 change_double_for_sort(nr, buff);
1733 memcpy(to, buff, length);
1734 } else
1735 change_double_for_sort(nr, to);
1736
1737 *dst += length;
1738 }
1739
1740 #if !defined(FLT_EXP_DIG)
1741 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
1742 #endif
1743
pack_float(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1744 void Rdb_key_def::pack_float(
1745 Rdb_field_packing *const fpi, Field *const field,
1746 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1747 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1748 assert(fpi != nullptr);
1749 assert(field != nullptr);
1750 assert(dst != nullptr);
1751 assert(*dst != nullptr);
1752 assert(field->real_type() == MYSQL_TYPE_FLOAT);
1753
1754 const size_t length = fpi->m_max_image_len;
1755 const uchar *ptr = field->ptr;
1756 uchar *to = *dst;
1757
1758 assert(length == sizeof(float));
1759 float nr;
1760
1761 #ifdef WORDS_BIGENDIAN
1762 if (field->table->s->db_low_byte_first) {
1763 float4get(&nr, ptr);
1764 } else
1765 #endif
1766 memcpy(&nr, ptr, length < sizeof(float) ? length : sizeof(float));
1767
1768 uchar *tmp = to;
1769 if (nr == (float)0.0) { /* Change to zero string */
1770 tmp[0] = (uchar)128;
1771 memset(tmp + 1, 0, length < sizeof(nr) - 1 ? length : sizeof(nr) - 1);
1772 } else {
1773 #ifdef WORDS_BIGENDIAN
1774 memcpy(tmp, &nr, sizeof(nr));
1775 #else
1776 tmp[0] = ptr[3];
1777 tmp[1] = ptr[2];
1778 tmp[2] = ptr[1];
1779 tmp[3] = ptr[0];
1780 #endif
1781 if (tmp[0] & 128) /* Negative */
1782 { /* make complement */
1783 uint i;
1784 for (i = 0; i < sizeof(nr); i++) tmp[i] = (uchar)(tmp[i] ^ (uchar)255);
1785 } else {
1786 ushort exp_part =
1787 (((ushort)tmp[0] << 8) | (ushort)tmp[1] | (ushort)32768);
1788 exp_part += (ushort)1 << (16 - 1 - FLT_EXP_DIG);
1789 tmp[0] = (uchar)(exp_part >> 8);
1790 tmp[1] = (uchar)exp_part;
1791 }
1792 }
1793
1794 *dst += length;
1795 }
1796
pack_new_decimal(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1797 void Rdb_key_def::pack_new_decimal(
1798 Rdb_field_packing *const fpi, Field *const field,
1799 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1800 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1801 assert(fpi != nullptr);
1802 assert(field != nullptr);
1803 assert(dst != nullptr);
1804 assert(*dst != nullptr);
1805 assert(field->real_type() == MYSQL_TYPE_NEWDECIMAL);
1806
1807 const size_t length = fpi->m_max_image_len;
1808 const uchar *ptr = field->ptr;
1809 uchar *to = *dst;
1810 Field_new_decimal *const fnd = dynamic_cast<Field_new_decimal *>(field);
1811
1812 memcpy(to, ptr, length < fnd->bin_size ? length : fnd->bin_size);
1813
1814 *dst += length;
1815 }
1816
pack_datetime2(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1817 void Rdb_key_def::pack_datetime2(
1818 Rdb_field_packing *const fpi, Field *const field,
1819 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1820 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1821 assert(fpi != nullptr);
1822 assert(field != nullptr);
1823 assert(dst != nullptr);
1824 assert(*dst != nullptr);
1825 assert(field->real_type() == MYSQL_TYPE_DATETIME2);
1826
1827 const size_t length = fpi->m_max_image_len;
1828 const uchar *ptr = field->ptr;
1829 uchar *to = *dst;
1830
1831 memcpy(to, ptr, length);
1832
1833 *dst += length;
1834 }
1835
pack_timestamp2(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1836 void Rdb_key_def::pack_timestamp2(
1837 Rdb_field_packing *const fpi, Field *const field,
1838 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1839 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1840 assert(fpi != nullptr);
1841 assert(field != nullptr);
1842 assert(dst != nullptr);
1843 assert(*dst != nullptr);
1844 assert(field->real_type() == MYSQL_TYPE_TIMESTAMP2);
1845
1846 const size_t length = fpi->m_max_image_len;
1847 const uchar *ptr = field->ptr;
1848 uchar *to = *dst;
1849
1850 memcpy(to, ptr, length);
1851
1852 *dst += length;
1853 }
1854
pack_time2(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1855 void Rdb_key_def::pack_time2(
1856 Rdb_field_packing *const fpi, Field *const field,
1857 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1858 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1859 assert(fpi != nullptr);
1860 assert(field != nullptr);
1861 assert(dst != nullptr);
1862 assert(*dst != nullptr);
1863 assert(field->real_type() == MYSQL_TYPE_TIME2);
1864
1865 const size_t length = fpi->m_max_image_len;
1866 const uchar *ptr = field->ptr;
1867 uchar *to = *dst;
1868
1869 memcpy(to, ptr, length);
1870
1871 *dst += length;
1872 }
1873
pack_year(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1874 void Rdb_key_def::pack_year(
1875 Rdb_field_packing *const fpi, Field *const field,
1876 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1877 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1878 assert(fpi != nullptr);
1879 assert(field != nullptr);
1880 assert(dst != nullptr);
1881 assert(*dst != nullptr);
1882 assert(field->real_type() == MYSQL_TYPE_YEAR);
1883
1884 const size_t length = fpi->m_max_image_len;
1885 const uchar *ptr = field->ptr;
1886 const bool unsigned_flag =
1887 dynamic_cast<Field_num *const>(field)->unsigned_flag;
1888 uchar *to = *dst;
1889
1890 assert(length >= 1);
1891 if (unsigned_flag)
1892 *to = *ptr;
1893 else
1894 to[0] = (char)(ptr[0] ^ (uchar)128); /* Reverse signbit */
1895
1896 *dst += length;
1897 }
1898
pack_newdate(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1899 void Rdb_key_def::pack_newdate(
1900 Rdb_field_packing *const fpi, Field *const field,
1901 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1902 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1903 assert(fpi != nullptr);
1904 assert(field != nullptr);
1905 assert(dst != nullptr);
1906 assert(*dst != nullptr);
1907 assert(field->real_type() == MYSQL_TYPE_NEWDATE);
1908
1909 const size_t length = fpi->m_max_image_len;
1910 const uchar *ptr = field->ptr;
1911 uchar *to = *dst;
1912
1913 assert(length >= 3);
1914 to[0] = ptr[2];
1915 to[1] = ptr[1];
1916 to[2] = ptr[0];
1917
1918 *dst += length;
1919 }
1920
pack_blob(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1921 void Rdb_key_def::pack_blob(
1922 Rdb_field_packing *const fpi, Field *const field,
1923 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1924 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1925 assert(fpi != nullptr);
1926 assert(field != nullptr);
1927 assert(dst != nullptr);
1928 assert(*dst != nullptr);
1929 assert(field->real_type() == MYSQL_TYPE_TINY_BLOB ||
1930 field->real_type() == MYSQL_TYPE_MEDIUM_BLOB ||
1931 field->real_type() == MYSQL_TYPE_LONG_BLOB ||
1932 field->real_type() == MYSQL_TYPE_BLOB ||
1933 field->real_type() == MYSQL_TYPE_JSON);
1934
1935 size_t length = fpi->m_max_image_len;
1936 const uchar *ptr = field->ptr;
1937 uchar *to = *dst;
1938 Field_blob *const field_blob = dynamic_cast<Field_blob *const>(field);
1939 const CHARSET_INFO *field_charset = field_blob->charset();
1940
1941 uchar *blob;
1942 size_t blob_length = field_blob->get_length();
1943
1944 if (!blob_length && field_charset->pad_char == 0) {
1945 memset(to, 0, length);
1946 } else {
1947 if (field_charset == &my_charset_bin) {
1948 uchar *pos;
1949
1950 /*
1951 Store length of blob last in blob to shorter blobs before longer blobs
1952 */
1953 length -= field_blob->pack_length_no_ptr();
1954 pos = to + length;
1955 uint key_length = blob_length < length ? blob_length : length;
1956
1957 switch (field_blob->pack_length_no_ptr()) {
1958 case 1:
1959 *pos = (char)key_length;
1960 break;
1961 case 2:
1962 mi_int2store(pos, key_length);
1963 break;
1964 case 3:
1965 mi_int3store(pos, key_length);
1966 break;
1967 case 4:
1968 mi_int4store(pos, key_length);
1969 break;
1970 }
1971 }
1972 memcpy(&blob, ptr + field_blob->pack_length_no_ptr(), sizeof(char *));
1973
1974 blob_length = field_charset->coll->strnxfrm(
1975 field_charset, to, length, length, blob, blob_length,
1976 MY_STRXFRM_PAD_WITH_SPACE | MY_STRXFRM_PAD_TO_MAXLEN);
1977 assert(blob_length == length);
1978 }
1979
1980 *dst += fpi->m_max_image_len;
1981 }
1982
1983 /**
1984 This is the end of the code copied from Field_*::make_sort_key()
1985 */
1986
pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1987 void Rdb_key_def::pack_with_make_sort_key(
1988 Rdb_field_packing *const fpi, Field *const field,
1989 uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1990 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1991 assert(fpi != nullptr);
1992 assert(field != nullptr);
1993 assert(dst != nullptr);
1994 assert(*dst != nullptr);
1995
1996 const int max_len = fpi->m_max_image_len;
1997 field->make_sort_key(*dst, max_len);
1998 *dst += max_len;
1999 }
2000
/*
  Compares two keys without unpacking

  @detail
  @return
    HA_EXIT_SUCCESS - Ok. column_index is the index of the first column which
                      is different, or the number of key parts if the two
                      keys are equal.
    HA_EXIT_FAILURE - Data format error. column_index is not valid.
*/
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const2010 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
2011 const rocksdb::Slice *key2,
2012 std::size_t *const column_index) const {
2013 assert(key1 != nullptr);
2014 assert(key2 != nullptr);
2015 assert(column_index != nullptr);
2016
2017 // the caller should check the return value and
2018 // not rely on column_index being valid
2019 *column_index = 0xbadf00d;
2020
2021 Rdb_string_reader reader1(key1);
2022 Rdb_string_reader reader2(key2);
2023
2024 // Skip the index number
2025 auto indexp1 = reader1.read(INDEX_NUMBER_SIZE);
2026 if (!indexp1) return HA_EXIT_FAILURE;
2027
2028 auto indexp2 = reader2.read(INDEX_NUMBER_SIZE);
2029 if (!indexp2) return HA_EXIT_FAILURE;
2030
2031 // shouldn't compare with other index
2032 assert(memcmp(indexp1, indexp2, INDEX_NUMBER_SIZE) == 0);
2033
2034 for (uint i = 0; i < m_key_parts; i++) {
2035 const Rdb_field_packing *const fpi = &m_pack_info[i];
2036 if (fpi->m_field_maybe_null) {
2037 const auto nullp1 = reader1.read(1);
2038 const auto nullp2 = reader2.read(1);
2039
2040 if (nullp1 == nullptr || nullp2 == nullptr) {
2041 return HA_EXIT_FAILURE;
2042 }
2043
2044 if (*nullp1 != *nullp2) {
2045 *column_index = i;
2046 return HA_EXIT_SUCCESS;
2047 }
2048
2049 if (*nullp1 == 0) {
2050 /* This is a NULL value */
2051 continue;
2052 }
2053 }
2054
2055 const auto before_skip1 = reader1.get_current_ptr();
2056 const auto before_skip2 = reader2.get_current_ptr();
2057 assert(fpi->m_skip_func);
2058 if ((fpi->m_skip_func)(fpi, &reader1)) {
2059 return HA_EXIT_FAILURE;
2060 }
2061 if ((fpi->m_skip_func)(fpi, &reader2)) {
2062 return HA_EXIT_FAILURE;
2063 }
2064 const auto size1 = reader1.get_current_ptr() - before_skip1;
2065 const auto size2 = reader2.get_current_ptr() - before_skip2;
2066 if (size1 != size2) {
2067 *column_index = i;
2068 return HA_EXIT_SUCCESS;
2069 }
2070
2071 if (memcmp(before_skip1, before_skip2, size1) != 0) {
2072 *column_index = i;
2073 return HA_EXIT_SUCCESS;
2074 }
2075 }
2076
2077 *column_index = m_key_parts;
2078 return HA_EXIT_SUCCESS;
2079 }
2080
2081 /*
2082 @brief
2083 Given a zero-padded key, determine its real key length
2084
2085 @detail
2086 Fixed-size skip functions just read.
2087 */
2088
key_length(const TABLE * const table,const rocksdb::Slice & key) const2089 size_t Rdb_key_def::key_length(const TABLE *const table,
2090 const rocksdb::Slice &key) const {
2091 assert(table != nullptr);
2092
2093 Rdb_string_reader reader(&key);
2094
2095 if ((!reader.read(INDEX_NUMBER_SIZE))) {
2096 return size_t(-1);
2097 }
2098 for (uint i = 0; i < m_key_parts; i++) {
2099 const Rdb_field_packing *fpi = &m_pack_info[i];
2100 if ((fpi->m_skip_func)(fpi, &reader)) {
2101 return size_t(-1);
2102 }
2103 }
2104 return key.size() - reader.remaining_bytes();
2105 }
2106
2107 /*
2108 Take mem-comparable form and unpack_info and unpack it to Table->record
2109
2110 @detail
2111 not all indexes support this
2112
2113 @return
2114 HA_EXIT_SUCCESS OK
2115 other HA_ERR error code
2116 */
2117
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool verify_row_debug_checksums) const2118 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
2119 const rocksdb::Slice *const packed_key,
2120 const rocksdb::Slice *const unpack_info,
2121 const bool verify_row_debug_checksums) const {
2122 Rdb_string_reader reader(packed_key);
2123 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
2124
2125 // There is no checksuming data after unpack_info for primary keys, because
2126 // the layout there is different. The checksum is verified in
2127 // ha_rocksdb::convert_record_from_storage_format instead.
2128 assert_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
2129 !verify_row_debug_checksums);
2130
2131 // Skip the index number
2132 if ((unlikely(!reader.read(INDEX_NUMBER_SIZE)))) {
2133 return HA_ERR_ROCKSDB_CORRUPT_DATA;
2134 }
2135
2136 // For secondary keys, we expect the value field to contain index flags,
2137 // unpack data, and checksum data in that order. One or all can be missing,
2138 // but they cannot be reordered.
2139 if (unp_reader.remaining_bytes()) {
2140 if (m_index_type == INDEX_TYPE_SECONDARY &&
2141 m_total_index_flags_length > 0 &&
2142 !unp_reader.read(m_total_index_flags_length)) {
2143 return HA_ERR_ROCKSDB_CORRUPT_DATA;
2144 }
2145 }
2146
2147 const char *unpack_header = unp_reader.get_current_ptr();
2148 const bool has_unpack_info =
2149 unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
2150 if (has_unpack_info) {
2151 if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
2152 return HA_ERR_ROCKSDB_CORRUPT_DATA;
2153 }
2154 }
2155
2156 // Read the covered bitmap
2157 MY_BITMAP covered_bitmap;
2158 my_bitmap_map covered_bits;
2159 const bool has_covered_bitmap =
2160 has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
2161 if (has_covered_bitmap) {
2162 bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
2163 covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
2164 sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
2165 RDB_UNPACK_COVERED_DATA_LEN_SIZE);
2166 }
2167
2168 int err = HA_EXIT_SUCCESS;
2169
2170 Rdb_key_field_iterator iter(
2171 this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
2172 has_covered_bitmap ? &covered_bitmap : nullptr, buf);
2173 while (iter.has_next()) {
2174 err = iter.next();
2175 if (unlikely(err)) {
2176 return err;
2177 }
2178 }
2179
2180 /*
2181 Check checksum values if present
2182 */
2183 const char *ptr;
2184 if (unlikely((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG)) {
2185 if (verify_row_debug_checksums) {
2186 uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
2187 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
2188 const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
2189 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
2190
2191 const ha_checksum computed_key_chksum =
2192 my_checksum(0, (const uchar *)packed_key->data(), packed_key->size());
2193 const ha_checksum computed_val_chksum =
2194 my_checksum(0, (const uchar *)unpack_info->data(),
2195 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
2196
2197 DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
2198 stored_key_chksum++;);
2199
2200 if (stored_key_chksum != computed_key_chksum) {
2201 report_checksum_mismatch(true, packed_key->data(), packed_key->size());
2202 return HA_ERR_ROCKSDB_CORRUPT_DATA;
2203 }
2204
2205 if (stored_val_chksum != computed_val_chksum) {
2206 report_checksum_mismatch(false, unpack_info->data(),
2207 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
2208 return HA_ERR_ROCKSDB_CORRUPT_DATA;
2209 }
2210 } else {
2211 /* The checksums are present but we are not checking checksums */
2212 }
2213 }
2214
2215 if (unlikely(reader.remaining_bytes())) return HA_ERR_ROCKSDB_CORRUPT_DATA;
2216
2217 return HA_EXIT_SUCCESS;
2218 }
2219
table_has_hidden_pk(const TABLE * const table)2220 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
2221 return table->s->primary_key == MAX_INDEXES;
2222 }
2223
report_checksum_mismatch(const bool is_key,const char * const data,const size_t data_size) const2224 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
2225 const char *const data,
2226 const size_t data_size) const {
2227 // NO_LINT_DEBUG
2228 sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
2229 is_key ? "key" : "value", get_index_number());
2230
2231 const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
2232 // NO_LINT_DEBUG
2233 sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
2234 (uint64_t)data_size, buf.c_str());
2235
2236 my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
2237 }
2238
index_format_min_check(const int pk_min,const int sk_min) const2239 bool Rdb_key_def::index_format_min_check(const int pk_min,
2240 const int sk_min) const {
2241 switch (m_index_type) {
2242 case INDEX_TYPE_PRIMARY:
2243 case INDEX_TYPE_HIDDEN_PRIMARY:
2244 return (m_kv_format_version >= pk_min);
2245 case INDEX_TYPE_SECONDARY:
2246 return (m_kv_format_version >= sk_min);
2247 default:
2248 assert(0);
2249 return false;
2250 }
2251 }
2252
2253 ///////////////////////////////////////////////////////////////////////////////////////////
2254 // Rdb_field_packing
2255 ///////////////////////////////////////////////////////////////////////////////////////////
2256
2257 /*
2258 Function of type rdb_index_field_skip_t
2259 */
2260
skip_max_length(const Rdb_field_packing * const fpi,Rdb_string_reader * const reader)2261 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
2262 Rdb_string_reader *const reader) {
2263 if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
2264 return HA_EXIT_SUCCESS;
2265 }
2266
2267 /*
2268 (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
2269 split in the middle of an UTF-8 character. See the implementation of
2270 unpack_binary_or_utf8_varchar.
2271 */
2272
/* Size of one encoded segment: RDB_ESCAPE_LENGTH-1 data bytes + 1 flag byte */
#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Number of bytes needed to encode `len` bytes of data: one full segment per
  started group of RDB_ESCAPE_LENGTH-1 data bytes.

  The argument and the whole expansion are parenthesized so that the macro
  evaluates correctly inside larger expressions and with arguments that
  contain low-precedence operators.
*/
#define RDB_ENCODED_SIZE(len)                                         \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Same for the legacy format, which emits one extra segment when `len` is an
  exact multiple of RDB_LEGACY_ESCAPE_LENGTH-1 (including 0).
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                                  \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /                        \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                                 \
   RDB_LEGACY_ESCAPE_LENGTH)
2285
2286 /*
2287 Function of type rdb_index_field_skip_t
2288 */
2289
skip_variable_length(const Rdb_field_packing * const fpi,Rdb_string_reader * const reader)2290 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
2291 Rdb_string_reader *const reader) {
2292 const uchar *ptr;
2293 bool finished = false;
2294
2295 /* How much data can be there */
2296 size_t dst_len = fpi->m_field_pack_length - fpi->m_varchar_length_bytes;
2297
2298 bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2299
2300 /* Decode the length-emitted encoding here */
2301 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2302 uint used_bytes;
2303
2304 /* See pack_with_varchar_encoding. */
2305 if (use_legacy_format) {
2306 used_bytes = calc_unpack_legacy_variable_format(
2307 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2308 } else {
2309 used_bytes =
2310 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2311 }
2312
2313 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2314 return HA_EXIT_FAILURE; // Corruption in the data
2315 }
2316
2317 if (finished) {
2318 break;
2319 }
2320
2321 dst_len -= used_bytes;
2322 }
2323
2324 if (!finished) {
2325 return HA_EXIT_FAILURE;
2326 }
2327
2328 return HA_EXIT_SUCCESS;
2329 }
2330
/*
  Values of the last byte of a segment in Variable-Length Space-Padded
  encoding. skip_variable_space_pad treats EQUAL_TO_SPACES as the marker of
  the last segment and the other two as markers of a segment that is followed
  by more segments.
*/
constexpr int VARCHAR_CMP_LESS_THAN_SPACES = 1;
constexpr int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
constexpr int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
2334
2335 /*
2336 Skip a keypart that uses Variable-Length Space-Padded encoding
2337 */
2338
skip_variable_space_pad(const Rdb_field_packing * const fpi,Rdb_string_reader * const reader)2339 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
2340 Rdb_string_reader *const reader) {
2341 const uchar *ptr;
2342 bool finished = false;
2343
2344 /* How much data can be there */
2345 size_t dst_len = fpi->m_field_pack_length - fpi->m_varchar_length_bytes;
2346
2347 /* Decode the length-emitted encoding here */
2348 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2349 // See pack_with_varchar_space_pad
2350 const uchar c = ptr[fpi->m_segment_size - 1];
2351 if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
2352 // This is the last segment
2353 finished = true;
2354 break;
2355 } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
2356 c == VARCHAR_CMP_GREATER_THAN_SPACES) {
2357 // This is not the last segment
2358 if ((fpi->m_segment_size - 1) > dst_len) {
2359 // The segment is full of data but the table field can't hold that
2360 // much! This must be data corruption.
2361 return HA_EXIT_FAILURE;
2362 }
2363 dst_len -= (fpi->m_segment_size - 1);
2364 } else {
2365 // Encountered a value that's none of the VARCHAR_CMP* constants
2366 // It's data corruption.
2367 return HA_EXIT_FAILURE;
2368 }
2369 }
2370 return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
2371 }
2372
2373 /*
2374 Function of type rdb_index_field_unpack_t
2375 */
2376 template <int length>
unpack_integer(Rdb_field_packing * const fpi,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2377 int Rdb_key_def::unpack_integer(Rdb_field_packing *const fpi, uchar *const to,
2378 Rdb_string_reader *const reader,
2379 Rdb_string_reader *const unp_reader
2380 MY_ATTRIBUTE((__unused__))) {
2381 assert(length == fpi->m_max_image_len);
2382
2383 const uchar *from;
2384 if (!(from = (const uchar *)reader->read(length))) {
2385 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
2386 }
2387
2388 #ifdef WORDS_BIGENDIAN
2389 {
2390 if (fpi->m_field_unsigned_flag) {
2391 to[0] = from[0];
2392 } else {
2393 to[0] = static_cast<char>(from[0] ^ 128); // Reverse the sign bit.
2394 }
2395 /* Parameterized length should enable loop unrolling */
2396 for (int i = 1; i < length; i++) to[i] = from[i];
2397 }
2398 #else
2399 {
2400 const int sign_byte = from[0];
2401 if (fpi->m_field_unsigned_flag) {
2402 to[length - 1] = sign_byte;
2403 } else {
2404 to[length - 1] =
2405 static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
2406 }
2407
2408 /* Parameterized length should enable loop unrolling */
2409 for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
2410 }
2411 #endif
2412 return UNPACK_SUCCESS;
2413 }
2414
2415 #if !defined(WORDS_BIGENDIAN)
rdb_swap_double_bytes(uchar * const dst,const uchar * const src)2416 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
2417 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
2418 // A few systems store the most-significant _word_ first on little-endian
2419 dst[0] = src[3];
2420 dst[1] = src[2];
2421 dst[2] = src[1];
2422 dst[3] = src[0];
2423 dst[4] = src[7];
2424 dst[5] = src[6];
2425 dst[6] = src[5];
2426 dst[7] = src[4];
2427 #else
2428 dst[0] = src[7];
2429 dst[1] = src[6];
2430 dst[2] = src[5];
2431 dst[3] = src[4];
2432 dst[4] = src[3];
2433 dst[5] = src[2];
2434 dst[6] = src[1];
2435 dst[7] = src[0];
2436 #endif
2437 }
2438
rdb_swap_float_bytes(uchar * const dst,const uchar * const src)2439 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
2440 dst[0] = src[3];
2441 dst[1] = src[2];
2442 dst[2] = src[1];
2443 dst[3] = src[0];
2444 }
2445 #else
2446 #define rdb_swap_double_bytes nullptr
2447 #define rdb_swap_float_bytes nullptr
2448 #endif
2449
unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t size,const int exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))2450 int Rdb_key_def::unpack_floating_point(
2451 uchar *const dst, Rdb_string_reader *const reader, const size_t size,
2452 const int exp_digit, const uchar *const zero_pattern,
2453 const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
2454 const uchar *const from = (const uchar *)reader->read(size);
2455 if (from == nullptr) {
2456 /* Mem-comparable image doesn't have enough bytes */
2457 return UNPACK_FAILURE;
2458 }
2459
2460 /* Check to see if the value is zero */
2461 if (memcmp(from, zero_pattern, size) == 0) {
2462 memcpy(dst, zero_val, size);
2463 return UNPACK_SUCCESS;
2464 }
2465
2466 #if defined(WORDS_BIGENDIAN)
2467 // On big-endian, output can go directly into result
2468 uchar *const tmp = dst;
2469 #else
2470 // Otherwise use a temporary buffer to make byte-swapping easier later
2471 uchar tmp[8];
2472 #endif
2473
2474 memcpy(tmp, from, size);
2475
2476 if (tmp[0] & 0x80) {
2477 // If the high bit is set the original value was positive so
2478 // remove the high bit and subtract one from the exponent.
2479 ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
2480 exp_part &= 0x7FFF; // clear high bit;
2481 exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent
2482 tmp[0] = (uchar)(exp_part >> 8);
2483 tmp[1] = (uchar)exp_part;
2484 } else {
2485 // Otherwise the original value was negative and all bytes have been
2486 // negated.
2487 for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
2488 }
2489
2490 #if !defined(WORDS_BIGENDIAN)
2491 // On little-endian, swap the bytes around
2492 swap_func(dst, tmp);
2493 #else
2494 assert(swap_func == nullptr);
2495 #endif
2496
2497 return UNPACK_SUCCESS;
2498 }
2499
2500 /*
2501 Function of type rdb_index_field_unpack_t
2502
2503 Unpack a double by doing the reverse action of change_double_for_sort
2504 (sql/filesort.cc). Note that this only works on IEEE values.
2505 Note also that this code assumes that NaN and +/-Infinity are never
2506 allowed in the database.
2507 */
unpack_double(Rdb_field_packing * const fpi MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2508 int Rdb_key_def::unpack_double(
2509 Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2510 uchar *const field_ptr, Rdb_string_reader *const reader,
2511 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2512 static double zero_val = 0.0;
2513 static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2514
2515 return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2516 zero_pattern, (const uchar *)&zero_val,
2517 rdb_swap_double_bytes);
2518 }
2519
2520 /*
2521 Function of type rdb_index_field_unpack_t
2522
2523 Unpack a float by doing the reverse action of Field_float::make_sort_key
2524 (sql/field.cc). Note that this only works on IEEE values.
2525 Note also that this code assumes that NaN and +/-Infinity are never
2526 allowed in the database.
2527 */
unpack_float(Rdb_field_packing * const fpi,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2528 int Rdb_key_def::unpack_float(
2529 Rdb_field_packing *const fpi, uchar *const field_ptr,
2530 Rdb_string_reader *const reader,
2531 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2532 static float zero_val = 0.0;
2533 static const uchar zero_pattern[4] = {128, 0, 0, 0};
2534
2535 return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2536 zero_pattern, (const uchar *)&zero_val,
2537 rdb_swap_float_bytes);
2538 }
2539
2540 /*
2541 Function of type rdb_index_field_unpack_t used to
2542 Unpack by doing the reverse action to Field_newdate::make_sort_key.
2543 */
2544
unpack_newdate(Rdb_field_packing * const fpi,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2545 int Rdb_key_def::unpack_newdate(
2546 Rdb_field_packing *const fpi, uchar *const field_ptr,
2547 Rdb_string_reader *const reader,
2548 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2549 const char *from;
2550 assert(fpi->m_max_image_len == 3);
2551
2552 if (!(from = reader->read(3))) {
2553 /* Mem-comparable image doesn't have enough bytes */
2554 return UNPACK_FAILURE;
2555 }
2556
2557 field_ptr[0] = from[2];
2558 field_ptr[1] = from[1];
2559 field_ptr[2] = from[0];
2560 return UNPACK_SUCCESS;
2561 }
2562
2563 /*
2564 Function of type rdb_index_field_unpack_t, used to
2565 Unpack the string by copying it over.
2566 This is for BINARY(n) where the value occupies the whole length.
2567 */
2568
unpack_binary_str(Rdb_field_packing * const fpi,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2569 int Rdb_key_def::unpack_binary_str(
2570 Rdb_field_packing *const fpi, uchar *const to,
2571 Rdb_string_reader *const reader,
2572 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2573 const char *from;
2574 if (!(from = reader->read(fpi->m_max_image_len))) {
2575 /* Mem-comparable image doesn't have enough bytes */
2576 return UNPACK_FAILURE;
2577 }
2578
2579 memcpy(to, from, fpi->m_max_image_len);
2580 return UNPACK_SUCCESS;
2581 }
2582
2583 /*
2584 Function of type rdb_index_field_unpack_t.
2585 For UTF-8, we need to convert 2-byte wide-character entities back into
2586 UTF8 sequences.
2587 */
2588
unpack_utf8_str(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2589 int Rdb_key_def::unpack_utf8_str(
2590 Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
2591 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2592 my_core::CHARSET_INFO *const cset =
2593 (my_core::CHARSET_INFO *)fpi->m_field_charset;
2594 const uchar *src;
2595 if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2596 /* Mem-comparable image doesn't have enough bytes */
2597 return UNPACK_FAILURE;
2598 }
2599
2600 const uchar *const src_end = src + fpi->m_max_image_len;
2601 uchar *const dst_end = dst + fpi->m_field_pack_length;
2602
2603 while (src < src_end) {
2604 my_wc_t wc = (src[0] << 8) | src[1];
2605 src += 2;
2606 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2607 assert(res > 0);
2608 assert(res <= 3);
2609 if (res < 0) return UNPACK_FAILURE;
2610 dst += res;
2611 }
2612
2613 cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
2614 cset->pad_char);
2615 return UNPACK_SUCCESS;
2616 }
2617
2618 /*
2619 This is the original algorithm to encode a variable binary field. It
2620 sets a flag byte every Nth byte. The flag value is (255 - #pad) where
2621 #pad is the number of padding bytes that were needed (0 if all N-1
2622 bytes were used).
2623
2624 If N=8 and the field is:
2625 * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2626 * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2627 And the 4 byte string compares as greater than the 3 byte string
2628
2629 Unfortunately the algorithm has a flaw. If the input is exactly a
2630 multiple of N-1, an extra N bytes are written. Since we usually use
2631 N=9, an 8 byte input will generate 18 bytes of output instead of the
2632 9 bytes of output that is optimal.
2633
2634 See pack_variable_format for the newer algorithm.
2635 */
pack_legacy_variable_format(const uchar * src,size_t src_len,uchar ** dst)2636 void Rdb_key_def::pack_legacy_variable_format(
2637 const uchar *src, // The data to encode
2638 size_t src_len, // The length of the data to encode
2639 uchar **dst) // The location to encode the data
2640 {
2641 size_t copy_len;
2642 size_t padding_bytes;
2643 uchar *ptr = *dst;
2644
2645 do {
2646 copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2647 padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2648 memcpy(ptr, src, copy_len);
2649 ptr += copy_len;
2650 src += copy_len;
2651 // pad with zeros if necessary
2652 if (padding_bytes > 0) {
2653 memset(ptr, 0, padding_bytes);
2654 ptr += padding_bytes;
2655 }
2656
2657 *(ptr++) = 255 - padding_bytes;
2658
2659 src_len -= copy_len;
2660 } while (padding_bytes == 0);
2661
2662 *dst = ptr;
2663 }
2664
/*
  This is the new algorithm.  Similarly to the legacy format the input
  is split up into N-1 bytes and a flag byte is used as the Nth byte
  in the output.

  - If the segment just written needed any padding the flag is set to the
    number of bytes used (0..N-2).  0 is possible in the first segment
    if the input is 0 bytes long.
  - If no padding was used and there is no more data left in the input
    the flag is set to N-1
  - If no padding was used and there is still data left in the input the
    flag is set to N.

  For N=9, the following input values encode to the specified
  output (where 'X' indicates a byte of the original input):
    - 0 bytes  is encoded as 0 0 0 0 0 0 0 0 0
    - 1 byte   is encoded as X 0 0 0 0 0 0 0 1
    - 2 bytes  is encoded as X X 0 0 0 0 0 0 2
    - 7 bytes  is encoded as X X X X X X X 0 7
    - 8 bytes  is encoded as X X X X X X X X 8
    - 9 bytes  is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
    - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
*/
pack_variable_format(const uchar * src,size_t src_len,uchar ** dst)2688 void Rdb_key_def::pack_variable_format(
2689 const uchar *src, // The data to encode
2690 size_t src_len, // The length of the data to encode
2691 uchar **dst) // The location to encode the data
2692 {
2693 uchar *ptr = *dst;
2694
2695 for (;;) {
2696 // Figure out how many bytes to copy, copy them and adjust pointers
2697 const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2698 memcpy(ptr, src, copy_len);
2699 ptr += copy_len;
2700 src += copy_len;
2701 src_len -= copy_len;
2702
2703 // Are we at the end of the input?
2704 if (src_len == 0) {
2705 // pad with zeros if necessary;
2706 const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2707 if (padding_bytes > 0) {
2708 memset(ptr, 0, padding_bytes);
2709 ptr += padding_bytes;
2710 }
2711
2712 // Put the flag byte (0 - N-1) in the output
2713 *(ptr++) = (uchar)copy_len;
2714 break;
2715 }
2716
2717 // We have more data - put the flag byte (N) in and continue
2718 *(ptr++) = RDB_ESCAPE_LENGTH;
2719 }
2720
2721 *dst = ptr;
2722 }
2723
2724 /*
2725 Function of type rdb_index_field_pack_t
2726 */
2727
pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))2728 void Rdb_key_def::pack_with_varchar_encoding(
2729 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2730 Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2731 const CHARSET_INFO *const charset = field->charset();
2732 Field_varstring *const field_var = (Field_varstring *)field;
2733
2734 const size_t value_length = (field_var->length_bytes == 1)
2735 ? (uint)*field->ptr
2736 : uint2korr(field->ptr);
2737 size_t xfrm_len = charset->coll->strnxfrm(
2738 charset, buf, fpi->m_max_image_len, field_var->char_length(),
2739 field_var->ptr + field_var->length_bytes, value_length, 0);
2740
2741 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2742 if (fpi->m_use_legacy_varbinary_format) {
2743 pack_legacy_variable_format(buf, xfrm_len, dst);
2744 } else {
2745 pack_variable_format(buf, xfrm_len, dst);
2746 }
2747 }
2748
2749 /*
2750 Compare the string in [buf..buf_end) with a string that is an infinite
2751 sequence of strings in space_xfrm
2752 */
2753
rdb_compare_string_with_spaces(const uchar * buf,const uchar * const buf_end,const std::vector<uchar> * const space_xfrm)2754 static int rdb_compare_string_with_spaces(
2755 const uchar *buf, const uchar *const buf_end,
2756 const std::vector<uchar> *const space_xfrm) {
2757 int cmp = 0;
2758 while (buf < buf_end) {
2759 size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2760 if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
2761 buf += bytes;
2762 }
2763 return cmp;
2764 }
2765
// Bias added to the trimmed/padded space count before it is written to
// unpack_info, so that the stored number is never negative.
static constexpr int RDB_TRIMMED_CHARS_OFFSET = 8;
2767 /*
2768 Pack the data with Variable-Length Space-Padded Encoding.
2769
2770 The encoding is there to meet two goals:
2771
2772 Goal#1. Comparison. The SQL standard says
2773
2774 " If the collation for the comparison has the PAD SPACE characteristic,
2775 for the purposes of the comparison, the shorter value is effectively
2776 extended to the length of the longer by concatenation of <space>s on the
2777 right.
2778
2779 At the moment, all MySQL collations except one have the PAD SPACE
2780 characteristic. The exception is the "binary" collation that is used by
2781 [VAR]BINARY columns. (Note that binary collations for specific charsets,
2782 like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2783 the PAD SPACE characteristic).
2784
2785 Goal#2 is to preserve the number of trailing spaces in the original value.
2786
2787 This is achieved by using the following encoding:
2788 The key part:
2789 - Stores mem-comparable image of the column
2790 - It is stored in chunks of fpi->m_segment_size bytes (*)
2791 = If the remainder of the chunk is not occupied, it is padded with mem-
2792 comparable image of the space character (cs->pad_char to be precise).
2793 - The last byte of the chunk shows how the rest of column's mem-comparable
2794 image would compare to mem-comparable image of the column extended with
2795 spaces. There are three possible values.
2796 - VARCHAR_CMP_LESS_THAN_SPACES,
2797 - VARCHAR_CMP_EQUAL_TO_SPACES
2798 - VARCHAR_CMP_GREATER_THAN_SPACES
2799
2800 VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2801 is spaces, or something that sorts as spaces, so there is no reason to store
2802 it).
2803
2804 Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2805
2806 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ]
2807 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2808 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2809 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2810
2811 As mentioned above, the last chunk is padded with mem-comparable images of
2812 cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2813
2814 fpi->m_segment_size depends on the used collation. It is chosen to be such
2815 that no mem-comparable image of space will ever stretch across the segments
2816 (see get_segment_size_from_collation).
2817
2818 == The value part (aka unpack_info) ==
2819 The value part stores the number of space characters that one needs to add
2820 when unpacking the string.
2821 - If the number is positive, it means add this many spaces at the end
2822 - If the number is negative, it means padding has added extra spaces which
2823 must be removed.
2824
2825 Storage considerations
2826 - depending on column's max size, the number may occupy 1 or 2 bytes
2827 - the number of spaces that need to be removed is not more than
2828 RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2829 then store it as unsigned.
2830
2831 @seealso
2832 unpack_binary_or_utf8_varchar_space_pad
2833 unpack_simple_varchar_space_pad
2834 dummy_make_unpack_info
2835 skip_variable_space_pad
2836 */
2837
pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)2838 void Rdb_key_def::pack_with_varchar_space_pad(
2839 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2840 Rdb_pack_field_context *const pack_ctx) {
2841 Rdb_string_writer *const unpack_info = pack_ctx->writer;
2842 const CHARSET_INFO *const charset = field->charset();
2843 const auto field_var = static_cast<Field_varstring *>(field);
2844
2845 const size_t value_length = (field_var->length_bytes == 1)
2846 ? (uint)*field->ptr
2847 : uint2korr(field->ptr);
2848
2849 const size_t trimmed_len = charset->cset->lengthsp(
2850 charset, (const char *)field_var->ptr + field_var->length_bytes,
2851 value_length);
2852 const size_t xfrm_len = charset->coll->strnxfrm(
2853 charset, buf, fpi->m_max_image_len, field_var->char_length(),
2854 field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2855
2856 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2857 uchar *const buf_end = buf + xfrm_len;
2858
2859 size_t encoded_size = 0;
2860 uchar *ptr = *dst;
2861 size_t padding_bytes;
2862 while (true) {
2863 const size_t copy_len =
2864 std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2865 padding_bytes = fpi->m_segment_size - 1 - copy_len;
2866 memcpy(ptr, buf, copy_len);
2867 ptr += copy_len;
2868 buf += copy_len;
2869
2870 if (padding_bytes) {
2871 memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2872 ptr += padding_bytes;
2873 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment
2874 } else {
2875 // Compare the string suffix with a hypothetical infinite string of
2876 // spaces. It could be that the first difference is beyond the end of
2877 // current chunk.
2878 const int cmp =
2879 rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2880
2881 if (cmp < 0) {
2882 *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2883 } else if (cmp > 0) {
2884 *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2885 } else {
2886 // It turns out all the rest are spaces.
2887 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2888 }
2889 }
2890 encoded_size += fpi->m_segment_size;
2891
2892 if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
2893 }
2894
2895 // m_unpack_info_stores_value means unpack_info stores the whole original
2896 // value. There is no need to store the number of trimmed/padded endspaces
2897 // in that case.
2898 if (unpack_info && !fpi->m_unpack_info_stores_value) {
2899 // (value_length - trimmed_len) is the number of trimmed space *characters*
2900 // then, padding_bytes is the number of *bytes* added as padding
2901 // then, we add 8, because we don't store negative values.
2902 assert(padding_bytes % fpi->space_xfrm_len == 0);
2903 assert((value_length - trimmed_len) % fpi->space_mb_len == 0);
2904 const size_t removed_chars =
2905 RDB_TRIMMED_CHARS_OFFSET +
2906 (value_length - trimmed_len) / fpi->space_mb_len -
2907 padding_bytes / fpi->space_xfrm_len;
2908
2909 if (fpi->m_unpack_info_uses_two_bytes) {
2910 unpack_info->write_uint16(removed_chars);
2911 } else {
2912 assert(removed_chars < 0x100);
2913 unpack_info->write_uint8(removed_chars);
2914 }
2915 }
2916
2917 *dst += encoded_size;
2918 }
2919
2920 /*
2921 Calculate the number of used bytes in the chunk and whether this is the
2922 last chunk in the input. This is based on the old legacy format - see
2923 pack_legacy_variable_format.
2924 */
calc_unpack_legacy_variable_format(uchar flag,bool * done)2925 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2926 uint pad = 255 - flag;
2927 uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2928 if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2929 return (uint)-1;
2930 }
2931
2932 *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2933 return used_bytes;
2934 }
2935
2936 /*
2937 Calculate the number of used bytes in the chunk and whether this is the
2938 last chunk in the input. This is based on the new format - see
2939 pack_variable_format.
2940 */
calc_unpack_variable_format(uchar flag,bool * done)2941 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2942 // Check for invalid flag values
2943 if (flag > RDB_ESCAPE_LENGTH) {
2944 return (uint)-1;
2945 }
2946
2947 // Values from 1 to N-1 indicate this is the last chunk and that is how
2948 // many bytes were used
2949 if (flag < RDB_ESCAPE_LENGTH) {
2950 *done = true;
2951 return flag;
2952 }
2953
2954 // A value of N means we used N-1 bytes and had more to go
2955 *done = false;
2956 return RDB_ESCAPE_LENGTH - 1;
2957 }
2958
2959 /*
2960 Unpack data that has charset information. Each two bytes of the input is
2961 treated as a wide-character and converted to its multibyte equivalent in
2962 the output.
2963 */
unpack_charset(const CHARSET_INFO * cset,const uchar * src,uint src_len,uchar * dst,uint dst_len,uint * used_bytes)2964 static int unpack_charset(
2965 const CHARSET_INFO *cset, // character set information
2966 const uchar *src, // source data to unpack
2967 uint src_len, // length of source data
2968 uchar *dst, // destination of unpacked data
2969 uint dst_len, // length of destination data
2970 uint *used_bytes) // output number of bytes used
2971 {
2972 if (src_len & 1) {
2973 /*
2974 UTF-8 characters are encoded into two-byte entities. There is no way
2975 we can have an odd number of bytes after encoding.
2976 */
2977 return UNPACK_FAILURE;
2978 }
2979
2980 uchar *dst_end = dst + dst_len;
2981 uint used = 0;
2982
2983 for (uint ii = 0; ii < src_len; ii += 2) {
2984 my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2985 int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end);
2986 assert(res > 0);
2987 assert(res <= 3);
2988 if (res < 0) {
2989 return UNPACK_FAILURE;
2990 }
2991
2992 used += res;
2993 }
2994
2995 *used_bytes = used;
2996 return UNPACK_SUCCESS;
2997 }
2998
2999 /*
3000 Function of type rdb_index_field_unpack_t
3001 */
3002
unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))3003 int Rdb_key_def::unpack_binary_or_utf8_varchar(
3004 Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
3005 Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
3006 const uchar *ptr;
3007 size_t len = 0;
3008 bool finished = false;
3009 uchar *d0 = dst;
3010 dst += fpi->m_varchar_length_bytes;
3011 // How much we can unpack
3012 size_t dst_len = fpi->m_field_pack_length - fpi->m_varchar_length_bytes;
3013
3014 bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
3015
3016 /* Decode the length-emitted encoding here */
3017 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
3018 uint used_bytes;
3019
3020 /* See pack_with_varchar_encoding. */
3021 if (use_legacy_format) {
3022 used_bytes = calc_unpack_legacy_variable_format(
3023 ptr[RDB_ESCAPE_LENGTH - 1], &finished);
3024 } else {
3025 used_bytes =
3026 calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
3027 }
3028
3029 if (used_bytes == (uint)-1 || dst_len < used_bytes) {
3030 return UNPACK_FAILURE; // Corruption in the data
3031 }
3032
3033 /*
3034 Now, we need to decode used_bytes of data and append them to the value.
3035 */
3036 if (fpi->m_field_charset == &my_charset_utf8_bin) {
3037 int err = unpack_charset(fpi->m_field_charset, ptr, used_bytes, dst,
3038 dst_len, &used_bytes);
3039 if (err != UNPACK_SUCCESS) {
3040 return err;
3041 }
3042 } else {
3043 memcpy(dst, ptr, used_bytes);
3044 }
3045
3046 dst += used_bytes;
3047 dst_len -= used_bytes;
3048 len += used_bytes;
3049
3050 if (finished) {
3051 break;
3052 }
3053 }
3054
3055 if (!finished) {
3056 return UNPACK_FAILURE;
3057 }
3058
3059 /* Save the length */
3060 if (fpi->m_varchar_length_bytes == 1) {
3061 d0[0] = len;
3062 } else {
3063 assert(fpi->m_varchar_length_bytes == 2);
3064 int2store(d0, len);
3065 }
3066 return UNPACK_SUCCESS;
3067 }
3068
3069 /*
3070 @seealso
3071 pack_with_varchar_space_pad - packing function
3072 unpack_simple_varchar_space_pad - unpacking function for 'simple'
3073 charsets.
3074 skip_variable_space_pad - skip function
3075 */
unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3076 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
3077 Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
3078 Rdb_string_reader *const unp_reader) {
3079 const uchar *ptr;
3080 size_t len = 0;
3081 bool finished = false;
3082 uchar *d0 = dst;
3083 uchar *dst_end = dst + fpi->m_field_pack_length;
3084 dst += fpi->m_varchar_length_bytes;
3085
3086 uint space_padding_bytes = 0;
3087 uint extra_spaces;
3088 if ((fpi->m_unpack_info_uses_two_bytes
3089 ? unp_reader->read_uint16(&extra_spaces)
3090 : unp_reader->read_uint8(&extra_spaces))) {
3091 return UNPACK_FAILURE;
3092 }
3093
3094 if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
3095 space_padding_bytes =
3096 -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
3097 extra_spaces = 0;
3098 } else {
3099 extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
3100 }
3101
3102 space_padding_bytes *= fpi->space_xfrm_len;
3103
3104 /* Decode the length-emitted encoding here */
3105 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
3106 const char last_byte = ptr[fpi->m_segment_size - 1];
3107 size_t used_bytes;
3108 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment
3109 {
3110 if (space_padding_bytes > (fpi->m_segment_size - 1)) {
3111 return UNPACK_FAILURE; // Cannot happen, corrupted data
3112 }
3113 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
3114 finished = true;
3115 } else {
3116 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
3117 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
3118 return UNPACK_FAILURE; // Invalid value
3119 }
3120 used_bytes = fpi->m_segment_size - 1;
3121 }
3122
3123 // Now, need to decode used_bytes of data and append them to the value.
3124 if (fpi->m_field_charset == &my_charset_utf8_bin) {
3125 if (used_bytes & 1) {
3126 /*
3127 UTF-8 characters are encoded into two-byte entities. There is no way
3128 we can have an odd number of bytes after encoding.
3129 */
3130 return UNPACK_FAILURE;
3131 }
3132
3133 const uchar *src = ptr;
3134 const uchar *const src_end = ptr + used_bytes;
3135 while (src < src_end) {
3136 my_wc_t wc = (src[0] << 8) | src[1];
3137 src += 2;
3138 const CHARSET_INFO *cset = fpi->m_field_charset;
3139 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
3140 assert(res <= 3);
3141 if (res <= 0) return UNPACK_FAILURE;
3142 dst += res;
3143 len += res;
3144 }
3145 } else {
3146 if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
3147 memcpy(dst, ptr, used_bytes);
3148 dst += used_bytes;
3149 len += used_bytes;
3150 }
3151
3152 if (finished) {
3153 if (extra_spaces) {
3154 // Both binary and UTF-8 charset store space as ' ',
3155 // so the following is ok:
3156 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
3157 memset(dst, fpi->m_field_charset->pad_char, extra_spaces);
3158 len += extra_spaces;
3159 }
3160 break;
3161 }
3162 }
3163
3164 if (!finished) return UNPACK_FAILURE;
3165
3166 /* Save the length */
3167 if (fpi->m_varchar_length_bytes == 1) {
3168 d0[0] = len;
3169 } else {
3170 assert(fpi->m_varchar_length_bytes == 2);
3171 int2store(d0, len);
3172 }
3173 return UNPACK_SUCCESS;
3174 }
3175
3176 /////////////////////////////////////////////////////////////////////////
3177
3178 /*
3179 Function of type rdb_make_unpack_info_t
3180 */
3181
make_unpack_unknown(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)3182 void Rdb_key_def::make_unpack_unknown(
3183 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
3184 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
3185 pack_ctx->writer->write(field->ptr, field->pack_length());
3186 }
3187
/*
  The point of this function is only to indicate that unpack_info is
  available.

  The actual unpack_info data is produced by the function that packs the key,
  that is, pack_with_varchar_space_pad.
*/
3195
dummy_make_unpack_info(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * field MY_ATTRIBUTE ((__unused__)),Rdb_pack_field_context * pack_ctx MY_ATTRIBUTE ((__unused__)))3196 void Rdb_key_def::dummy_make_unpack_info(
3197 const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
3198 const Field *field MY_ATTRIBUTE((__unused__)),
3199 Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
3200 // Do nothing
3201 }
3202
3203 /*
3204 Function of type rdb_index_field_unpack_t
3205 */
3206
unpack_unknown(Rdb_field_packing * const fpi,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3207 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi, uchar *const dst,
3208 Rdb_string_reader *const reader,
3209 Rdb_string_reader *const unp_reader) {
3210 const uchar *ptr;
3211 const uint len = fpi->m_unpack_data_len;
3212 // We don't use anything from the key, so skip over it.
3213 if (skip_max_length(fpi, reader)) {
3214 return UNPACK_FAILURE;
3215 }
3216
3217 assert_IMP(len > 0, unp_reader != nullptr);
3218
3219 if ((ptr = (const uchar *)unp_reader->read(len))) {
3220 memcpy(dst, ptr, len);
3221 return UNPACK_SUCCESS;
3222 }
3223 return UNPACK_FAILURE;
3224 }
3225
3226 /*
3227 Function of type rdb_make_unpack_info_t
3228 */
3229
make_unpack_unknown_varchar(const Rdb_collation_codec * const codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)3230 void Rdb_key_def::make_unpack_unknown_varchar(
3231 const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
3232 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
3233 const auto f = static_cast<const Field_varstring *>(field);
3234 uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
3235 len += f->length_bytes;
3236 pack_ctx->writer->write(field->ptr, len);
3237 }
3238
3239 /*
3240 Function of type rdb_index_field_unpack_t
3241
3242 @detail
3243 Unpack a key part in an "unknown" collation from its
3244 (mem_comparable_form, unpack_info) form.
3245
3246 "Unknown" means we have no clue about how mem_comparable_form is made from
3247 the original string, so we keep the whole original string in the unpack_info.
3248
3249 @seealso
3250 make_unpack_unknown, unpack_unknown
3251 */
3252
unpack_unknown_varchar(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3253 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
3254 uchar *dst,
3255 Rdb_string_reader *const reader,
3256 Rdb_string_reader *const unp_reader) {
3257 const uchar *ptr;
3258 uchar *const d0 = dst;
3259 dst += fpi->m_varchar_length_bytes;
3260 const uint len_bytes = fpi->m_varchar_length_bytes;
3261 // We don't use anything from the key, so skip over it.
3262 if ((fpi->m_skip_func)(fpi, reader)) {
3263 return UNPACK_FAILURE;
3264 }
3265
3266 assert(len_bytes > 0);
3267 assert(unp_reader != nullptr);
3268
3269 if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
3270 memcpy(d0, ptr, len_bytes);
3271 const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
3272 if ((ptr = (const uchar *)unp_reader->read(len))) {
3273 memcpy(dst, ptr, len);
3274 return UNPACK_SUCCESS;
3275 }
3276 }
3277 return UNPACK_FAILURE;
3278 }
3279
3280 /*
3281 Write unpack_data for a "simple" collation
3282 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)3283 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
3284 const Rdb_collation_codec *const codec,
3285 const uchar *const src,
3286 const size_t src_len) {
3287 for (uint i = 0; i < src_len; i++) {
3288 writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
3289 }
3290 }
3291
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len,uchar * const dst)3292 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
3293 const Rdb_collation_codec *const codec,
3294 const uchar *const src, const size_t src_len,
3295 uchar *const dst) {
3296 for (uint i = 0; i < src_len; i++) {
3297 if (codec->m_dec_size[src[i]] > 0) {
3298 uint *ret;
3299 assert(reader != nullptr);
3300
3301 if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
3302 return UNPACK_FAILURE;
3303 }
3304 dst[i] = codec->m_dec_idx[*ret][src[i]];
3305 } else {
3306 dst[i] = codec->m_dec_idx[0][src[i]];
3307 }
3308 }
3309
3310 return UNPACK_SUCCESS;
3311 }
3312
3313 /*
3314 Function of type rdb_make_unpack_info_t
3315
3316 @detail
3317 Make unpack_data for VARCHAR(n) in a "simple" charset.
3318 */
3319
make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)3320 void Rdb_key_def::make_unpack_simple_varchar(
3321 const Rdb_collation_codec *const codec, const Field *const field,
3322 Rdb_pack_field_context *const pack_ctx) {
3323 const auto f = static_cast<const Field_varstring *>(field);
3324 uchar *const src = f->ptr + f->length_bytes;
3325 const size_t src_len =
3326 f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
3327 Rdb_bit_writer bit_writer(pack_ctx->writer);
3328 // The std::min compares characters with bytes, but for simple collations,
3329 // mbmaxlen = 1.
3330 rdb_write_unpack_simple(&bit_writer, codec, src,
3331 std::min((size_t)f->char_length(), src_len));
3332 }
3333
3334 /*
3335 Function of type rdb_index_field_unpack_t
3336
3337 @seealso
3338 pack_with_varchar_space_pad - packing function
3339 unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
3340 */
3341
unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3342 int Rdb_key_def::unpack_simple_varchar_space_pad(
3343 Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
3344 Rdb_string_reader *const unp_reader) {
3345 const uchar *ptr;
3346 size_t len = 0;
3347 bool finished = false;
3348 uchar *d0 = dst;
3349 // For simple collations, char_length is also number of bytes.
3350 assert((size_t)fpi->m_max_image_len >= fpi->m_varchar_char_length);
3351 uchar *dst_end = dst + fpi->m_field_pack_length;
3352 dst += fpi->m_varchar_length_bytes;
3353 Rdb_bit_reader bit_reader(unp_reader);
3354
3355 uint space_padding_bytes = 0;
3356 uint extra_spaces;
3357 assert(unp_reader != nullptr);
3358
3359 if ((fpi->m_unpack_info_uses_two_bytes
3360 ? unp_reader->read_uint16(&extra_spaces)
3361 : unp_reader->read_uint8(&extra_spaces))) {
3362 return UNPACK_FAILURE;
3363 }
3364
3365 if (extra_spaces <= 8) {
3366 space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
3367 extra_spaces = 0;
3368 } else {
3369 extra_spaces -= 8;
3370 }
3371
3372 space_padding_bytes *= fpi->space_xfrm_len;
3373
3374 /* Decode the length-emitted encoding here */
3375 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
3376 const char last_byte =
3377 ptr[fpi->m_segment_size - 1]; // number of padding bytes
3378 size_t used_bytes;
3379 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
3380 // this is the last one
3381 if (space_padding_bytes > (fpi->m_segment_size - 1)) {
3382 return UNPACK_FAILURE; // Cannot happen, corrupted data
3383 }
3384 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
3385 finished = true;
3386 } else {
3387 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
3388 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
3389 return UNPACK_FAILURE;
3390 }
3391 used_bytes = fpi->m_segment_size - 1;
3392 }
3393
3394 if (dst + used_bytes > dst_end) {
3395 // The value on disk is longer than the field definition allows?
3396 return UNPACK_FAILURE;
3397 }
3398
3399 uint ret;
3400 if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
3401 used_bytes, dst)) != UNPACK_SUCCESS) {
3402 return ret;
3403 }
3404
3405 dst += used_bytes;
3406 len += used_bytes;
3407
3408 if (finished) {
3409 if (extra_spaces) {
3410 if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
3411 // pad_char has a 1-byte form in all charsets that
3412 // are handled by rdb_init_collation_mapping.
3413 memset(dst, fpi->m_field_charset->pad_char, extra_spaces);
3414 len += extra_spaces;
3415 }
3416 break;
3417 }
3418 }
3419
3420 if (!finished) return UNPACK_FAILURE;
3421
3422 /* Save the length */
3423 if (fpi->m_varchar_length_bytes == 1) {
3424 d0[0] = len;
3425 } else {
3426 assert(fpi->m_varchar_length_bytes == 2);
3427 int2store(d0, len);
3428 }
3429 return UNPACK_SUCCESS;
3430 }
3431
3432 /*
3433 Function of type rdb_make_unpack_info_t
3434
3435 @detail
3436 Make unpack_data for CHAR(n) value in a "simple" charset.
3437 It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
3438
3439 @seealso
3440 The VARCHAR variant is in make_unpack_simple_varchar
3441 */
3442
make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)3443 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
3444 const Field *const field,
3445 Rdb_pack_field_context *const pack_ctx) {
3446 const uchar *const src = field->ptr;
3447 Rdb_bit_writer bit_writer(pack_ctx->writer);
3448 rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
3449 }
3450
3451 /*
3452 Function of type rdb_index_field_unpack_t
3453 */
3454
unpack_simple(Rdb_field_packing * const fpi,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3455 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi, uchar *const dst,
3456 Rdb_string_reader *const reader,
3457 Rdb_string_reader *const unp_reader) {
3458 const uchar *ptr;
3459 const uint len = fpi->m_max_image_len;
3460 Rdb_bit_reader bit_reader(unp_reader);
3461
3462 if (!(ptr = (const uchar *)reader->read(len))) {
3463 return UNPACK_FAILURE;
3464 }
3465
3466 return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
3467 fpi->m_charset_codec, ptr, len, dst);
3468 }
3469
3470 // See Rdb_charset_space_info::spaces_xfrm
// Minimum number of bytes of strxfrm'ed spaces kept per charset.
constexpr int RDB_SPACE_XFRM_SIZE = 32;
3472
3473 namespace {
3474
3475 // A class holding information about how space character is represented in a
3476 // charset.
3477 class Rdb_charset_space_info {
3478 public:
3479 Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3480 Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3481 Rdb_charset_space_info() = default;
3482
3483 // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3484 std::vector<uchar> spaces_xfrm;
3485
3486 // length(strxfrm(' '))
3487 size_t space_xfrm_len;
3488
3489 // length of the space character itself
3490 // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3491 // (length=2)
3492 size_t space_mb_len;
3493 };
3494
3495 } // namespace
3496
3497 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
3498 rdb_mem_comparable_space;
3499
/*
  @brief
  For a given charset, get
   - strxfrm('    '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
   - length of strxfrm(charset, ' ')
   - length of the space character in the charset

  @param cs        IN   Charset to get the space for
  @param xfrm      OUT  A few strxfrm'ed space characters
  @param xfrm_len  OUT  Length of strxfrm(charset, ' ') (in bytes)
  @param mb_len    OUT  Length of the space character itself (in bytes)

  @detail
    It is tempting to pre-generate mem-comparable form of space character for
    every charset on server startup.
    One can't do that: some charsets are not initialized until somebody
    attempts to use them (e.g. create or open a table that has a field that
    uses the charset).
*/
3518
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)3519 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3520 const std::vector<uchar> **xfrm,
3521 size_t *const xfrm_len,
3522 size_t *const mb_len) {
3523 assert(cs->number < MY_ALL_CHARSETS_SIZE);
3524 if (!rdb_mem_comparable_space[cs->number].get()) {
3525 RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3526 if (!rdb_mem_comparable_space[cs->number].get()) {
3527 // Upper bound of how many bytes can be occupied by multi-byte form of a
3528 // character in any charset.
3529 const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3530 assert(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3531
3532 // multi-byte form of the ' ' (space) character
3533 uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3534
3535 const size_t space_mb_len = cs->cset->wc_mb(
3536 cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3537
3538 // mem-comparable image of the space character
3539 std::array<uchar, 20> space;
3540
3541 const size_t space_len = cs->coll->strnxfrm(
3542 cs, space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3543 Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3544 info->space_xfrm_len = space_len;
3545 info->space_mb_len = space_mb_len;
3546 while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3547 info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3548 space.data() + space_len);
3549 }
3550 rdb_mem_comparable_space[cs->number].reset(info);
3551 }
3552 RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3553 }
3554
3555 *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3556 *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3557 *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3558 }
3559
// Held by rdb_get_mem_comparable_space() while it fills in an entry of
// rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Per-collation codecs, indexed by CHARSET_INFO::number. An entry is nullptr
// until rdb_init_collation_mapping() creates a codec for that collation.
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
// Held by rdb_init_collation_mapping() while it creates and stores a codec in
// rdb_collation_data.
mysql_mutex_t rdb_collation_data_mutex;
3565
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)3566 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3567 return (cs->coll == &my_collation_8bit_simple_ci_handler);
3568 }
3569
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)3570 static const Rdb_collation_codec *rdb_init_collation_mapping(
3571 const my_core::CHARSET_INFO *const cs) {
3572 assert(cs);
3573 assert(cs->state & MY_CS_AVAILABLE);
3574 const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3575
3576 if (codec == nullptr && rdb_is_collation_supported(cs)) {
3577 RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3578
3579 codec = rdb_collation_data[cs->number];
3580 if (codec == nullptr) {
3581 Rdb_collation_codec *cur = nullptr;
3582
3583 // Compute reverse mapping for simple collations.
3584 if (cs->coll == &my_collation_8bit_simple_ci_handler) {
3585 cur = new Rdb_collation_codec;
3586 std::map<uchar, std::vector<uchar>> rev_map;
3587 size_t max_conflict_size = 0;
3588 for (int src = 0; src < 256; src++) {
3589 uchar dst = cs->sort_order[src];
3590 rev_map[dst].push_back(src);
3591 max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3592 }
3593 cur->m_dec_idx.resize(max_conflict_size);
3594
3595 for (auto const &p : rev_map) {
3596 uchar dst = p.first;
3597 for (uint idx = 0; idx < p.second.size(); idx++) {
3598 uchar src = p.second[idx];
3599 uchar bits =
3600 my_bit_log2(my_round_up_to_next_power(p.second.size()));
3601 cur->m_enc_idx[src] = idx;
3602 cur->m_enc_size[src] = bits;
3603 cur->m_dec_size[dst] = bits;
3604 cur->m_dec_idx[idx][dst] = src;
3605 }
3606 }
3607
3608 cur->m_make_unpack_info_func = {
3609 {Rdb_key_def::make_unpack_simple_varchar,
3610 Rdb_key_def::make_unpack_simple}};
3611 cur->m_unpack_func = {{Rdb_key_def::unpack_simple_varchar_space_pad,
3612 Rdb_key_def::unpack_simple}};
3613 } else {
3614 // Out of luck for now.
3615 }
3616
3617 if (cur != nullptr) {
3618 codec = cur;
3619 cur->m_cs = cs;
3620 rdb_collation_data[cs->number] = cur;
3621 }
3622 }
3623
3624 RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3625 }
3626
3627 return codec;
3628 }
3629
get_segment_size_from_collation(const CHARSET_INFO * const cs)3630 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3631 int ret;
3632 if (cs == &my_charset_utf8mb4_bin || cs == &my_charset_utf16_bin ||
3633 cs == &my_charset_utf16le_bin || cs == &my_charset_utf32_bin) {
3634 /*
3635 In these collations, a character produces one weight, which is 3 bytes.
3636 Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3637 get 3*3+1=10
3638 */
3639 ret = 10;
3640 } else {
3641 /*
3642 All other collations. There are two classes:
3643 - Unicode-based, except for collations mentioned in the if-condition.
3644 For these all weights are 2 bytes long, a character may produce 0..8
3645 weights.
3646 in any case, 8 bytes of payload in the segment guarantee that the last
3647 space character won't span across segments.
3648
3649 - Collations not based on unicode. These have length(strxfrm(' '))=1,
3650 there nothing to worry about.
3651
3652 In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3653 */
3654 ret = 9;
3655 }
3656 assert(ret < RDB_SPACE_XFRM_SIZE);
3657 return ret;
3658 }
3659
// Copy constructor: copies exactly the members named in the initializer list
// below from o.
// NOTE(review): members that setup() assigns but that do not appear in this
// list (e.g. m_field_real_type, m_field_offset, m_field_charset) are not
// copied here - confirm against the class declaration that this is intended.
Rdb_field_packing::Rdb_field_packing(const Rdb_field_packing &o)
    : m_max_image_len(o.m_max_image_len),
      m_unpack_data_len(o.m_unpack_data_len),
      m_unpack_data_offset(o.m_unpack_data_offset),
      m_field_maybe_null(o.m_field_maybe_null),
      m_segment_size(o.m_segment_size),
      m_unpack_info_uses_two_bytes(o.m_unpack_info_uses_two_bytes),
      m_covered(o.m_covered),
      space_xfrm(o.space_xfrm),
      space_xfrm_len(o.space_xfrm_len),
      space_mb_len(o.space_mb_len),
      m_charset_codec(o.m_charset_codec),
      m_unpack_info_stores_value(o.m_unpack_info_stores_value),
      m_pack_func(o.m_pack_func),
      m_make_unpack_info_func(o.m_make_unpack_info_func),
      m_unpack_func(o.m_unpack_func),
      m_skip_func(o.m_skip_func),
      m_keynr(o.m_keynr),
      m_key_part(o.m_key_part) {}
3679
// Default constructor: zero lengths and offsets, false flags, and null
// function/codec/space pointers. setup() is what assigns the pack, unpack and
// skip functions and the lengths.
Rdb_field_packing::Rdb_field_packing()
    : m_max_image_len(0),
      m_unpack_data_len(0),
      m_unpack_data_offset(0),
      m_field_maybe_null(false),
      m_segment_size(0),
      m_unpack_info_uses_two_bytes(false),
      m_covered(false),
      space_xfrm(nullptr),
      space_xfrm_len(0),
      space_mb_len(0),
      m_charset_codec(nullptr),
      m_unpack_info_stores_value(false),
      m_pack_func(nullptr),
      m_make_unpack_info_func(nullptr),
      m_unpack_func(nullptr),
      m_skip_func(nullptr),
      m_keynr(0),
      m_key_part(0) {}
3699
3700 /*
3701 @brief
3702 Setup packing of index field into its mem-comparable form
3703
3704 @detail
3705 - It is possible produce mem-comparable form for any datatype.
3706 - Some datatypes also allow to unpack the original value from its
3707 mem-comparable form.
3708 = Some of these require extra information to be stored in "unpack_info".
3709 unpack_info is not a part of mem-comparable form, it is only used to
3710 restore the original value
3711
3712 @param
3713 field IN field to be packed/un-packed
3714
3715 @return
3716 TRUE - Field can be read with index-only reads
3717 FALSE - Otherwise
3718 */
3719
setup(const Rdb_key_def * const key_descr,const Field * const field,const uint keynr_arg,const uint key_part_arg,const uint16 key_length)3720 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
3721 const Field *const field, const uint keynr_arg,
3722 const uint key_part_arg,
3723 const uint16 key_length) {
3724 int res = false;
3725 enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
3726
3727 m_field_real_type = type;
3728 m_field_offset =
3729 (key_descr && field) ? (field->ptr - field->table->record[0]) : -1;
3730 m_field_null_offset = (key_descr && field) ? field->null_offset() : -1;
3731 m_field_null_bit_mask = (key_descr && field) ? field->null_bit : 0;
3732 m_field_pack_length = (key_descr && field) ? field->pack_length() : -1;
3733 m_field_charset = (key_descr && field) ? field->charset() : nullptr;
3734 m_field_unsigned_flag = false;
3735 m_field_maybe_null = field ? field->real_maybe_null() : false;
3736 m_varchar_length_bytes = -1;
3737 m_varchar_char_length = -1;
3738
3739 m_keynr = keynr_arg;
3740 m_key_part = key_part_arg;
3741
3742 m_unpack_func = nullptr;
3743 m_make_unpack_info_func = nullptr;
3744 m_unpack_data_len = 0;
3745 space_xfrm = nullptr; // safety
3746 // whether to use legacy format for varchar
3747 m_use_legacy_varbinary_format = false;
3748 // ha_rocksdb::index_flags() will pass key_descr == null to
3749 // see whether field(column) can be read-only reads through return value,
3750 // but the legacy vs. new varchar format doesn't affect return value.
3751 // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
3752 if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3753 m_use_legacy_varbinary_format = true;
3754 }
3755 /* Calculate image length. By default, is is pack_length() */
3756 m_max_image_len =
3757 field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
3758 m_skip_func = Rdb_key_def::skip_max_length;
3759 m_pack_func = Rdb_key_def::pack_with_make_sort_key;
3760
3761 m_covered = false;
3762
3763 switch (type) {
3764 case MYSQL_TYPE_LONGLONG:
3765 m_pack_func = Rdb_key_def::pack_longlong;
3766 m_field_unsigned_flag =
3767 field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3768 m_unpack_func = Rdb_key_def::unpack_integer<8>;
3769 m_covered = true;
3770 return true;
3771
3772 case MYSQL_TYPE_LONG:
3773 m_pack_func = Rdb_key_def::pack_long;
3774 m_field_unsigned_flag =
3775 field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3776 m_unpack_func = Rdb_key_def::unpack_integer<4>;
3777 m_covered = true;
3778 return true;
3779
3780 case MYSQL_TYPE_INT24:
3781 m_pack_func = Rdb_key_def::pack_medium;
3782 m_field_unsigned_flag =
3783 field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3784 m_unpack_func = Rdb_key_def::unpack_integer<3>;
3785 m_covered = true;
3786 return true;
3787
3788 case MYSQL_TYPE_SHORT:
3789 m_pack_func = Rdb_key_def::pack_short;
3790 m_field_unsigned_flag =
3791 field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3792 m_unpack_func = Rdb_key_def::unpack_integer<2>;
3793 m_covered = true;
3794 return true;
3795
3796 case MYSQL_TYPE_TINY:
3797 m_pack_func = Rdb_key_def::pack_tiny;
3798 m_field_unsigned_flag =
3799 field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3800 m_unpack_func = Rdb_key_def::unpack_integer<1>;
3801 m_covered = true;
3802 return true;
3803
3804 case MYSQL_TYPE_DOUBLE:
3805 m_pack_func = Rdb_key_def::pack_double;
3806 m_unpack_func = Rdb_key_def::unpack_double;
3807 m_covered = true;
3808 return true;
3809
3810 case MYSQL_TYPE_FLOAT:
3811 m_pack_func = Rdb_key_def::pack_float;
3812 m_unpack_func = Rdb_key_def::unpack_float;
3813 m_covered = true;
3814 return true;
3815
3816 case MYSQL_TYPE_NEWDECIMAL:
3817 m_pack_func = Rdb_key_def::pack_new_decimal;
3818 m_unpack_func = Rdb_key_def::unpack_binary_str;
3819 m_covered = true;
3820 return true;
3821
3822 case MYSQL_TYPE_DATETIME2:
3823 m_pack_func = Rdb_key_def::pack_datetime2;
3824 m_unpack_func = Rdb_key_def::unpack_binary_str;
3825 m_covered = true;
3826 return true;
3827
3828 case MYSQL_TYPE_TIMESTAMP2:
3829 m_pack_func = Rdb_key_def::pack_timestamp2;
3830 m_unpack_func = Rdb_key_def::unpack_binary_str;
3831 m_covered = true;
3832 return true;
3833
3834 case MYSQL_TYPE_TIME2:
3835 m_pack_func = Rdb_key_def::pack_time2;
3836 m_unpack_func = Rdb_key_def::unpack_binary_str;
3837 m_covered = true;
3838 return true;
3839
3840 case MYSQL_TYPE_YEAR:
3841 m_pack_func = Rdb_key_def::pack_year;
3842 m_unpack_func = Rdb_key_def::unpack_binary_str;
3843 m_covered = true;
3844 return true;
3845
3846 case MYSQL_TYPE_NEWDATE:
3847 m_pack_func = Rdb_key_def::pack_newdate;
3848 m_unpack_func = Rdb_key_def::unpack_newdate;
3849 m_covered = true;
3850 return true;
3851
3852 case MYSQL_TYPE_TINY_BLOB:
3853 case MYSQL_TYPE_MEDIUM_BLOB:
3854 case MYSQL_TYPE_LONG_BLOB:
3855 case MYSQL_TYPE_BLOB:
3856 case MYSQL_TYPE_JSON: {
3857 m_pack_func = Rdb_key_def::pack_blob;
3858 if (key_descr) {
3859 // The my_charset_bin collation is special in that it will consider
3860 // shorter strings sorting as less than longer strings.
3861 //
3862 // See Field_blob::make_sort_key for details.
3863 m_max_image_len =
3864 key_length +
3865 (field->charset() == &my_charset_bin
3866 ? dynamic_cast<const Field_blob *>(field)->pack_length_no_ptr()
3867 : 0);
3868 // Return false because indexes on text/blob will always require
3869 // a prefix. With a prefix, the optimizer will not be able to do an
3870 // index-only scan since there may be content occuring after the prefix
3871 // length.
3872 return false;
3873 }
3874 } break;
3875 // Obsolete
3876 case MYSQL_TYPE_DECIMAL:
3877 case MYSQL_TYPE_TIMESTAMP:
3878 case MYSQL_TYPE_TIME:
3879 case MYSQL_TYPE_DATETIME:
3880 assert(0);
3881 default:
3882 break;
3883 }
3884
3885 m_unpack_info_stores_value = false;
3886 /* Handle [VAR](CHAR|BINARY) */
3887
3888 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3889 /*
3890 For CHAR-based columns, check how strxfrm image will take.
3891 field->field_length = field->char_length() * cs->mbmaxlen.
3892 */
3893 const CHARSET_INFO *cs = field->charset();
3894 m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
3895 }
3896 const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
3897 const CHARSET_INFO *cs = field->charset();
3898 // max_image_len before chunking is taken into account
3899 const int max_image_len_before_chunks = m_max_image_len;
3900
3901 if (is_varchar) {
3902 // The default for varchar is variable-length, without space-padding for
3903 // comparisons
3904 const auto field_var = static_cast<const Field_varstring *>(field);
3905 m_varchar_length_bytes = field_var->length_bytes;
3906 m_varchar_char_length = field_var->char_length();
3907 m_skip_func = Rdb_key_def::skip_variable_length;
3908 m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3909 if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3910 m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
3911 } else {
3912 // Calculate the maximum size of the short section plus the
3913 // maximum size of the long section
3914 m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
3915 }
3916
3917 m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
3918 }
3919
3920 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3921 // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
3922 // information about character-based datatypes are compared.
3923 bool use_unknown_collation = false;
3924 DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
3925 use_unknown_collation = true;);
3926
3927 if (cs == &my_charset_bin) {
3928 // - SQL layer pads BINARY(N) so that it always is N bytes long.
3929 // - For VARBINARY(N), values may have different lengths, so we're using
3930 // variable-length encoding. This is also the only charset where the
3931 // values are not space-padded for comparison.
3932 m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
3933 : Rdb_key_def::unpack_binary_str;
3934 res = true;
3935 } else if (cs == &my_charset_latin1_bin || cs == &my_charset_utf8_bin) {
3936 // For _bin collations, mem-comparable form of the string is the string
3937 // itself.
3938
3939 if (is_varchar) {
3940 // VARCHARs - are compared as if they were space-padded - but are
3941 // not actually space-padded (reading the value back produces the
3942 // original value, without the padding)
3943 m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
3944 m_skip_func = Rdb_key_def::skip_variable_space_pad;
3945 m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3946 m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
3947 m_segment_size = get_segment_size_from_collation(cs);
3948 m_max_image_len =
3949 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3950 m_segment_size;
3951 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3952 &space_mb_len);
3953 } else {
3954 // SQL layer pads CHAR(N) values to their maximum length.
3955 // We just store that and restore it back.
3956 m_unpack_func = (cs == &my_charset_latin1_bin)
3957 ? Rdb_key_def::unpack_binary_str
3958 : Rdb_key_def::unpack_utf8_str;
3959 }
3960 res = true;
3961 } else {
3962 // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
3963
3964 res = true; // index-only scans are possible
3965 m_unpack_data_len = is_varchar ? 0 : field->field_length;
3966 const uint idx = is_varchar ? 0 : 1;
3967 const Rdb_collation_codec *codec = nullptr;
3968
3969 if (is_varchar) {
3970 // VARCHAR requires space-padding for doing comparisons
3971 //
3972 // The check for cs->levels_for_order is to catch
3973 // latin2_czech_cs and cp1250_czech_cs - multi-level collations
3974 // that Variable-Length Space Padded Encoding can't handle.
3975 // It is not expected to work for any other multi-level collations,
3976 // either.
3977 // Currently we handle these collations as NO_PAD, even if they have
3978 // PAD_SPACE attribute.
3979 if (cs->levels_for_order == 1) {
3980 m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3981 m_skip_func = Rdb_key_def::skip_variable_space_pad;
3982 m_segment_size = get_segment_size_from_collation(cs);
3983 m_max_image_len =
3984 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3985 m_segment_size;
3986 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3987 &space_mb_len);
3988 } else {
3989 // NO_LINT_DEBUG
3990 sql_print_warning(
3991 "RocksDB: you're trying to create an index "
3992 "with a multi-level collation %s",
3993 cs->name);
3994 // NO_LINT_DEBUG
3995 sql_print_warning(
3996 "MyRocks will handle this collation internally "
3997 " as if it had a NO_PAD attribute.");
3998 m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3999 m_skip_func = Rdb_key_def::skip_variable_length;
4000 }
4001 }
4002
4003 if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
4004 // The collation allows to store extra information in the unpack_info
4005 // which can be used to restore the original value from the
4006 // mem-comparable form.
4007 m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
4008 m_unpack_func = codec->m_unpack_func[idx];
4009 m_charset_codec = codec;
4010 } else if (use_unknown_collation) {
4011 // We have no clue about how this collation produces mem-comparable
4012 // form. Our way of restoring the original value is to keep a copy of
4013 // the original value in unpack_info.
4014 m_unpack_info_stores_value = true;
4015 m_make_unpack_info_func = is_varchar
4016 ? Rdb_key_def::make_unpack_unknown_varchar
4017 : Rdb_key_def::make_unpack_unknown;
4018 m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
4019 : Rdb_key_def::unpack_unknown;
4020 } else {
4021 // Same as above: we don't know how to restore the value from its
4022 // mem-comparable form.
4023 // Here, we just indicate to the SQL layer we can't do it.
4024 assert(m_unpack_func == nullptr);
4025 m_unpack_info_stores_value = false;
4026 res = false; // Indicate that index-only reads are not possible
4027 }
4028 }
4029
4030 // Make an adjustment: if this column is partially covered, tell the SQL
4031 // layer we can't do index-only scans. Later when we perform an index read,
4032 // we'll check on a record-by-record basis if we can do an index-only scan
4033 // or not.
4034 uint field_length;
4035 if (field->table) {
4036 field_length = field->table->field[field->field_index]->field_length;
4037 } else {
4038 field_length = field->field_length;
4039 }
4040
4041 if (field_length != key_length) {
4042 res = false;
4043 // If this index doesn't support covered bitmaps, then we won't know
4044 // during a read if the column is actually covered or not. If so, we need
4045 // to assume the column isn't covered and skip it during unpacking.
4046 //
4047 // If key_descr == NULL, then this is a dummy field and we probably don't
4048 // need to perform this step. However, to preserve the behavior before
4049 // this change, we'll only skip this step if we have an index which
4050 // supports covered bitmaps.
4051 if (!key_descr || !key_descr->use_covered_bitmap_format()) {
4052 m_unpack_func = nullptr;
4053 m_make_unpack_info_func = nullptr;
4054 m_unpack_info_stores_value = true;
4055 }
4056 }
4057 }
4058
4059 m_covered = res;
4060 return res;
4061 }
4062
get_field_in_table(const TABLE * const tbl) const4063 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
4064 return tbl->key_info[m_keynr].key_part[m_key_part].field;
4065 }
4066
fill_hidden_pk_val(uchar ** dst,const longlong hidden_pk_id) const4067 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
4068 const longlong hidden_pk_id) const {
4069 assert(m_max_image_len == 8);
4070
4071 String to;
4072 rdb_netstr_append_uint64(&to, hidden_pk_id);
4073 memcpy(*dst, to.ptr(), m_max_image_len);
4074
4075 *dst += m_max_image_len;
4076 }
4077
4078 ///////////////////////////////////////////////////////////////////////////////////////////
4079 // Rdb_ddl_manager
4080 ///////////////////////////////////////////////////////////////////////////////////////////
4081
~Rdb_tbl_def()4082 Rdb_tbl_def::~Rdb_tbl_def() {
4083 auto ddl_manager = rdb_get_ddl_manager();
4084 /* Don't free key definitions */
4085 if (m_key_descr_arr) {
4086 for (uint i = 0; i < m_key_count; i++) {
4087 if (ddl_manager && m_key_descr_arr[i]) {
4088 ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
4089 }
4090
4091 m_key_descr_arr[i] = nullptr;
4092 }
4093
4094 delete[] m_key_descr_arr;
4095 m_key_descr_arr = nullptr;
4096 }
4097 }
4098
4099 /*
4100 Put table definition DDL entry. Actual write is done at
4101 Rdb_dict_manager::commit.
4102
4103 We write
4104 dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
4105
4106 Where key entries are a tuple of
4107 ( cf_id, index_nr )
4108 */
4109
put_dict(Rdb_dict_manager * const dict,Rdb_cf_manager * cf_manager,rocksdb::WriteBatch * const batch,const rocksdb::Slice & key)4110 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
4111 Rdb_cf_manager *cf_manager,
4112 rocksdb::WriteBatch *const batch,
4113 const rocksdb::Slice &key) {
4114 StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
4115 indexes.alloc(Rdb_key_def::VERSION_SIZE +
4116 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
4117 rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
4118
4119 for (uint i = 0; i < m_key_count; i++) {
4120 const Rdb_key_def &kd = *m_key_descr_arr[i];
4121
4122 const uint cf_id = kd.get_cf()->GetID();
4123 /*
4124 If cf_id already exists, cf_flags must be the same.
4125 To prevent race condition, reading/modifying/committing CF flags
4126 need to be protected by mutex (dict_manager->lock()).
4127 When RocksDB supports transaction with pessimistic concurrency
4128 control, we can switch to use it and removing mutex.
4129 */
4130 const std::string cf_name = kd.get_cf()->GetName();
4131
4132 std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
4133 cf_manager->get_cf(cf_name);
4134
4135 if (!cfh || cfh != kd.get_shared_cf() || dict->get_dropped_cf(cf_id)) {
4136 // The CF has been dropped, i.e., cf_manager.remove_dropped_cf() has been
4137 // called; or the CF is being dropped, i.e., cf_manager.drop_cf() has
4138 // been called.
4139 my_error(ER_CF_DROPPED, MYF(0), cf_name.c_str());
4140 return true;
4141 }
4142
4143 rdb_netstr_append_uint32(&indexes, cf_id);
4144
4145 uint32 index_number = kd.get_index_number();
4146 rdb_netstr_append_uint32(&indexes, index_number);
4147
4148 struct Rdb_index_info index_info;
4149 index_info.m_gl_index_id = {cf_id, index_number};
4150 index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
4151 index_info.m_index_type = kd.m_index_type;
4152 index_info.m_kv_version = kd.m_kv_format_version;
4153 index_info.m_index_flags = kd.m_index_flags_bitmap;
4154 index_info.m_ttl_duration = kd.m_ttl_duration;
4155
4156 dict->add_or_update_index_cf_mapping(batch, &index_info);
4157 }
4158
4159 const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
4160
4161 dict->put_key(batch, key, svalue);
4162 return false;
4163 }
4164
get_create_time()4165 time_t Rdb_tbl_def::get_create_time() {
4166 time_t create_time = m_create_time;
4167
4168 if (create_time == CREATE_TIME_UNKNOWN) {
4169 // Read it from the .frm file. It's not a problem if several threads do this
4170 // concurrently
4171 char path[FN_REFLEN];
4172 snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
4173 m_dbname.c_str(), m_tablename.c_str(), reg_ext);
4174 unpack_filename(path, path);
4175 MY_STAT f_stat;
4176 if (my_stat(path, &f_stat, MYF(0)))
4177 create_time = f_stat.st_ctime;
4178 else
4179 create_time = 0; // will be shown as SQL NULL
4180 m_create_time = create_time;
4181 }
4182 return create_time;
4183 }
4184
// Length that each index flag takes inside the record.
// Each index in the array maps to the enum INDEX_FLAG: element N is the
// length used for flag bit N by Rdb_key_def::calculate_index_flag_offset().
// The array currently has a single entry, ROCKSDB_SIZEOF_TTL_RECORD.
static const std::array<uint, 1> index_flag_lengths = {
    {ROCKSDB_SIZEOF_TTL_RECORD}};
4189
has_index_flag(uint32 index_flags,enum INDEX_FLAG flag)4190 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
4191 return flag & index_flags;
4192 }
4193
calculate_index_flag_offset(uint32 index_flags,enum INDEX_FLAG flag,uint * const length)4194 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
4195 enum INDEX_FLAG flag,
4196 uint *const length) {
4197 assert_IMP(flag != MAX_FLAG,
4198 Rdb_key_def::has_index_flag(index_flags, flag));
4199
4200 uint offset = 0;
4201 for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
4202 int mask = 1 << bit;
4203
4204 /* Exit once we've reached the proper flag */
4205 if (flag & mask) {
4206 if (length != nullptr) {
4207 *length = index_flag_lengths[bit];
4208 }
4209 break;
4210 }
4211
4212 if (index_flags & mask) {
4213 offset += index_flag_lengths[bit];
4214 }
4215 }
4216
4217 return offset;
4218 }
4219
write_index_flag_field(Rdb_string_writer * const buf,const uchar * const val,enum INDEX_FLAG flag) const4220 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
4221 const uchar *const val,
4222 enum INDEX_FLAG flag) const {
4223 uint len;
4224 uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
4225 assert(offset + len <= buf->get_current_pos());
4226 memcpy(buf->ptr() + offset, val, len);
4227 }
4228
check_if_is_mysql_system_table()4229 void Rdb_tbl_def::check_if_is_mysql_system_table() {
4230 static const char *const system_dbs[] = {
4231 "mysql",
4232 "performance_schema",
4233 "information_schema",
4234 };
4235
4236 m_is_mysql_system_table = false;
4237 for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
4238 if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
4239 m_is_mysql_system_table = true;
4240 break;
4241 }
4242 }
4243 }
4244
check_and_set_read_free_rpl_table()4245 void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
4246 m_is_read_free_rpl_table =
4247 rdb_read_free_regex_handler.match(base_tablename());
4248 }
4249
set_name(const std::string & name)4250 void Rdb_tbl_def::set_name(const std::string &name) {
4251 int err MY_ATTRIBUTE((__unused__));
4252
4253 m_dbname_tablename = name;
4254 err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
4255 &m_partition);
4256 assert(err == 0);
4257
4258 check_if_is_mysql_system_table();
4259 }
4260
get_autoincr_gl_index_id()4261 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
4262 for (uint i = 0; i < m_key_count; i++) {
4263 auto &k = m_key_descr_arr[i];
4264 if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
4265 k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
4266 return k->get_gl_index_id();
4267 }
4268 }
4269
4270 // Every table must have a primary key, even if it's hidden.
4271 abort();
4272 return GL_INDEX_ID();
4273 }
4274
erase_index_num(const GL_INDEX_ID & gl_index_id)4275 void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
4276 m_index_num_to_keydef.erase(gl_index_id);
4277 }
4278
add_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)4279 void Rdb_ddl_manager::add_uncommitted_keydefs(
4280 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
4281 mysql_rwlock_wrlock(&m_rwlock);
4282 for (const auto &index : indexes) {
4283 m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
4284 }
4285 mysql_rwlock_unlock(&m_rwlock);
4286 }
4287
remove_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)4288 void Rdb_ddl_manager::remove_uncommitted_keydefs(
4289 const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
4290 mysql_rwlock_wrlock(&m_rwlock);
4291 for (const auto &index : indexes) {
4292 m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
4293 }
4294 mysql_rwlock_unlock(&m_rwlock);
4295 }
4296
find_in_uncommitted_keydef(const uint32_t & cf_id)4297 int Rdb_ddl_manager::find_in_uncommitted_keydef(const uint32_t &cf_id) {
4298 mysql_rwlock_rdlock(&m_rwlock);
4299 for (const auto &pr : m_index_num_to_uncommitted_keydef) {
4300 const auto &kd = pr.second;
4301
4302 if (kd->get_cf()->GetID() == cf_id) {
4303 mysql_rwlock_unlock(&m_rwlock);
4304 return HA_EXIT_FAILURE;
4305 }
4306 }
4307
4308 mysql_rwlock_unlock(&m_rwlock);
4309 return HA_EXIT_SUCCESS;
4310 }
4311
4312 #if defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) && ROCKSDB_INCLUDE_VALIDATE_TABLES
namespace  // anonymous namespace = not visible outside this source file
{
// Table scanner used to cross-check the tables known to the RocksDB data
// dictionary against the .frm files found on disk.
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  // (table name, whether the table definition has a partition part)
  using tbl_info_t = std::pair<std::string, bool>;
  // database name -> tables of that database
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  // Tables collected by add_table(); check_frm_file() erases an entry when it
  // finds the matching .frm file.
  tbl_list_t m_list;

  // Scanner callback: records one table definition in m_list.
  int add_table(Rdb_tbl_def *tdef) override;

  // NOTE(review): defined outside this chunk - presumably walks datadir and
  // reports mismatches through has_errors; confirm against the definition.
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  // Scans the directory of one database for .frm files.
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  // Checks a single .frm file against m_list; sets *has_errors when a RocksDB
  // .frm file has no matching entry.
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
}  // anonymous namespace
4332
4333 /*
4334 Get a list of tables that we expect to have .frm files for. This will use the
4335 information just read from the RocksDB data dictionary.
4336 */
add_table(Rdb_tbl_def * tdef)4337 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
4338 assert(tdef != nullptr);
4339
4340 /*
4341 Add the database/table into the list that are not temp table.
4342 Also skip over truncate temp table.
4343 */
4344 if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos &&
4345 tdef->base_tablename().find(TRUNCATE_TABLE_PREFIX) == std::string::npos) {
4346 bool is_partition = tdef->base_partition().size() != 0;
4347 m_list[tdef->base_dbname()].insert(
4348 tbl_info_t(tdef->base_tablename(), is_partition));
4349 }
4350
4351 return HA_EXIT_SUCCESS;
4352 }
4353
4354 /*
4355 Access the .frm file for this dbname/tablename and see if it is a RocksDB
4356 table (or partition table).
4357 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)4358 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
4359 const std::string &dbname,
4360 const std::string &tablename,
4361 bool *has_errors) {
4362 /* Check this .frm file to see what engine it uses */
4363 String fullfilename(fullpath.c_str(), &my_charset_bin);
4364 fullfilename.append(FN_DIRSEP);
4365 fullfilename.append(tablename.c_str());
4366 fullfilename.append(".frm");
4367
4368 /*
4369 This function will return the legacy_db_type of the table. Currently
4370 it does not reference the first parameter (THD* thd), but if it ever
4371 did in the future we would need to make a version that does it without
4372 the connection handle as we don't have one here.
4373 */
4374 enum legacy_db_type eng_type;
4375 frm_type_enum type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type);
4376 if (type == FRMTYPE_ERROR) {
4377 // NO_LINT_DEBUG
4378 sql_print_warning("RocksDB: Failed to open/read .from file: %s",
4379 fullfilename.ptr());
4380 return false;
4381 }
4382
4383 std::string partition_info_str;
4384 if (!native_part::get_part_str_for_path(fullfilename.c_ptr(),
4385 partition_info_str)) {
4386 sql_print_warning("RocksDB: can't read partition info string from %s",
4387 fullfilename.ptr());
4388 return false;
4389 }
4390
4391 if (!partition_info_str.empty()) eng_type = DB_TYPE_PARTITION_DB;
4392
4393 if (type == FRMTYPE_TABLE) {
4394 /* For a RocksDB table do we have a reference in the data dictionary? */
4395 if (eng_type == DB_TYPE_ROCKSDB) {
4396 /*
4397 Attempt to remove the table entry from the list of tables. If this
4398 fails then we know we had a .frm file that wasn't registered in RocksDB.
4399 */
4400 tbl_info_t element(tablename, false);
4401 if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
4402 sql_print_warning(
4403 "RocksDB: Schema mismatch - "
4404 "A .frm file exists for table %s.%s, "
4405 "but that table is not registered in RocksDB",
4406 dbname.c_str(), tablename.c_str());
4407 *has_errors = true;
4408 }
4409 } else if (eng_type == DB_TYPE_PARTITION_DB) {
4410 /*
4411 For partition tables, see if it is in the m_list as a partition,
4412 but don't generate an error if it isn't there - we don't know that the
4413 .frm is for RocksDB.
4414 */
4415 if (m_list.count(dbname) > 0) {
4416 m_list[dbname].erase(tbl_info_t(tablename, true));
4417 }
4418 }
4419 }
4420
4421 return true;
4422 }
4423
4424 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)4425 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
4426 const std::string &dbname,
4427 bool *has_errors) {
4428 bool result = true;
4429 std::string fullpath = datadir + dbname;
4430 struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
4431
4432 /* Access the directory */
4433 if (dir_info == nullptr) {
4434 // NO_LINT_DEBUG
4435 sql_print_warning("RocksDB: Could not open database directory: %s",
4436 fullpath.c_str());
4437 return false;
4438 }
4439
4440 /* Scan through the files in the directory */
4441 struct fileinfo *file_info = dir_info->dir_entry;
4442 for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
4443 /* Find .frm files that are not temp files (those that contain '#sql') */
4444 const char *ext = strrchr(file_info->name, '.');
4445 if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
4446 strcmp(ext, ".frm") == 0) {
4447 std::string tablename =
4448 std::string(file_info->name, ext - file_info->name);
4449
4450 /* Check to see if the .frm file is from RocksDB */
4451 if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
4452 result = false;
4453 break;
4454 }
4455 }
4456 }
4457
4458 /* Remove any databases who have no more tables listed */
4459 if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
4460 m_list.erase(dbname);
4461 }
4462
4463 /* Release the directory entry */
4464 my_dirend(dir_info);
4465
4466 return result;
4467 }
4468
4469 /*
4470 Scan the datadir for all databases (subdirectories) and get a list of .frm
4471 files they contain
4472 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)4473 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
4474 bool *has_errors) {
4475 bool result = true;
4476 struct st_my_dir *dir_info;
4477 struct fileinfo *file_info;
4478
4479 dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
4480 if (dir_info == nullptr) {
4481 // NO_LINT_DEBUG
4482 sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
4483 return false;
4484 }
4485
4486 file_info = dir_info->dir_entry;
4487 for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
4488 /* Ignore files/dirs starting with '.' */
4489 if (file_info->name[0] == '.') continue;
4490
4491 /* Ignore all non-directory files */
4492 if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
4493
4494 /* Scan all the .frm files in the directory */
4495 if (!scan_for_frms(datadir, file_info->name, has_errors)) {
4496 result = false;
4497 break;
4498 }
4499 }
4500
4501 /* Release the directory info */
4502 my_dirend(dir_info);
4503
4504 return result;
4505 }
4506
4507 /*
4508 Validate that all auto increment values in the data dictionary are on a
4509 supported version.
4510 */
validate_auto_incr()4511 bool Rdb_ddl_manager::validate_auto_incr() {
4512 std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
4513
4514 uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4515 rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
4516 const rocksdb::Slice auto_incr_entry_slice(
4517 reinterpret_cast<char *>(auto_incr_entry),
4518 Rdb_key_def::INDEX_NUMBER_SIZE);
4519 for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
4520 const rocksdb::Slice key = it->key();
4521 const rocksdb::Slice val = it->value();
4522 GL_INDEX_ID gl_index_id;
4523
4524 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4525 memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4526 break;
4527 }
4528
4529 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
4530 return false;
4531 }
4532
4533 if (val.size() <= Rdb_key_def::VERSION_SIZE) {
4534 return false;
4535 }
4536
4537 // Check if we have orphaned entries for whatever reason by cross
4538 // referencing ddl entries.
4539 auto ptr = reinterpret_cast<const uchar *>(key.data());
4540 ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
4541 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4542 if (!m_dict->get_index_info(gl_index_id, nullptr)) {
4543 // NO_LINT_DEBUG
4544 sql_print_warning(
4545 "RocksDB: AUTOINC mismatch - "
4546 "Index number (%u, %u) found in AUTOINC "
4547 "but does not exist as a DDL entry for table %s",
4548 gl_index_id.cf_id, gl_index_id.index_id,
4549 safe_get_table_name(gl_index_id).c_str());
4550 return false;
4551 }
4552
4553 ptr = reinterpret_cast<const uchar *>(val.data());
4554 const int version = rdb_netbuf_read_uint16(&ptr);
4555 if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
4556 // NO_LINT_DEBUG
4557 sql_print_warning(
4558 "RocksDB: AUTOINC mismatch - "
4559 "Index number (%u, %u) found in AUTOINC "
4560 "is on unsupported version %d for table %s",
4561 gl_index_id.cf_id, gl_index_id.index_id, version,
4562 safe_get_table_name(gl_index_id).c_str());
4563 return false;
4564 }
4565 }
4566
4567 if (!it->status().ok()) {
4568 return false;
4569 }
4570
4571 return true;
4572 }
4573
4574 /*
4575 Validate that all the tables in the RocksDB database dictionary match the .frm
4576 files in the datadir
4577 */
validate_schemas(void)4578 bool Rdb_ddl_manager::validate_schemas(void) {
4579 bool has_errors = false;
4580 const std::string datadir = std::string(mysql_real_data_home);
4581 Rdb_validate_tbls table_list;
4582
4583 /* Get the list of tables from the database dictionary */
4584 if (scan_for_tables(&table_list) != 0) {
4585 return false;
4586 }
4587
4588 /* Compare that to the list of actual .frm files */
4589 if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
4590 return false;
4591 }
4592
4593 /*
4594 Any tables left in the tables list are ones that are registered in RocksDB
4595 but don't have .frm files.
4596 */
4597 for (const auto &db : table_list.m_list) {
4598 for (const auto &table : db.second) {
4599 sql_print_warning(
4600 "RocksDB: Schema mismatch - "
4601 "Table %s.%s is registered in RocksDB "
4602 "but does not have a .frm file",
4603 db.first.c_str(), table.first.c_str());
4604 has_errors = true;
4605 }
4606 }
4607
4608 return !has_errors;
4609 }
4610 #endif // defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) &&
4611 // ROCKSDB_INCLUDE_VALIDATE_TABLES
4612
4613 #if defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) && ROCKSDB_INCLUDE_VALIDATE_TABLES
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t validate_tables)4614 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4615 Rdb_cf_manager *const cf_manager,
4616 const uint32_t validate_tables) {
4617 #else
4618 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4619 Rdb_cf_manager *const cf_manager) {
4620 #endif // defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) &&
4621 // ROCKSDB_INCLUDE_VALIDATE_TABLES
4622 m_dict = dict_arg;
4623 m_cf_manager = cf_manager;
4624 mysql_rwlock_init(0, &m_rwlock);
4625
4626 /* Read the data dictionary and populate the hash */
4627 uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4628 rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4629 const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4630 Rdb_key_def::INDEX_NUMBER_SIZE);
4631
4632 /* Reading data dictionary should always skip bloom filter */
4633 rocksdb::Iterator *it = m_dict->new_iterator();
4634 int i = 0;
4635
4636 uint max_index_id_in_dict = 0;
4637 m_dict->get_max_index_id(&max_index_id_in_dict);
4638
4639 for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4640 const uchar *ptr;
4641 const uchar *ptr_end;
4642 const rocksdb::Slice key = it->key();
4643 const rocksdb::Slice val = it->value();
4644
4645 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4646 memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4647 break;
4648 }
4649
4650 if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4651 sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4652 (int)key.size());
4653 return true;
4654 }
4655
4656 Rdb_tbl_def *const tdef =
4657 new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4658
4659 // Now, read the DDLs.
4660 const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4661 if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4662 sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4663 tdef->full_tablename().c_str());
4664 return true;
4665 }
4666 tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4667 tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4668
4669 ptr = reinterpret_cast<const uchar *>(val.data());
4670 const int version = rdb_netbuf_read_uint16(&ptr);
4671 if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4672 sql_print_error(
4673 "RocksDB: DDL ENTRY Version was not expected."
4674 "Expected: %d, Actual: %d",
4675 Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4676 return true;
4677 }
4678 ptr_end = ptr + real_val_size;
4679 for (uint keyno = 0; ptr < ptr_end; keyno++) {
4680 GL_INDEX_ID gl_index_id;
4681 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4682 uint flags = 0;
4683 struct Rdb_index_info index_info;
4684 if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4685 sql_print_error(
4686 "RocksDB: Could not get index information "
4687 "for Index Number (%u,%u), table %s",
4688 gl_index_id.cf_id, gl_index_id.index_id,
4689 tdef->full_tablename().c_str());
4690 return true;
4691 }
4692 if (max_index_id_in_dict < gl_index_id.index_id) {
4693 sql_print_error(
4694 "RocksDB: Found max index id %u from data dictionary "
4695 "but also found larger index id %u from dictionary. "
4696 "This should never happen and possibly a bug.",
4697 max_index_id_in_dict, gl_index_id.index_id);
4698 return true;
4699 }
4700 if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4701 sql_print_error(
4702 "RocksDB: Could not get Column Family Flags "
4703 "for CF Number %d, table %s",
4704 gl_index_id.cf_id, tdef->full_tablename().c_str());
4705 return true;
4706 }
4707
4708 if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4709 // The per-index cf option is deprecated. Make sure we don't have the
4710 // flag set in any existing database. NO_LINT_DEBUG
4711 sql_print_error(
4712 "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4713 "number %d, table %s",
4714 gl_index_id.cf_id, tdef->full_tablename().c_str());
4715 }
4716
4717 std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
4718 cf_manager->get_cf(gl_index_id.cf_id);
4719 assert(cfh);
4720
4721 uint32 ttl_rec_offset =
4722 Rdb_key_def::has_index_flag(index_info.m_index_flags,
4723 Rdb_key_def::TTL_FLAG)
4724 ? Rdb_key_def::calculate_index_flag_offset(
4725 index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4726 : UINT_MAX;
4727
4728 /*
4729 We can't fully initialize Rdb_key_def object here, because full
4730 initialization requires that there is an open TABLE* where we could
4731 look at Field* objects and set max_length and other attributes
4732 */
4733 tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4734 gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4735 index_info.m_index_type, index_info.m_kv_version,
4736 flags & Rdb_key_def::REVERSE_CF_FLAG,
4737 flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4738 m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4739 ttl_rec_offset, index_info.m_ttl_duration);
4740 }
4741
4742 assert(tdef->m_key_count > 0);
4743 tdef->m_tbl_stats.set(
4744 tdef->m_key_count > 0 ? tdef->m_key_descr_arr[0]->m_stats.m_rows : 0, 0,
4745 0);
4746
4747 put(tdef);
4748 i++;
4749 }
4750
4751 #if defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) && ROCKSDB_INCLUDE_VALIDATE_TABLES
4752 /*
4753 If validate_tables is greater than 0 run the validation. Only fail the
4754 initialzation if the setting is 1. If the setting is 2 we continue.
4755 */
4756 if (validate_tables > 0) {
4757 std::string msg;
4758 if (!validate_schemas()) {
4759 msg =
4760 "RocksDB: Problems validating data dictionary "
4761 "against .frm files, exiting";
4762 } else if (!validate_auto_incr()) {
4763 msg =
4764 "RocksDB: Problems validating auto increment values in "
4765 "data dictionary, exiting";
4766 }
4767 if (validate_tables == 1 && !msg.empty()) {
4768 // NO_LINT_DEBUG
4769 sql_print_error(
4770 "%s. Use \"rocksdb_validate_tables=2\" to ignore this error.",
4771 msg.c_str());
4772 return true;
4773 }
4774 }
4775 #endif // defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) &&
4776 // ROCKSDB_INCLUDE_VALIDATE_TABLES
4777
4778 // index ids used by applications should not conflict with
4779 // data dictionary index ids
4780 if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4781 max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4782 }
4783
4784 m_sequence.init(max_index_id_in_dict + 1);
4785
4786 if (!it->status().ok()) {
4787 rdb_log_status_error(it->status(), "Table_store load error");
4788 return true;
4789 }
4790 delete it;
4791 // NO_LINT_DEBUG
4792 sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4793 i);
4794 return false;
4795 }
4796
4797 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4798 const bool lock) {
4799 Rdb_tbl_def *rec = nullptr;
4800
4801 if (lock) {
4802 mysql_rwlock_rdlock(&m_rwlock);
4803 }
4804
4805 const auto &it = m_ddl_map.find(table_name);
4806 if (it != m_ddl_map.end()) {
4807 rec = it->second;
4808 }
4809
4810 if (lock) {
4811 mysql_rwlock_unlock(&m_rwlock);
4812 }
4813
4814 return rec;
4815 }
4816
4817 int Rdb_ddl_manager::find_indexes(const std::string &table_name,
4818 std::vector<GL_INDEX_ID> *indexes) {
4819 mysql_rwlock_rdlock(&m_rwlock);
4820
4821 Rdb_tbl_def *tdef = nullptr;
4822 const auto it = m_ddl_map.find(table_name);
4823 if (it != m_ddl_map.end()) {
4824 tdef = it->second;
4825 }
4826
4827 if (!tdef) {
4828 mysql_rwlock_unlock(&m_rwlock);
4829 return HA_EXIT_FAILURE;
4830 }
4831
4832 for (uint i = 0; i < tdef->m_key_count; i++) {
4833 indexes->push_back(tdef->m_key_descr_arr[i]->get_gl_index_id());
4834 }
4835
4836 mysql_rwlock_unlock(&m_rwlock);
4837
4838 return HA_EXIT_SUCCESS;
4839 }
4840
4841 int Rdb_ddl_manager::find_table_stats(const std::string &table_name,
4842 Rdb_table_stats *tbl_stats) {
4843 mysql_rwlock_rdlock(&m_rwlock);
4844
4845 Rdb_tbl_def *tdef = nullptr;
4846 const auto it = m_ddl_map.find(table_name);
4847 if (it != m_ddl_map.end()) {
4848 tdef = it->second;
4849 }
4850
4851 if (!tdef) {
4852 mysql_rwlock_unlock(&m_rwlock);
4853 return HA_EXIT_FAILURE;
4854 }
4855
4856 *tbl_stats = tdef->m_tbl_stats;
4857
4858 mysql_rwlock_unlock(&m_rwlock);
4859
4860 return HA_EXIT_SUCCESS;
4861 }
4862
4863 // this is a safe version of the find() function below. It acquires a read
4864 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4865 // are finding it. Copying it into 'ret' increments the count making sure
4866 // that the object will not be discarded until we are finished with it.
4867 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4868 GL_INDEX_ID gl_index_id) {
4869 std::shared_ptr<const Rdb_key_def> ret(nullptr);
4870
4871 mysql_rwlock_rdlock(&m_rwlock);
4872
4873 const auto it = m_index_num_to_keydef.find(gl_index_id);
4874 if (it != m_index_num_to_keydef.end()) {
4875 const auto table_def = find(it->second.first, false);
4876 if (table_def && it->second.second < table_def->m_key_count) {
4877 const auto &kd = table_def->m_key_descr_arr[it->second.second];
4878 if (kd->max_storage_fmt_length() != 0) {
4879 ret = kd;
4880 }
4881 }
4882 } else {
4883 const auto uncommitted_it =
4884 m_index_num_to_uncommitted_keydef.find(gl_index_id);
4885 if (uncommitted_it != m_index_num_to_uncommitted_keydef.end()) {
4886 const auto &kd = uncommitted_it->second;
4887 if (kd->max_storage_fmt_length() != 0) {
4888 ret = kd;
4889 }
4890 }
4891 }
4892
4893 mysql_rwlock_unlock(&m_rwlock);
4894
4895 return ret;
4896 }
4897
4898 // this method assumes at least read-only lock on m_rwlock
4899 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4900 GL_INDEX_ID gl_index_id) {
4901 const auto it = m_index_num_to_keydef.find(gl_index_id);
4902 if (it != m_index_num_to_keydef.end()) {
4903 const auto table_def = find(it->second.first, false);
4904 if (table_def) {
4905 if (it->second.second < table_def->m_key_count) {
4906 return table_def->m_key_descr_arr[it->second.second];
4907 }
4908 }
4909 } else {
4910 const auto uncommitted_it =
4911 m_index_num_to_uncommitted_keydef.find(gl_index_id);
4912 if (uncommitted_it != m_index_num_to_uncommitted_keydef.end()) {
4913 return uncommitted_it->second;
4914 }
4915 }
4916
4917 static std::shared_ptr<Rdb_key_def> empty = nullptr;
4918
4919 return empty;
4920 }
4921
4922 // this method returns the name of the table based on an index id. It acquires
4923 // a read lock on m_rwlock.
4924 const std::string Rdb_ddl_manager::safe_get_table_name(
4925 const GL_INDEX_ID &gl_index_id) {
4926 std::string ret;
4927 mysql_rwlock_rdlock(&m_rwlock);
4928 auto it = m_index_num_to_keydef.find(gl_index_id);
4929 if (it != m_index_num_to_keydef.end()) {
4930 ret = it->second.first;
4931 }
4932 mysql_rwlock_unlock(&m_rwlock);
4933 return ret;
4934 }
4935
4936 void Rdb_ddl_manager::set_stats(
4937 const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4938 mysql_rwlock_wrlock(&m_rwlock);
4939 for (const auto &src : stats) {
4940 const auto &keydef = find(src.second.m_gl_index_id);
4941 if (keydef) {
4942 keydef->m_stats = src.second;
4943 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4944 }
4945 }
4946 mysql_rwlock_unlock(&m_rwlock);
4947 }
4948
4949 void Rdb_ddl_manager::adjust_stats(
4950 const std::vector<Rdb_index_stats> &new_data,
4951 const std::vector<Rdb_index_stats> &deleted_data) {
4952 mysql_rwlock_wrlock(&m_rwlock);
4953 int i = 0;
4954 for (const auto &data : {new_data, deleted_data}) {
4955 for (const auto &src : data) {
4956 const auto &keydef = find(src.m_gl_index_id);
4957 if (keydef) {
4958 keydef->m_stats.m_distinct_keys_per_prefix.resize(
4959 keydef->get_key_parts());
4960 keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4961 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4962 }
4963 }
4964 i++;
4965 }
4966 const bool should_save_stats = !m_stats2store.empty();
4967 mysql_rwlock_unlock(&m_rwlock);
4968 if (should_save_stats) {
4969 // Queue an async persist_stats(false) call to the background thread.
4970 rdb_queue_save_stats_request();
4971 }
4972 }
4973
4974 void Rdb_ddl_manager::persist_stats(const bool sync) {
4975 mysql_rwlock_wrlock(&m_rwlock);
4976 const auto local_stats2store = std::move(m_stats2store);
4977 m_stats2store.clear();
4978 mysql_rwlock_unlock(&m_rwlock);
4979
4980 // Persist stats
4981 const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4982 std::vector<Rdb_index_stats> stats;
4983 std::transform(local_stats2store.begin(), local_stats2store.end(),
4984 std::back_inserter(stats),
4985 [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4986 return s.second;
4987 });
4988 m_dict->add_stats(wb.get(), stats);
4989 m_dict->commit(wb.get(), sync);
4990 }
4991
4992 void Rdb_ddl_manager::set_table_stats(const std::string &tbl_name) {
4993 timespec ts;
4994 clock_gettime(CLOCK_REALTIME, &ts);
4995
4996 mysql_rwlock_rdlock(&m_rwlock);
4997 const auto &tbl_def = find(tbl_name, false /* needs lock */);
4998 if (tbl_def) {
4999 assert(tbl_def->m_key_count > 0);
5000 // Take the number of rows of the first index as the number of rows of
5001 // the table. This is an estimated value.
5002 tbl_def->m_tbl_stats.set(tbl_def->m_key_count > 0
5003 ? tbl_def->m_key_descr_arr[0]->m_stats.m_rows
5004 : 0,
5005 0, ts.tv_sec);
5006 }
5007 mysql_rwlock_unlock(&m_rwlock);
5008 }
5009
5010 /*
5011 Put table definition of `tbl` into the mapping, and also write it to the
5012 on-disk data dictionary.
5013 */
5014
5015 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
5016 rocksdb::WriteBatch *const batch) {
5017 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
5018
5019 buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
5020
5021 const std::string &dbname_tablename = tbl->full_tablename();
5022 buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
5023
5024 int res;
5025 if ((res =
5026 tbl->put_dict(m_dict, m_cf_manager, batch, buf_writer.to_slice()))) {
5027 return res;
5028 }
5029 if ((res = put(tbl))) {
5030 return res;
5031 }
5032 return HA_EXIT_SUCCESS;
5033 }
5034
5035 /* Return 0 - ok, other value - error */
5036 /* TODO:
5037 This function modifies m_ddl_map and m_index_num_to_keydef.
5038 However, these changes need to be reversed if dict_manager.commit fails
5039 See the discussion here: https://reviews.facebook.net/D35925#inline-259167
5040 Tracked by https://github.com/facebook/mysql-5.6/issues/33
5041 */
5042 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
5043 const std::string &dbname_tablename = tbl->full_tablename();
5044
5045 if (lock) mysql_rwlock_wrlock(&m_rwlock);
5046
5047 // We have to do this find because 'tbl' is not yet in the list. We need
5048 // to find the one we are replacing ('rec')
5049 const auto &it = m_ddl_map.find(dbname_tablename);
5050 if (it != m_ddl_map.end()) {
5051 delete it->second;
5052 m_ddl_map.erase(it);
5053 }
5054 m_ddl_map.emplace(dbname_tablename, tbl);
5055
5056 for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
5057 m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
5058 std::make_pair(dbname_tablename, keyno);
5059 }
5060 tbl->check_and_set_read_free_rpl_table();
5061
5062 if (lock) mysql_rwlock_unlock(&m_rwlock);
5063 return 0;
5064 }
5065
5066 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
5067 rocksdb::WriteBatch *const batch,
5068 const bool lock) {
5069 if (lock) mysql_rwlock_wrlock(&m_rwlock);
5070
5071 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
5072 key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
5073 const std::string &dbname_tablename = tbl->full_tablename();
5074 key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
5075
5076 m_dict->delete_key(batch, key_writer.to_slice());
5077
5078 const auto &it = m_ddl_map.find(dbname_tablename);
5079 if (it != m_ddl_map.end()) {
5080 // Free Rdb_tbl_def
5081 delete it->second;
5082
5083 m_ddl_map.erase(it);
5084 }
5085
5086 if (lock) mysql_rwlock_unlock(&m_rwlock);
5087 }
5088
5089 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
5090 rocksdb::WriteBatch *const batch) {
5091 Rdb_tbl_def *rec;
5092 Rdb_tbl_def *new_rec;
5093 bool res = true;
5094 Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
5095
5096 mysql_rwlock_wrlock(&m_rwlock);
5097 if (!(rec = find(from, false))) {
5098 mysql_rwlock_unlock(&m_rwlock);
5099 return true;
5100 }
5101
5102 new_rec = new Rdb_tbl_def(to);
5103
5104 new_rec->m_key_count = rec->m_key_count;
5105 new_rec->m_auto_incr_val =
5106 rec->m_auto_incr_val.load(std::memory_order_relaxed);
5107 new_rec->m_key_descr_arr = rec->m_key_descr_arr;
5108
5109 new_rec->m_hidden_pk_val =
5110 rec->m_hidden_pk_val.load(std::memory_order_relaxed);
5111
5112 new_rec->m_tbl_stats = rec->m_tbl_stats;
5113
5114 // so that it's not free'd when deleting the old rec
5115 rec->m_key_descr_arr = nullptr;
5116
5117 // Create a new key
5118 new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
5119
5120 const std::string &dbname_tablename = new_rec->full_tablename();
5121 new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
5122
5123 // Create a key to add
5124 if (!new_rec->put_dict(m_dict, m_cf_manager, batch,
5125 new_buf_writer.to_slice())) {
5126 remove(rec, batch, false);
5127 put(new_rec, false);
5128 res = false; // ok
5129 }
5130
5131 mysql_rwlock_unlock(&m_rwlock);
5132 return res;
5133 }
5134
5135 void Rdb_ddl_manager::cleanup() {
5136 for (const auto &kv : m_ddl_map) {
5137 delete kv.second;
5138 }
5139 m_ddl_map.clear();
5140
5141 mysql_rwlock_destroy(&m_rwlock);
5142 m_sequence.cleanup();
5143 }
5144
5145 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
5146 int ret;
5147
5148 assert(tables_scanner != nullptr);
5149
5150 // This method should NOT accquire dict_manager lock and
5151 // cf_manager lock in order to prevent deadlocks.
5152 mysql_rwlock_rdlock(&m_rwlock);
5153
5154 ret = 0;
5155
5156 for (const auto &kv : m_ddl_map) {
5157 ret = tables_scanner->add_table(kv.second);
5158 if (ret) break;
5159 }
5160
5161 mysql_rwlock_unlock(&m_rwlock);
5162 return ret;
5163 }
5164
5165 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
5166 Rdb_cf_manager *const cf_manager,
5167 const my_bool enable_remove_orphaned_dropped_cfs) {
5168 assert(rdb_dict != nullptr);
5169 assert(cf_manager != nullptr);
5170
5171 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
5172
5173 m_db = rdb_dict;
5174
5175 // It is safe to get raw pointers here since:
5176 // 1. System CF and default CF cannot be dropped
5177 // 2. cf_manager outlives dict_manager
5178 m_system_cfh =
5179 cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME, true).get();
5180 rocksdb::ColumnFamilyHandle *default_cfh =
5181 cf_manager->get_cf(DEFAULT_CF_NAME).get();
5182
5183 // System CF and default CF should be initialized
5184 if (m_system_cfh == nullptr || default_cfh == nullptr) {
5185 return HA_EXIT_FAILURE;
5186 }
5187
5188 rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
5189
5190 m_key_slice_max_index_id =
5191 rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
5192 Rdb_key_def::INDEX_NUMBER_SIZE);
5193
5194 resume_drop_indexes();
5195 rollback_ongoing_index_creation();
5196
5197 // Initialize system CF and default CF flags
5198 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5199 rocksdb::WriteBatch *const batch = wb.get();
5200
5201 add_cf_flags(batch, m_system_cfh->GetID(), 0);
5202 add_cf_flags(batch, default_cfh->GetID(), 0);
5203 commit(batch);
5204
5205 if (add_missing_cf_flags(cf_manager)) {
5206 return HA_EXIT_FAILURE;
5207 }
5208
5209 if (remove_orphaned_dropped_cfs(cf_manager,
5210 enable_remove_orphaned_dropped_cfs)) {
5211 return HA_EXIT_FAILURE;
5212 }
5213
5214 return HA_EXIT_SUCCESS;
5215 }
5216
5217 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
5218 return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
5219 }
5220
5221 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
5222 const rocksdb::Slice &key,
5223 const rocksdb::Slice &value) const {
5224 batch->Put(m_system_cfh, key, value);
5225 }
5226
5227 rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
5228 std::string *const value) const {
5229 rocksdb::ReadOptions options;
5230 options.total_order_seek = true;
5231 return m_db->Get(options, m_system_cfh, key, value);
5232 }
5233
5234 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
5235 const rocksdb::Slice &key) const {
5236 batch->Delete(m_system_cfh, key);
5237 }
5238
5239 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
5240 /* Reading data dictionary should always skip bloom filter */
5241 rocksdb::ReadOptions read_options;
5242 read_options.total_order_seek = true;
5243 return m_db->NewIterator(read_options, m_system_cfh);
5244 }
5245
5246 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
5247 const bool sync) const {
5248 if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
5249 int res = HA_EXIT_SUCCESS;
5250 rocksdb::WriteOptions options;
5251 options.sync = sync;
5252 rocksdb::TransactionDBWriteOptimizations optimize;
5253 optimize.skip_concurrency_control = true;
5254 rocksdb::Status s = m_db->Write(options, optimize, batch);
5255 res = !s.ok(); // we return true when something failed
5256 if (res) {
5257 rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
5258 }
5259 batch->Clear();
5260 return res;
5261 }
5262
5263 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
5264 Rdb_key_def::DATA_DICT_TYPE dict_type,
5265 const GL_INDEX_ID &gl_index_id) {
5266 rdb_netbuf_store_uint32(netbuf, dict_type);
5267 rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
5268 gl_index_id.cf_id);
5269 rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
5270 gl_index_id.index_id);
5271 }
5272
5273 void Rdb_dict_manager::delete_with_prefix(
5274 rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
5275 const GL_INDEX_ID &gl_index_id) const {
5276 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5277 dump_index_id(&key_writer, dict_type, gl_index_id);
5278
5279 delete_key(batch, key_writer.to_slice());
5280 }
5281
5282 void Rdb_dict_manager::add_or_update_index_cf_mapping(
5283 rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
5284 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5285 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
5286 index_info->m_gl_index_id);
5287
5288 Rdb_buf_writer<256> value_writer;
5289
5290 value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
5291 value_writer.write_byte(index_info->m_index_type);
5292 value_writer.write_uint16(index_info->m_kv_version);
5293 value_writer.write_uint32(index_info->m_index_flags);
5294 value_writer.write_uint64(index_info->m_ttl_duration);
5295
5296 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5297 }
5298
5299 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
5300 const uint32_t cf_id,
5301 const uint32_t cf_flags) const {
5302 assert(batch != nullptr);
5303
5304 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
5305 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
5306 key_writer.write_uint32(cf_id);
5307
5308 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5309 value_writer;
5310 value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
5311 value_writer.write_uint32(cf_flags);
5312
5313 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5314 }
5315
5316 void Rdb_dict_manager::delete_cf_flags(rocksdb::WriteBatch *const batch,
5317 const uint &cf_id) const {
5318 assert(batch != nullptr);
5319
5320 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5321
5322 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
5323 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5324 const rocksdb::Slice key =
5325 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5326
5327 delete_key(batch, key);
5328 }
5329
5330 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
5331 const GL_INDEX_ID &gl_index_id) const {
5332 delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
5333 delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5334 delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
5335 }
5336
5337 bool Rdb_dict_manager::get_index_info(
5338 const GL_INDEX_ID &gl_index_id,
5339 struct Rdb_index_info *const index_info) const {
5340 if (index_info) {
5341 index_info->m_gl_index_id = gl_index_id;
5342 }
5343
5344 bool found = false;
5345 bool error = false;
5346 std::string value;
5347 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5348 dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
5349
5350 const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
5351 if (status.ok()) {
5352 if (!index_info) {
5353 return true;
5354 }
5355
5356 const uchar *const val = (const uchar *)value.c_str();
5357 const uchar *ptr = val;
5358 index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
5359 ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
5360
5361 switch (index_info->m_index_dict_version) {
5362 case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
5363 /* Sanity check to prevent reading bogus TTL record. */
5364 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
5365 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
5366 RDB_SIZEOF_INDEX_FLAGS +
5367 ROCKSDB_SIZEOF_TTL_RECORD) {
5368 error = true;
5369 break;
5370 }
5371 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
5372 ptr += RDB_SIZEOF_INDEX_TYPE;
5373 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
5374 ptr += RDB_SIZEOF_KV_VERSION;
5375 index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
5376 ptr += RDB_SIZEOF_INDEX_FLAGS;
5377 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
5378 found = true;
5379 break;
5380
5381 case Rdb_key_def::INDEX_INFO_VERSION_TTL:
5382 /* Sanity check to prevent reading bogus into TTL record. */
5383 if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
5384 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
5385 ROCKSDB_SIZEOF_TTL_RECORD) {
5386 error = true;
5387 break;
5388 }
5389 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
5390 ptr += RDB_SIZEOF_INDEX_TYPE;
5391 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
5392 ptr += RDB_SIZEOF_KV_VERSION;
5393 index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
5394 if ((index_info->m_kv_version ==
5395 Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
5396 index_info->m_ttl_duration > 0) {
5397 index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
5398 }
5399 found = true;
5400 break;
5401
5402 case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
5403 case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
5404 index_info->m_index_type = rdb_netbuf_to_byte(ptr);
5405 ptr += RDB_SIZEOF_INDEX_TYPE;
5406 index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
5407 found = true;
5408 break;
5409
5410 default:
5411 error = true;
5412 break;
5413 }
5414
5415 switch (index_info->m_index_type) {
5416 case Rdb_key_def::INDEX_TYPE_PRIMARY:
5417 case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
5418 error = index_info->m_kv_version >
5419 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
5420 break;
5421 }
5422 case Rdb_key_def::INDEX_TYPE_SECONDARY:
5423 error = index_info->m_kv_version >
5424 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
5425 break;
5426 default:
5427 error = true;
5428 break;
5429 }
5430 }
5431
5432 if (error) {
5433 // NO_LINT_DEBUG
5434 sql_print_error(
5435 "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
5436 "from data dictionary. This should never happen "
5437 "and it may be a bug.",
5438 index_info->m_index_dict_version, index_info->m_index_type,
5439 index_info->m_kv_version, index_info->m_ttl_duration);
5440 abort();
5441 }
5442
5443 return found;
5444 }
5445
5446 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
5447 uint32_t *const cf_flags) const {
5448 assert(cf_flags != nullptr);
5449
5450 bool found = false;
5451 std::string value;
5452 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
5453
5454 key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
5455 key_writer.write_uint32(cf_id);
5456
5457 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5458
5459 if (status.ok()) {
5460 const uchar *val = (const uchar *)value.c_str();
5461 assert(val);
5462
5463 const uint16_t version = rdb_netbuf_to_uint16(val);
5464
5465 if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
5466 *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5467 found = true;
5468 }
5469 }
5470
5471 return found;
5472 }
5473
5474 void Rdb_dict_manager::add_dropped_cf(rocksdb::WriteBatch *const batch,
5475 const uint &cf_id) const {
5476 assert(batch != nullptr);
5477
5478 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5479 uchar value_buf[Rdb_key_def::VERSION_SIZE] = {0};
5480 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::DROPPED_CF);
5481 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5482 const rocksdb::Slice key =
5483 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5484
5485 rdb_netbuf_store_uint16(value_buf, Rdb_key_def::DROPPED_CF_VERSION);
5486 const rocksdb::Slice value =
5487 rocksdb::Slice(reinterpret_cast<char *>(value_buf), sizeof(value_buf));
5488 batch->Put(m_system_cfh, key, value);
5489 }
5490
5491 bool Rdb_dict_manager::get_dropped_cf(const uint &cf_id) const {
5492 std::string value;
5493 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5494
5495 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::DROPPED_CF);
5496 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5497
5498 const rocksdb::Slice key =
5499 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5500 const rocksdb::Status status = get_value(key, &value);
5501
5502 return status.ok();
5503 }
5504
5505 void Rdb_dict_manager::delete_dropped_cf_and_flags(
5506 rocksdb::WriteBatch *const batch, const uint &cf_id) const {
5507 assert(batch != nullptr);
5508 delete_dropped_cf(batch, cf_id);
5509 delete_cf_flags(batch, cf_id);
5510 }
5511
5512 void Rdb_dict_manager::delete_dropped_cf(rocksdb::WriteBatch *const batch,
5513 const uint &cf_id) const {
5514 assert(batch != nullptr);
5515
5516 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5517
5518 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::DROPPED_CF);
5519 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5520 const rocksdb::Slice key =
5521 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5522
5523 delete_key(batch, key);
5524 }
5525
5526 void Rdb_dict_manager::get_all_dropped_cfs(
5527 std::unordered_set<uint32> *dropped_cf_ids) const {
5528 uchar dropped_cf_buf[Rdb_key_def::INDEX_NUMBER_SIZE];
5529 rdb_netbuf_store_uint32(dropped_cf_buf, Rdb_key_def::DROPPED_CF);
5530 const rocksdb::Slice dropped_cf_slice(
5531 reinterpret_cast<char *>(dropped_cf_buf), Rdb_key_def::INDEX_NUMBER_SIZE);
5532
5533 rocksdb::Iterator *it = new_iterator();
5534 for (it->Seek(dropped_cf_slice); it->Valid(); it->Next()) {
5535 rocksdb::Slice key = it->key();
5536 const uchar *const ptr = (const uchar *)key.data();
5537
5538 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 2 ||
5539 rdb_netbuf_to_uint32(ptr) != Rdb_key_def::DROPPED_CF) {
5540 break;
5541 }
5542
5543 uint32 cf_id = rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5544 dropped_cf_ids->insert(cf_id);
5545 }
5546
5547 delete it;
5548 }
5549
5550 /*
5551 Returning index ids that were marked as deleted (via DROP TABLE) but
5552 still not removed by drop_index_thread yet, or indexes that are marked as
5553 ongoing creation.
5554 */
5555 void Rdb_dict_manager::get_ongoing_index_operation(
5556 std::unordered_set<GL_INDEX_ID> *gl_index_ids,
5557 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5558 assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5559 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5560
5561 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5562 index_writer.write_uint32(dd_type);
5563 const rocksdb::Slice index_slice = index_writer.to_slice();
5564
5565 rocksdb::Iterator *it = new_iterator();
5566 for (it->Seek(index_slice); it->Valid(); it->Next()) {
5567 rocksdb::Slice key = it->key();
5568 const uchar *const ptr = (const uchar *)key.data();
5569
5570 /*
5571 Ongoing drop/create index operations require key to be of the form:
5572 dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5573
5574 This may need to be changed in the future if we want to process a new
5575 ddl_type with different format.
5576 */
5577 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5578 rdb_netbuf_to_uint32(ptr) != dd_type) {
5579 break;
5580 }
5581
5582 // We don't check version right now since currently we always store only
5583 // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5584 // If increasing version number, we need to add version check logic here.
5585 GL_INDEX_ID gl_index_id;
5586 gl_index_id.cf_id =
5587 rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5588 gl_index_id.index_id =
5589 rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5590 gl_index_ids->insert(gl_index_id);
5591 }
5592 delete it;
5593 }
5594
5595 /*
5596 If mysqld reboots during create table, a column family can be
5597 created without cf flags. This method adds missing cf flags. It
5598 only should be called during mysqld startup.
5599 */
5600 int Rdb_dict_manager::add_missing_cf_flags(
5601 Rdb_cf_manager *const cf_manager) const {
5602 for (const auto &cf_name : cf_manager->get_cf_names()) {
5603 std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
5604 cf_manager->get_cf(cf_name);
5605
5606 if (cf_manager->create_cf_flags_if_needed(this, cfh->GetID(), cf_name)) {
5607 return HA_EXIT_FAILURE;
5608 }
5609 }
5610
5611 return HA_EXIT_SUCCESS;
5612 }
5613
5614 /*
5615 If mysqld reboots during dropping a column family, it can happen
5616 that the column family is deleted from RocksDB, but its id is
5617 in the list of cf ids that are to be dropped.
5618 This method cleans up these orphaned cf ids. It only should be
5619 called during mysqld startup.
5620 */
5621 int Rdb_dict_manager::remove_orphaned_dropped_cfs(
5622 Rdb_cf_manager *const cf_manager,
5623 const my_bool &enable_remove_orphaned_dropped_cfs) const {
5624 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5625 rocksdb::WriteBatch *const batch = wb.get();
5626
5627 std::unordered_set<uint32> dropped_cf_ids;
5628 get_all_dropped_cfs(&dropped_cf_ids);
5629 for (const auto cf_id : dropped_cf_ids) {
5630 if (!cf_manager->get_cf(cf_id)) {
5631 // NO_LINT_DEBUG
5632 sql_print_warning(
5633 "RocksDB: Column family with id %u doesn't exist in "
5634 "cf manager, but it is listed to be dropped",
5635 cf_id);
5636
5637 if (enable_remove_orphaned_dropped_cfs) {
5638 delete_dropped_cf_and_flags(batch, cf_id);
5639 }
5640 }
5641 }
5642
5643 commit(batch);
5644 return HA_EXIT_SUCCESS;
5645 }
5646
5647 /*
5648 Returning true if index_id is create/delete ongoing (undergoing creation or
5649 marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
5650 or not.
5651 */
5652 bool Rdb_dict_manager::is_index_operation_ongoing(
5653 const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5654 assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5655 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5656
5657 bool found = false;
5658 std::string value;
5659 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5660 dump_index_id(&key_writer, dd_type, gl_index_id);
5661
5662 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5663 if (status.ok()) {
5664 found = true;
5665 }
5666 return found;
5667 }
5668
5669 /*
5670 Adding index_id to data dictionary so that the index id is removed
5671 by drop_index_thread, or to track online index creation.
5672 */
5673 void Rdb_dict_manager::start_ongoing_index_operation(
5674 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5675 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5676 assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5677 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5678
5679 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5680 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5681
5682 dump_index_id(&key_writer, dd_type, gl_index_id);
5683
5684 // version as needed
5685 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5686 value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5687 } else {
5688 value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5689 }
5690
5691 batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5692 }
5693
5694 /*
5695 Removing index_id from data dictionary to confirm drop_index_thread
5696 completed dropping entire key/values of the index_id
5697 */
5698 void Rdb_dict_manager::end_ongoing_index_operation(
5699 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5700 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5701 assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5702 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5703
5704 delete_with_prefix(batch, dd_type, gl_index_id);
5705 }
5706
5707 /*
5708 Returning true if there is no target index ids to be removed
5709 by drop_index_thread
5710 */
5711 bool Rdb_dict_manager::is_drop_index_empty() const {
5712 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5713 get_ongoing_drop_indexes(&gl_index_ids);
5714 return gl_index_ids.empty();
5715 }
5716
5717 /*
5718 This function is supposed to be called by DROP TABLE. Logging messages
5719 that dropping indexes started, and adding data dictionary so that
5720 all associated indexes to be removed
5721 */
5722 void Rdb_dict_manager::add_drop_table(
5723 std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5724 rocksdb::WriteBatch *const batch) const {
5725 std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5726 for (uint32 i = 0; i < n_keys; i++) {
5727 dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5728 }
5729
5730 add_drop_index(dropped_index_ids, batch);
5731 }
5732
5733 /*
5734 Called during inplace index drop operations. Logging messages
5735 that dropping indexes started, and adding data dictionary so that
5736 all associated indexes to be removed
5737 */
5738 void Rdb_dict_manager::add_drop_index(
5739 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5740 rocksdb::WriteBatch *const batch) const {
5741 for (const auto &gl_index_id : gl_index_ids) {
5742 log_start_drop_index(gl_index_id, "Begin");
5743 start_drop_index(batch, gl_index_id);
5744 }
5745 }
5746
5747 /*
5748 Called during inplace index creation operations. Logging messages
5749 that adding indexes started, and updates data dictionary with all associated
5750 indexes to be added.
5751 */
5752 void Rdb_dict_manager::add_create_index(
5753 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5754 rocksdb::WriteBatch *const batch) const {
5755 for (const auto &gl_index_id : gl_index_ids) {
5756 // NO_LINT_DEBUG
5757 sql_print_information("RocksDB: Begin index creation (%u,%u)",
5758 gl_index_id.cf_id, gl_index_id.index_id);
5759 start_create_index(batch, gl_index_id);
5760 }
5761 }
5762
5763 /*
5764 This function is supposed to be called by drop_index_thread, when it
5765 finished dropping any index, or at the completion of online index creation.
5766 */
5767 void Rdb_dict_manager::finish_indexes_operation(
5768 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5769 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5770 assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5771 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5772
5773 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5774 rocksdb::WriteBatch *const batch = wb.get();
5775
5776 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5777 get_ongoing_create_indexes(&incomplete_create_indexes);
5778
5779 for (const auto &gl_index_id : gl_index_ids) {
5780 if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5781 end_ongoing_index_operation(batch, gl_index_id, dd_type);
5782
5783 /*
5784 Remove the corresponding incomplete create indexes from data
5785 dictionary as well
5786 */
5787 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5788 if (incomplete_create_indexes.count(gl_index_id)) {
5789 end_ongoing_index_operation(batch, gl_index_id,
5790 Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5791 }
5792 }
5793 }
5794
5795 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5796 delete_index_info(batch, gl_index_id);
5797 }
5798 }
5799 commit(batch);
5800 }
5801
5802 /*
5803 This function is supposed to be called when initializing
5804 Rdb_dict_manager (at startup). If there is any index ids that are
5805 drop ongoing, printing out messages for diagnostics purposes.
5806 */
5807 void Rdb_dict_manager::resume_drop_indexes() const {
5808 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5809 get_ongoing_drop_indexes(&gl_index_ids);
5810
5811 uint max_index_id_in_dict = 0;
5812 get_max_index_id(&max_index_id_in_dict);
5813
5814 for (const auto &gl_index_id : gl_index_ids) {
5815 log_start_drop_index(gl_index_id, "Resume");
5816 if (max_index_id_in_dict < gl_index_id.index_id) {
5817 sql_print_error(
5818 "RocksDB: Found max index id %u from data dictionary "
5819 "but also found dropped index id (%u,%u) from drop_index "
5820 "dictionary. This should never happen and is possibly a "
5821 "bug.",
5822 max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5823 abort();
5824 }
5825 }
5826 }
5827
5828 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5829 std::unordered_set<GL_INDEX_ID> gl_index_ids;
5830
5831 get_ongoing_create_indexes(&gl_index_ids);
5832 rollback_ongoing_index_creation(gl_index_ids);
5833 }
5834
5835 void Rdb_dict_manager::rollback_ongoing_index_creation(
5836 const std::unordered_set<GL_INDEX_ID> &gl_index_ids) const {
5837 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5838 rocksdb::WriteBatch *const batch = wb.get();
5839
5840 for (const auto &gl_index_id : gl_index_ids) {
5841 // NO_LINT_DEBUG
5842 sql_print_information("RocksDB: Removing incomplete create index (%u,%u)",
5843 gl_index_id.cf_id, gl_index_id.index_id);
5844
5845 start_drop_index(batch, gl_index_id);
5846 }
5847
5848 commit(batch);
5849 }
5850
5851 void Rdb_dict_manager::log_start_drop_table(
5852 const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5853 const char *const log_action) const {
5854 for (uint32 i = 0; i < n_keys; i++) {
5855 log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5856 }
5857 }
5858
5859 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5860 const char *log_action) const {
5861 struct Rdb_index_info index_info;
5862 if (!get_index_info(gl_index_id, &index_info)) {
5863 /*
5864 If we don't find the index info, it could be that it's because it was a
5865 partially created index that isn't in the data dictionary yet that needs
5866 to be rolled back.
5867 */
5868 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5869 get_ongoing_create_indexes(&incomplete_create_indexes);
5870
5871 if (!incomplete_create_indexes.count(gl_index_id)) {
5872 /* If it's not a partially created index, something is very wrong. */
5873 sql_print_error(
5874 "RocksDB: Failed to get column family info "
5875 "from index id (%u,%u). MyRocks data dictionary may "
5876 "get corrupted.",
5877 gl_index_id.cf_id, gl_index_id.index_id);
5878 abort();
5879 }
5880 }
5881 }
5882
5883 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5884 bool found = false;
5885 std::string value;
5886
5887 const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5888 if (status.ok()) {
5889 const uchar *const val = (const uchar *)value.c_str();
5890 const uint16_t version = rdb_netbuf_to_uint16(val);
5891 if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5892 *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5893 found = true;
5894 }
5895 }
5896 return found;
5897 }
5898
5899 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5900 const uint32_t index_id) const {
5901 assert(batch != nullptr);
5902
5903 uint32_t old_index_id = -1;
5904 if (get_max_index_id(&old_index_id)) {
5905 if (old_index_id > index_id) {
5906 sql_print_error(
5907 "RocksDB: Found max index id %u from data dictionary "
5908 "but trying to update to older value %u. This should "
5909 "never happen and possibly a bug.",
5910 old_index_id, index_id);
5911 return true;
5912 }
5913 }
5914
5915 Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5916 value_writer;
5917 value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5918 value_writer.write_uint32(index_id);
5919
5920 batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5921 return false;
5922 }
5923
5924 void Rdb_dict_manager::add_stats(
5925 rocksdb::WriteBatch *const batch,
5926 const std::vector<Rdb_index_stats> &stats) const {
5927 assert(batch != nullptr);
5928
5929 for (const auto &it : stats) {
5930 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5931 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5932
5933 // IndexStats::materialize takes complete care of serialization including
5934 // storing the version
5935 const auto value =
5936 Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5937
5938 batch->Put(m_system_cfh, key_writer.to_slice(), value);
5939 }
5940 }
5941
5942 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5943 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5944 dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5945
5946 std::string value;
5947 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5948 if (status.ok()) {
5949 std::vector<Rdb_index_stats> v;
5950 // unmaterialize checks if the version matches
5951 if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5952 return v[0];
5953 }
5954 }
5955
5956 return Rdb_index_stats();
5957 }
5958
5959 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5960 rocksdb::WriteBatchBase *batch, GL_INDEX_ID gl_index_id, ulonglong val,
5961 bool overwrite) const {
5962 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5963 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5964
5965 // Value is constructed by storing the version and the value.
5966 Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5967 ROCKSDB_SIZEOF_AUTOINC_VALUE>
5968 value_writer;
5969 value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5970 value_writer.write_uint64(val);
5971
5972 if (overwrite) {
5973 return batch->Put(m_system_cfh, key_writer.to_slice(),
5974 value_writer.to_slice());
5975 }
5976 return batch->Merge(m_system_cfh, key_writer.to_slice(),
5977 value_writer.to_slice());
5978 }
5979
5980 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5981 ulonglong *new_val) const {
5982 Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5983 dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5984
5985 std::string value;
5986 const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5987
5988 if (status.ok()) {
5989 const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5990
5991 if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5992 *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5993 return true;
5994 }
5995 }
5996 return false;
5997 }
5998
5999 uint Rdb_seq_generator::get_and_update_next_number(
6000 Rdb_dict_manager *const dict) {
6001 assert(dict != nullptr);
6002
6003 uint res;
6004 RDB_MUTEX_LOCK_CHECK(m_mutex);
6005
6006 res = m_next_number++;
6007
6008 const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
6009 rocksdb::WriteBatch *const batch = wb.get();
6010
6011 assert(batch != nullptr);
6012 dict->update_max_index_id(batch, res);
6013 dict->commit(batch);
6014
6015 RDB_MUTEX_UNLOCK_CHECK(m_mutex);
6016
6017 return res;
6018 }
6019
6020 } // namespace myrocks
6021