1 /*
2 Copyright (c) 2012,2013 Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation // gcc: Class implementation
19 #endif
20
21 /* This C++ file's header file */
22 #include "./rdb_datadic.h"
23
24 /* C++ standard header files */
25 #include <algorithm>
26 #include <array>
27 #include <limits>
28 #include <map>
29 #include <set>
30 #include <utility>
31 #include <vector>
32
33 /* MySQL header files */
34 #include "./key.h"
35 #include "./m_ctype.h"
36 #include "./my_bit.h"
37
38 /* MyRocks header files */
39 #include "./ha_rocksdb_proto.h"
40 #include "./my_stacktrace.h"
41 #include "./rdb_cf_manager.h"
42 #include "./rdb_utils.h"
43
44 namespace myrocks {
45
46 void get_mem_comparable_space(const CHARSET_INFO *cs,
47 const std::vector<uchar> **xfrm, size_t *xfrm_len,
48 size_t *mb_len);
49
50 /*
51 Rdb_key_def class implementation
52 */
53
Rdb_key_def(uint indexnr_arg,uint keyno_arg,rocksdb::ColumnFamilyHandle * cf_handle_arg,uint16_t index_dict_version_arg,uchar index_type_arg,uint16_t kv_format_version_arg,bool is_reverse_cf_arg,bool is_auto_cf_arg,const char * _name,Rdb_index_stats _stats)54 Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
55 rocksdb::ColumnFamilyHandle *cf_handle_arg,
56 uint16_t index_dict_version_arg, uchar index_type_arg,
57 uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
58 bool is_auto_cf_arg, const char *_name,
59 Rdb_index_stats _stats)
60 : m_index_number(indexnr_arg), m_cf_handle(cf_handle_arg),
61 m_index_dict_version(index_dict_version_arg),
62 m_index_type(index_type_arg), m_kv_format_version(kv_format_version_arg),
63 m_is_reverse_cf(is_reverse_cf_arg), m_is_auto_cf(is_auto_cf_arg),
64 m_name(_name), m_stats(_stats), m_pk_part_no(nullptr),
65 m_pack_info(nullptr), m_keyno(keyno_arg), m_key_parts(0),
66 m_prefix_extractor(nullptr), m_maxlength(0) // means 'not intialized'
67 {
68 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
69 rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
70 DBUG_ASSERT(m_cf_handle != nullptr);
71 }
72
/*
  Copy constructor.

  Scalar attributes are copied member-wise. The per-keypart arrays
  (m_pack_info, m_pk_part_no) are duplicated into freshly allocated buffers,
  because the destructor of each object frees its own arrays. The mutex is
  not copied; the new object initializes its own.

  m_index_dict_version, m_index_type, m_kv_format_version and m_pk_key_parts
  are copied as well: a copy of an object that has already been set up keeps
  a non-zero m_maxlength, so setup() returns early on it and would never get
  a chance to fill them in.
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number), m_cf_handle(k.m_cf_handle),
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf), m_is_auto_cf(k.m_is_auto_cf),
      m_name(k.m_name), m_stats(k.m_stats), m_pk_part_no(k.m_pk_part_no),
      m_pack_info(k.m_pack_info), m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts), m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);

  /*
    setup() assigns m_pk_key_parts before it allocates m_pack_info, so a
    non-null m_pack_info in the source means its m_pk_key_parts is valid.
    Otherwise start from 0; setup() on this object will assign it.
  */
  m_pk_key_parts = k.m_pack_info ? k.m_pk_key_parts : 0;

  if (k.m_pack_info) {
    /* Deep-copy the packing descriptors (one per key part). */
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
    m_pack_info =
        reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
    memcpy(m_pack_info, k.m_pack_info, size);
  }

  if (k.m_pk_part_no) {
    /* Deep-copy the "key part -> PK part" map (one entry per key part). */
    const size_t size = sizeof(uint) * m_key_parts;
    m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0)));
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
95
~Rdb_key_def()96 Rdb_key_def::~Rdb_key_def() {
97 mysql_mutex_destroy(&m_mutex);
98
99 my_free(m_pk_part_no);
100 m_pk_part_no = nullptr;
101
102 my_free(m_pack_info);
103 m_pack_info = nullptr;
104 }
105
setup(const TABLE * const tbl,const Rdb_tbl_def * const tbl_def)106 void Rdb_key_def::setup(const TABLE *const tbl,
107 const Rdb_tbl_def *const tbl_def) {
108 DBUG_ASSERT(tbl != nullptr);
109 DBUG_ASSERT(tbl_def != nullptr);
110
111 /*
112 Set max_length based on the table. This can be called concurrently from
113 multiple threads, so there is a mutex to protect this code.
114 */
115 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
116 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
117 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
118 if (!m_maxlength) {
119 mysql_mutex_lock(&m_mutex);
120 if (m_maxlength != 0) {
121 mysql_mutex_unlock(&m_mutex);
122 return;
123 }
124
125 KEY *key_info = nullptr;
126 KEY *pk_info = nullptr;
127 if (!is_hidden_pk) {
128 key_info = &tbl->key_info[m_keyno];
129 if (!hidden_pk_exists)
130 pk_info = &tbl->key_info[tbl->s->primary_key];
131 m_name = std::string(key_info->name);
132 } else {
133 m_name = HIDDEN_PK_NAME;
134 }
135
136 if (secondary_key)
137 m_pk_key_parts = hidden_pk_exists ? 1 : pk_info->actual_key_parts;
138 else {
139 pk_info = nullptr;
140 m_pk_key_parts = 0;
141 }
142
143 // "unique" secondary keys support:
144 m_key_parts = is_hidden_pk ? 1 : key_info->actual_key_parts;
145
146 if (secondary_key) {
147 /*
148 In most cases, SQL layer puts PK columns as invisible suffix at the
149 end of secondary key. There are cases where this doesn't happen:
150 - unique secondary indexes.
151 - partitioned tables.
152
153 Internally, we always need PK columns as suffix (and InnoDB does,
154 too, if you were wondering).
155
156 The loop below will attempt to put all PK columns at the end of key
157 definition. Columns that are already included in the index (either
158 by the user or by "extended keys" feature) are not included for the
159 second time.
160 */
161 m_key_parts += m_pk_key_parts;
162 }
163
164 if (secondary_key)
165 m_pk_part_no = reinterpret_cast<uint *>(
166 my_malloc(sizeof(uint) * m_key_parts, MYF(0)));
167 else
168 m_pk_part_no = nullptr;
169
170 const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
171 m_pack_info =
172 reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
173
174 size_t max_len = INDEX_NUMBER_SIZE;
175 int unpack_len = 0;
176 int max_part_len = 0;
177 bool simulating_extkey = false;
178 uint dst_i = 0;
179
180 uint keyno_to_set = m_keyno;
181 uint keypart_to_set = 0;
182
183 if (is_hidden_pk) {
184 Field *field = nullptr;
185 m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
186 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
187 max_len += m_pack_info[dst_i].m_max_image_len;
188 max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
189 dst_i++;
190 } else {
191 KEY_PART_INFO *key_part = key_info->key_part;
192
193 /* this loop also loops over the 'extended key' tail */
194 for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
195 Field *const field = key_part ? key_part->field : nullptr;
196
197 if (simulating_extkey && !hidden_pk_exists) {
198 DBUG_ASSERT(secondary_key);
199 /* Check if this field is already present in the key definition */
200 bool found = false;
201 for (uint j = 0; j < key_info->actual_key_parts; j++) {
202 if (field->field_index ==
203 key_info->key_part[j].field->field_index &&
204 key_part->length == key_info->key_part[j].length) {
205 found = true;
206 break;
207 }
208 }
209
210 if (found) {
211 key_part++;
212 continue;
213 }
214 }
215
216 if (field && field->real_maybe_null())
217 max_len += 1; // NULL-byte
218
219 m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
220 key_part ? key_part->length : 0);
221 m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
222
223 if (pk_info) {
224 m_pk_part_no[dst_i] = -1;
225 for (uint j = 0; j < m_pk_key_parts; j++) {
226 if (field->field_index == pk_info->key_part[j].field->field_index) {
227 m_pk_part_no[dst_i] = j;
228 break;
229 }
230 }
231 } else if (secondary_key && hidden_pk_exists) {
232 /*
233 The hidden pk can never be part of the sk. So it is always
234 appended to the end of the sk.
235 */
236 m_pk_part_no[dst_i] = -1;
237 if (simulating_extkey)
238 m_pk_part_no[dst_i] = 0;
239 }
240
241 max_len += m_pack_info[dst_i].m_max_image_len;
242
243 max_part_len =
244 std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
245
246 key_part++;
247 /*
248 For "unique" secondary indexes, pretend they have
249 "index extensions"
250 */
251 if (secondary_key && src_i + 1 == key_info->actual_key_parts) {
252 simulating_extkey = true;
253 if (!hidden_pk_exists) {
254 keyno_to_set = tbl->s->primary_key;
255 key_part = pk_info->key_part;
256 keypart_to_set = (uint)-1;
257 } else {
258 keyno_to_set = tbl_def->m_key_count - 1;
259 key_part = nullptr;
260 keypart_to_set = 0;
261 }
262 }
263
264 dst_i++;
265 }
266 }
267
268 m_key_parts = dst_i;
269
270 /* Initialize the memory needed by the stats structure */
271 m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
272
273 /* Cache prefix extractor for bloom filter usage later */
274 rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
275 m_prefix_extractor = opt.prefix_extractor;
276
277 /*
278 This should be the last member variable set before releasing the mutex
279 so that other threads can't see the object partially set up.
280 */
281 m_maxlength = max_len;
282
283 mysql_mutex_unlock(&m_mutex);
284 }
285 }
286
287 /**
288 Read a memcmp key part from a slice using the passed in reader.
289
290 Returns -1 if field was null, 1 if error, 0 otherwise.
291 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const292 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
293 Rdb_string_reader *reader,
294 const uint part_num) const {
295 /* It is impossible to unpack the column. Skip it. */
296 if (m_pack_info[part_num].m_maybe_null) {
297 const char *nullp;
298 if (!(nullp = reader->read(1)))
299 return 1;
300 if (*nullp == 0) {
301 /* This is a NULL value */
302 return -1;
303 } else {
304 /* If NULL marker is not '0', it can be only '1' */
305 if (*nullp != 1)
306 return 1;
307 }
308 }
309
310 Rdb_field_packing *fpi = &m_pack_info[part_num];
311 DBUG_ASSERT(table_arg->s != nullptr);
312
313 bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
314 (table_arg->s->primary_key == MAX_INDEXES);
315 Field *field = nullptr;
316 if (!is_hidden_pk_part)
317 field = fpi->get_field_in_table(table_arg);
318 if (fpi->m_skip_func(fpi, field, reader))
319 return 1;
320
321 return 0;
322 }
323
324 /**
325 Get a mem-comparable form of Primary Key from mem-comparable form of this key
326
327 @param
328 pk_descr Primary Key descriptor
329 key Index tuple from this key in mem-comparable form
330 pk_buffer OUT Put here mem-comparable form of the Primary Key.
331
332 @note
333 It may or may not be possible to restore primary key columns to their
334 mem-comparable form. To handle all cases, this function copies mem-
335 comparable forms directly.
336
337 RocksDB SE supports "Extended keys". This means that PK columns are present
338 at the end of every key. If the key already includes PK columns, then
339 these columns are not present at the end of the key.
340
341 Because of the above, we copy each primary key column.
342
343 @todo
344 If we checked crc32 checksums in this function, we would catch some CRC
345 violations that we currently don't. On the other hand, there is a broader
346 set of queries for which we would check the checksum twice.
347 */
348
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const349 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
350 const Rdb_key_def &pk_descr,
351 const rocksdb::Slice *const key,
352 uchar *const pk_buffer) const {
353 DBUG_ASSERT(table != nullptr);
354 DBUG_ASSERT(key != nullptr);
355 DBUG_ASSERT(pk_buffer);
356
357 uint size = 0;
358 uchar *buf = pk_buffer;
359 DBUG_ASSERT(m_pk_key_parts);
360
361 /* Put the PK number */
362 rdb_netbuf_store_index(buf, pk_descr.m_index_number);
363 buf += INDEX_NUMBER_SIZE;
364 size += INDEX_NUMBER_SIZE;
365
366 const char *start_offs[MAX_REF_PARTS];
367 const char *end_offs[MAX_REF_PARTS];
368 int pk_key_part;
369 uint i;
370 Rdb_string_reader reader(key);
371
372 // Skip the index number
373 if ((!reader.read(INDEX_NUMBER_SIZE)))
374 return RDB_INVALID_KEY_LEN;
375
376 for (i = 0; i < m_key_parts; i++) {
377 if ((pk_key_part = m_pk_part_no[i]) != -1) {
378 start_offs[pk_key_part] = reader.get_current_ptr();
379 }
380
381 if (read_memcmp_key_part(table, &reader, i) > 0) {
382 return RDB_INVALID_KEY_LEN;
383 }
384
385 if (pk_key_part != -1) {
386 end_offs[pk_key_part] = reader.get_current_ptr();
387 }
388 }
389
390 for (i = 0; i < m_pk_key_parts; i++) {
391 const uint part_size = end_offs[i] - start_offs[i];
392 memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
393 buf += part_size;
394 size += part_size;
395 }
396
397 return size;
398 }
399
400 /**
401 Get a mem-comparable form of Secondary Key from mem-comparable form of this
402 key, without the extended primary key tail.
403
404 @param
405 key Index tuple from this key in mem-comparable form
406 sk_buffer OUT Put here mem-comparable form of the Secondary Key.
407 n_null_fields OUT Put number of null fields contained within sk entry
408 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const409 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
410 const rocksdb::Slice &key,
411 uchar *sk_buffer,
412 uint *n_null_fields) const {
413 DBUG_ASSERT(table != nullptr);
414 DBUG_ASSERT(sk_buffer != nullptr);
415 DBUG_ASSERT(n_null_fields != nullptr);
416 DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
417
418 uchar *buf = sk_buffer;
419
420 int res;
421 Rdb_string_reader reader(&key);
422 const char *start = reader.get_current_ptr();
423
424 // Skip the index number
425 if ((!reader.read(INDEX_NUMBER_SIZE)))
426 return RDB_INVALID_KEY_LEN;
427
428 for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
429 if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
430 return RDB_INVALID_KEY_LEN;
431 } else if (res == -1) {
432 (*n_null_fields)++;
433 }
434 }
435
436 uint sk_memcmp_len = reader.get_current_ptr() - start;
437 memcpy(buf, start, sk_memcmp_len);
438 return sk_memcmp_len;
439 }
440
441 /**
442 Convert index tuple into storage (i.e. mem-comparable) format
443
444 @detail
445 Currently this is done by unpacking into table->record[0] and then
446 packing index columns into storage format.
447
448 @param pack_buffer Temporary area for packing varchar columns. Its
449 size is at least max_storage_fmt_length() bytes.
450 */
451
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,const uchar * const key_tuple,const key_part_map & keypart_map) const452 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
453 uchar *const packed_tuple,
454 const uchar *const key_tuple,
455 const key_part_map &keypart_map) const {
456 DBUG_ASSERT(tbl != nullptr);
457 DBUG_ASSERT(pack_buffer != nullptr);
458 DBUG_ASSERT(packed_tuple != nullptr);
459 DBUG_ASSERT(key_tuple != nullptr);
460
461 /* We were given a record in KeyTupleFormat. First, save it to record */
462 const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
463 key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
464
465 uint n_used_parts = my_count_bits(keypart_map);
466 if (keypart_map == HA_WHOLE_KEY)
467 n_used_parts = 0; // Full key is used
468
469 /* Then, convert the record into a mem-comparable form */
470 return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
471 false, 0, n_used_parts);
472 }
473
474 /**
475 @brief
476 Check if "unpack info" data includes checksum.
477
478 @detail
479 This is used only by CHECK TABLE to count the number of rows that have
480 checksums.
481 */
482
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)483 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
484 const uchar *ptr = (const uchar *)unpack_info.data();
485 size_t size = unpack_info.size();
486
487 // Skip unpack info if present.
488 if (size >= RDB_UNPACK_HEADER_SIZE && ptr[0] == RDB_UNPACK_DATA_TAG) {
489 const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
490 SHIP_ASSERT(size >= skip_len);
491
492 size -= skip_len;
493 ptr += skip_len;
494 }
495
496 return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
497 }
498
499 /*
500 @return Number of bytes that were changed
501 */
successor(uchar * const packed_tuple,const uint & len)502 int Rdb_key_def::successor(uchar *const packed_tuple, const uint &len) {
503 DBUG_ASSERT(packed_tuple != nullptr);
504
505 int changed = 0;
506 uchar *p = packed_tuple + len - 1;
507 for (; p > packed_tuple; p--) {
508 changed++;
509 if (*p != uchar(0xFF)) {
510 *p = *p + 1;
511 break;
512 }
513 *p = '\0';
514 }
515 return changed;
516 }
517
518 /**
519 Get index columns from the record and pack them into mem-comparable form.
520
521 @param
522 tbl Table we're working on
523 record IN Record buffer with fields in table->record format
524 pack_buffer IN Temporary area for packing varchars. The size is
525 at least max_storage_fmt_length() bytes.
526 packed_tuple OUT Key in the mem-comparable form
527 unpack_info OUT Unpack data
528 unpack_info_len OUT Unpack data length
529 n_key_parts Number of keyparts to process. 0 means all of them.
530 n_null_fields OUT Number of key fields with NULL value.
531
532 @detail
533 Some callers do not need the unpack information, they can pass
534 unpack_info=nullptr, unpack_info_len=nullptr.
535
536 @return
537 Length of the packed tuple
538 */
539
pack_record(const TABLE * const tbl,uchar * const pack_buffer,const uchar * const record,uchar * const packed_tuple,Rdb_string_writer * const unpack_info,const bool & should_store_row_debug_checksums,const longlong & hidden_pk_id,uint n_key_parts,uint * const n_null_fields) const540 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
541 const uchar *const record,
542 uchar *const packed_tuple,
543 Rdb_string_writer *const unpack_info,
544 const bool &should_store_row_debug_checksums,
545 const longlong &hidden_pk_id, uint n_key_parts,
546 uint *const n_null_fields) const {
547 DBUG_ASSERT(tbl != nullptr);
548 DBUG_ASSERT(pack_buffer != nullptr);
549 DBUG_ASSERT(record != nullptr);
550 DBUG_ASSERT(packed_tuple != nullptr);
551 // Checksums for PKs are made when record is packed.
552 // We should never attempt to make checksum just from PK values
553 DBUG_ASSERT_IMP(should_store_row_debug_checksums,
554 (m_index_type == INDEX_TYPE_SECONDARY));
555
556 uchar *tuple = packed_tuple;
557 size_t unpack_len_pos = size_t(-1);
558 const bool hidden_pk_exists = table_has_hidden_pk(tbl);
559
560 rdb_netbuf_store_index(tuple, m_index_number);
561 tuple += INDEX_NUMBER_SIZE;
562
563 // If n_key_parts is 0, it means all columns.
564 // The following includes the 'extended key' tail.
565 // The 'extended key' includes primary key. This is done to 'uniqify'
566 // non-unique indexes
567 const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
568
569 // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
570 // hidden key part. So we skip it (its always 1 part).
571 if (hidden_pk_exists && !hidden_pk_id && use_all_columns)
572 n_key_parts = m_key_parts - 1;
573 else if (use_all_columns)
574 n_key_parts = m_key_parts;
575
576 if (n_null_fields)
577 *n_null_fields = 0;
578
579 if (unpack_info) {
580 unpack_info->clear();
581 unpack_info->write_uint8(RDB_UNPACK_DATA_TAG);
582 unpack_len_pos = unpack_info->get_current_pos();
583 // we don't know the total length yet, so write a zero
584 unpack_info->write_uint16(0);
585 }
586
587 for (uint i = 0; i < n_key_parts; i++) {
588 // Fill hidden pk id into the last key part for secondary keys for tables
589 // with no pk
590 if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
591 m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
592 break;
593 }
594
595 Field *const field = m_pack_info[i].get_field_in_table(tbl);
596 DBUG_ASSERT(field != nullptr);
597
598 // Old Field methods expected the record pointer to be at tbl->record[0].
599 // The quick and easy way to fix this was to pass along the offset
600 // for the pointer.
601 const my_ptrdiff_t ptr_diff = record - tbl->record[0];
602
603 if (field->real_maybe_null()) {
604 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
605 if (field->is_real_null(ptr_diff)) {
606 /* NULL value. store '\0' so that it sorts before non-NULL values */
607 *tuple++ = 0;
608 /* That's it, don't store anything else */
609 if (n_null_fields)
610 (*n_null_fields)++;
611 continue;
612 } else {
613 /* Not a NULL value. Store '1' */
614 *tuple++ = 1;
615 }
616 }
617
618 const bool create_unpack_info =
619 (unpack_info && // we were requested to generate unpack_info
620 m_pack_info[i].uses_unpack_info()); // and this keypart uses it
621 Rdb_pack_field_context pack_ctx(unpack_info);
622
623 // Set the offset for methods which do not take an offset as an argument
624 DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
625 m_pack_info[i].m_max_image_len));
626 field->move_field_offset(ptr_diff);
627
628 m_pack_info[i].m_pack_func(&m_pack_info[i], field, pack_buffer, &tuple,
629 &pack_ctx);
630
631 /* Make "unpack info" to be stored in the value */
632 if (create_unpack_info) {
633 m_pack_info[i].m_make_unpack_info_func(m_pack_info[i].m_charset_codec,
634 field, &pack_ctx);
635 }
636 field->move_field_offset(-ptr_diff);
637 }
638
639 if (unpack_info) {
640 const size_t len = unpack_info->get_current_pos();
641 DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());
642
643 // Don't store the unpack_info if it has only the header (that is, there's
644 // no meaningful content).
645 // Primary Keys are special: for them, store the unpack_info even if it's
646 // empty (provided m_maybe_unpack_info==true, see
647 // ha_rocksdb::convert_record_to_storage_format)
648 if (len == RDB_UNPACK_HEADER_SIZE &&
649 m_index_type != Rdb_key_def::INDEX_TYPE_PRIMARY) {
650 unpack_info->clear();
651 } else {
652 unpack_info->write_uint16_at(unpack_len_pos, len);
653 }
654
655 //
656 // Secondary keys have key and value checksums in the value part
657 // Primary key is a special case (the value part has non-indexed columns),
658 // so the checksums are computed and stored by
659 // ha_rocksdb::convert_record_to_storage_format
660 //
661 if (should_store_row_debug_checksums) {
662 const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple);
663 const uint32_t val_crc32 =
664 crc32(0, unpack_info->ptr(), unpack_info->get_current_pos());
665
666 unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
667 unpack_info->write_uint32(key_crc32);
668 unpack_info->write_uint32(val_crc32);
669 }
670 }
671
672 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
673
674 return tuple - packed_tuple;
675 }
676
677 /**
678 Pack the hidden primary key into mem-comparable form.
679
680 @param
681 tbl Table we're working on
682 hidden_pk_id IN New value to be packed into key
683 packed_tuple OUT Key in the mem-comparable form
684
685 @return
686 Length of the packed tuple
687 */
688
pack_hidden_pk(const longlong & hidden_pk_id,uchar * const packed_tuple) const689 uint Rdb_key_def::pack_hidden_pk(const longlong &hidden_pk_id,
690 uchar *const packed_tuple) const {
691 DBUG_ASSERT(packed_tuple != nullptr);
692
693 uchar *tuple = packed_tuple;
694 rdb_netbuf_store_index(tuple, m_index_number);
695 tuple += INDEX_NUMBER_SIZE;
696 DBUG_ASSERT(m_key_parts == 1);
697 DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
698 m_pack_info[0].m_max_image_len));
699
700 m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
701
702 DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
703 return tuple - packed_tuple;
704 }
705
706 /*
707 Function of type rdb_index_field_pack_t
708 */
709
rdb_pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)710 void rdb_pack_with_make_sort_key(Rdb_field_packing *const fpi,
711 Field *const field,
712 uchar *const buf __attribute__((__unused__)),
713 uchar **dst,
714 Rdb_pack_field_context *const pack_ctx
715 __attribute__((__unused__))) {
716 DBUG_ASSERT(fpi != nullptr);
717 DBUG_ASSERT(field != nullptr);
718 DBUG_ASSERT(dst != nullptr);
719 DBUG_ASSERT(*dst != nullptr);
720
721 const int max_len = fpi->m_max_image_len;
722 field->make_sort_key(*dst, max_len);
723 *dst += max_len;
724 }
725
726 /*
727 Compares two keys without unpacking
728
729 @detail
730 @return
731 0 - Ok. column_index is the index of the first column which is different.
732 -1 if two kes are equal
733 1 - Data format error.
734 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const735 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
736 const rocksdb::Slice *key2,
737 std::size_t *const column_index) const {
738 DBUG_ASSERT(key1 != nullptr);
739 DBUG_ASSERT(key2 != nullptr);
740 DBUG_ASSERT(column_index != nullptr);
741
742 // the caller should check the return value and
743 // not rely on column_index being valid
744 *column_index = 0xbadf00d;
745
746 Rdb_string_reader reader1(key1);
747 Rdb_string_reader reader2(key2);
748
749 // Skip the index number
750 if ((!reader1.read(INDEX_NUMBER_SIZE)))
751 return HA_EXIT_FAILURE;
752
753 if ((!reader2.read(INDEX_NUMBER_SIZE)))
754 return HA_EXIT_FAILURE;
755
756 for (uint i = 0; i < m_key_parts; i++) {
757 const Rdb_field_packing *const fpi = &m_pack_info[i];
758 if (fpi->m_maybe_null) {
759 const auto nullp1 = reader1.read(1);
760 const auto nullp2 = reader2.read(1);
761
762 if (nullp1 == nullptr || nullp2 == nullptr) {
763 return HA_EXIT_FAILURE;
764 }
765
766 if (*nullp1 != *nullp2) {
767 *column_index = i;
768 return HA_EXIT_SUCCESS;
769 }
770
771 if (*nullp1 == 0) {
772 /* This is a NULL value */
773 continue;
774 }
775 }
776
777 const auto before_skip1 = reader1.get_current_ptr();
778 const auto before_skip2 = reader2.get_current_ptr();
779 DBUG_ASSERT(fpi->m_skip_func);
780 if (fpi->m_skip_func(fpi, nullptr, &reader1))
781 return HA_EXIT_FAILURE;
782 if (fpi->m_skip_func(fpi, nullptr, &reader2))
783 return HA_EXIT_FAILURE;
784 const auto size1 = reader1.get_current_ptr() - before_skip1;
785 const auto size2 = reader2.get_current_ptr() - before_skip2;
786 if (size1 != size2) {
787 *column_index = i;
788 return HA_EXIT_SUCCESS;
789 }
790
791 if (memcmp(before_skip1, before_skip2, size1) != 0) {
792 *column_index = i;
793 return HA_EXIT_SUCCESS;
794 }
795 }
796
797 *column_index = m_key_parts;
798 return HA_EXIT_SUCCESS;
799 }
800
801 /*
802 @brief
803 Given a zero-padded key, determine its real key length
804
805 @detail
806 Fixed-size skip functions just read.
807 */
808
key_length(const TABLE * const table,const rocksdb::Slice & key) const809 size_t Rdb_key_def::key_length(const TABLE *const table,
810 const rocksdb::Slice &key) const {
811 DBUG_ASSERT(table != nullptr);
812
813 Rdb_string_reader reader(&key);
814
815 if ((!reader.read(INDEX_NUMBER_SIZE)))
816 return size_t(-1);
817
818 for (uint i = 0; i < m_key_parts; i++) {
819 const Rdb_field_packing *fpi = &m_pack_info[i];
820 const Field *field = nullptr;
821 if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY)
822 field = fpi->get_field_in_table(table);
823 if (fpi->m_skip_func(fpi, field, &reader))
824 return size_t(-1);
825 }
826 return key.size() - reader.remaining_bytes();
827 }
828
829 /*
830 Take mem-comparable form and unpack_info and unpack it to Table->record
831
832 @detail
833 not all indexes support this
834
835 @return
836 UNPACK_SUCCESS - Ok
837 UNPACK_FAILURE - Data format error.
838 */
839
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool & verify_row_debug_checksums) const840 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
841 const rocksdb::Slice *const packed_key,
842 const rocksdb::Slice *const unpack_info,
843 const bool &verify_row_debug_checksums) const {
844 Rdb_string_reader reader(packed_key);
845 Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
846
847 const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
848 const bool hidden_pk_exists = table_has_hidden_pk(table);
849 const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
850 // There is no checksuming data after unpack_info for primary keys, because
851 // the layout there is different. The checksum is verified in
852 // ha_rocksdb::convert_record_from_storage_format instead.
853 DBUG_ASSERT_IMP(!secondary_key, !verify_row_debug_checksums);
854
855 // Old Field methods expected the record pointer to be at tbl->record[0].
856 // The quick and easy way to fix this was to pass along the offset
857 // for the pointer.
858 const my_ptrdiff_t ptr_diff = buf - table->record[0];
859
860 // Skip the index number
861 if ((!reader.read(INDEX_NUMBER_SIZE))) {
862 return HA_EXIT_FAILURE;
863 }
864
865 // For secondary keys, we expect the value field to contain unpack data and
866 // checksum data in that order. One or both can be missing, but they cannot
867 // be reordered.
868 const bool has_unpack_info =
869 unp_reader.remaining_bytes() &&
870 *unp_reader.get_current_ptr() == RDB_UNPACK_DATA_TAG;
871 if (has_unpack_info && !unp_reader.read(RDB_UNPACK_HEADER_SIZE)) {
872 return HA_EXIT_FAILURE;
873 }
874
875 for (uint i = 0; i < m_key_parts; i++) {
876 Rdb_field_packing *const fpi = &m_pack_info[i];
877
878 /*
879 Hidden pk field is packed at the end of the secondary keys, but the SQL
880 layer does not know about it. Skip retrieving field if hidden pk.
881 */
882 if ((secondary_key && hidden_pk_exists && i + 1 == m_key_parts) ||
883 is_hidden_pk) {
884 DBUG_ASSERT(fpi->m_unpack_func);
885 if (fpi->m_skip_func(fpi, nullptr, &reader)) {
886 return HA_EXIT_FAILURE;
887 }
888 continue;
889 }
890
891 Field *const field = fpi->get_field_in_table(table);
892
893 if (fpi->m_unpack_func) {
894 /* It is possible to unpack this column. Do it. */
895
896 if (fpi->m_maybe_null) {
897 const char *nullp;
898 if (!(nullp = reader.read(1)))
899 return HA_EXIT_FAILURE;
900 if (*nullp == 0) {
901 /* Set the NULL-bit of this field */
902 field->set_null(ptr_diff);
903 /* Also set the field to its default value */
904 uint field_offset = field->ptr - table->record[0];
905 memcpy(buf + field_offset, table->s->default_values + field_offset,
906 field->pack_length());
907 continue;
908 } else if (*nullp == 1)
909 field->set_notnull(ptr_diff);
910 else
911 return HA_EXIT_FAILURE;
912 }
913
914 // If we need unpack info, but there is none, tell the unpack function
915 // this by passing unp_reader as nullptr. If we never read unpack_info
916 // during unpacking anyway, then there won't an error.
917 const bool maybe_missing_unpack =
918 !has_unpack_info && fpi->uses_unpack_info();
919 const int res =
920 fpi->m_unpack_func(fpi, field, field->ptr + ptr_diff, &reader,
921 maybe_missing_unpack ? nullptr : &unp_reader);
922
923 if (res)
924 return res;
925 } else {
926 /* It is impossible to unpack the column. Skip it. */
927 if (fpi->m_maybe_null) {
928 const char *nullp;
929 if (!(nullp = reader.read(1)))
930 return HA_EXIT_FAILURE;
931 if (*nullp == 0) {
932 /* This is a NULL value */
933 continue;
934 }
935 /* If NULL marker is not '0', it can be only '1' */
936 if (*nullp != 1)
937 return HA_EXIT_FAILURE;
938 }
939 if (fpi->m_skip_func(fpi, field, &reader))
940 return HA_EXIT_FAILURE;
941 }
942 }
943
944 /*
945 Check checksum values if present
946 */
947 const char *ptr;
948 if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
949 if (verify_row_debug_checksums) {
950 uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
951 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
952 const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
953 (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
954
955 const uint32_t computed_key_chksum =
956 crc32(0, (const uchar *)packed_key->data(), packed_key->size());
957 const uint32_t computed_val_chksum =
958 crc32(0, (const uchar *)unpack_info->data(),
959 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
960
961 DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
962 stored_key_chksum++;);
963
964 if (stored_key_chksum != computed_key_chksum) {
965 report_checksum_mismatch(true, packed_key->data(), packed_key->size());
966 return HA_EXIT_FAILURE;
967 }
968
969 if (stored_val_chksum != computed_val_chksum) {
970 report_checksum_mismatch(false, unpack_info->data(),
971 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
972 return HA_EXIT_FAILURE;
973 }
974 } else {
975 /* The checksums are present but we are not checking checksums */
976 }
977 }
978
979 if (reader.remaining_bytes())
980 return HA_EXIT_FAILURE;
981
982 return HA_EXIT_SUCCESS;
983 }
984
table_has_hidden_pk(const TABLE * const table)985 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
986 return table->s->primary_key == MAX_INDEXES;
987 }
988
report_checksum_mismatch(const bool & is_key,const char * const data,const size_t data_size) const989 void Rdb_key_def::report_checksum_mismatch(const bool &is_key,
990 const char *const data,
991 const size_t data_size) const {
992 // NO_LINT_DEBUG
993 sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
994 is_key ? "key" : "value", get_index_number());
995
996 const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
997 // NO_LINT_DEBUG
998 sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
999 (uint64_t)data_size, buf.c_str());
1000
1001 my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1002 }
1003
index_format_min_check(const int & pk_min,const int & sk_min) const1004 bool Rdb_key_def::index_format_min_check(const int &pk_min,
1005 const int &sk_min) const {
1006 switch (m_index_type) {
1007 case INDEX_TYPE_PRIMARY:
1008 case INDEX_TYPE_HIDDEN_PRIMARY:
1009 return (m_kv_format_version >= pk_min);
1010 case INDEX_TYPE_SECONDARY:
1011 return (m_kv_format_version >= sk_min);
1012 default:
1013 DBUG_ASSERT(0);
1014 return false;
1015 }
1016 }
1017
1018 ///////////////////////////////////////////////////////////////////////////////////////////
1019 // Rdb_field_packing
1020 ///////////////////////////////////////////////////////////////////////////////////////////
1021
1022 /*
1023 Function of type rdb_index_field_skip_t
1024 */
1025
rdb_skip_max_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1026 int rdb_skip_max_length(const Rdb_field_packing *const fpi,
1027 const Field *const field __attribute__((__unused__)),
1028 Rdb_string_reader *const reader) {
1029 if (!reader->read(fpi->m_max_image_len))
1030 return HA_EXIT_FAILURE;
1031 return HA_EXIT_SUCCESS;
1032 }
1033
/*
  Size in bytes of one segment of the variable-length key encoding produced
  by rdb_pack_with_varchar_encoding: RDB_ESCAPE_LENGTH-1 data bytes followed
  by one flag byte.

  (RDB_ESCAPE_LENGTH-1) must be an even number so that segments are not
  split in the middle of an UTF-8 character (each character is stored as a
  two-byte entity in the key). See the implementation of
  rdb_unpack_binary_or_utf8_varchar.
*/

const uint RDB_ESCAPE_LENGTH = 9;
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");
1043
1044 /*
1045 Function of type rdb_index_field_skip_t
1046 */
1047
rdb_skip_variable_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1048 static int rdb_skip_variable_length(const Rdb_field_packing *const fpi
1049 __attribute__((__unused__)),
1050 const Field *const field,
1051 Rdb_string_reader *const reader) {
1052 const uchar *ptr;
1053 bool finished = false;
1054
1055 size_t dst_len; /* How much data can be there */
1056 if (field) {
1057 const Field_varstring *const field_var =
1058 static_cast<const Field_varstring *>(field);
1059 dst_len = field_var->pack_length() - field_var->length_bytes;
1060 } else {
1061 dst_len = UINT_MAX;
1062 }
1063
1064 /* Decode the length-emitted encoding here */
1065 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1066 /* See rdb_pack_with_varchar_encoding. */
1067 const uchar pad =
1068 255 - ptr[RDB_ESCAPE_LENGTH - 1]; // number of padding bytes
1069 const uchar used_bytes = RDB_ESCAPE_LENGTH - 1 - pad;
1070
1071 if (used_bytes > RDB_ESCAPE_LENGTH - 1 || used_bytes > dst_len) {
1072 return HA_EXIT_FAILURE; /* cannot store that much, invalid data */
1073 }
1074
1075 if (used_bytes < RDB_ESCAPE_LENGTH - 1) {
1076 finished = true;
1077 break;
1078 }
1079 dst_len -= used_bytes;
1080 }
1081
1082 if (!finished) {
1083 return HA_EXIT_FAILURE;
1084 }
1085
1086 return HA_EXIT_SUCCESS;
1087 }
1088
/*
  Values of the last byte of each segment in the Variable-Length Space-Padded
  encoding. The byte tells how the rest of the value compares to a string of
  spaces; VARCHAR_CMP_EQUAL_TO_SPACES also marks the final segment.
*/
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1092
1093 /*
1094 Skip a keypart that uses Variable-Length Space-Padded encoding
1095 */
1096
rdb_skip_variable_space_pad(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1097 static int rdb_skip_variable_space_pad(const Rdb_field_packing *const fpi,
1098 const Field *const field,
1099 Rdb_string_reader *const reader) {
1100 const uchar *ptr;
1101 bool finished = false;
1102
1103 size_t dst_len = UINT_MAX; /* How much data can be there */
1104
1105 if (field) {
1106 const Field_varstring *const field_var =
1107 static_cast<const Field_varstring *>(field);
1108 dst_len = field_var->pack_length() - field_var->length_bytes;
1109 }
1110
1111 /* Decode the length-emitted encoding here */
1112 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1113 // See rdb_pack_with_varchar_space_pad
1114 const uchar c = ptr[fpi->m_segment_size - 1];
1115 if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1116 // This is the last segment
1117 finished = true;
1118 break;
1119 } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1120 c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1121 // This is not the last segment
1122 if ((fpi->m_segment_size - 1) > dst_len) {
1123 // The segment is full of data but the table field can't hold that
1124 // much! This must be data corruption.
1125 return HA_EXIT_FAILURE;
1126 }
1127 dst_len -= (fpi->m_segment_size - 1);
1128 } else {
1129 // Encountered a value that's none of the VARCHAR_CMP* constants
1130 // It's data corruption.
1131 return HA_EXIT_FAILURE;
1132 }
1133 }
1134 return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1135 }
1136
1137 /*
1138 Function of type rdb_index_field_unpack_t
1139 */
1140
rdb_unpack_integer(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1141 int rdb_unpack_integer(Rdb_field_packing *const fpi, Field *const field,
1142 uchar *const to, Rdb_string_reader *const reader,
1143 Rdb_string_reader *const unp_reader
1144 __attribute__((__unused__))) {
1145 const int length = fpi->m_max_image_len;
1146
1147 const uchar *from;
1148 if (!(from = (const uchar *)reader->read(length)))
1149 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1150
1151 #ifdef WORDS_BIGENDIAN
1152 {
1153 if (((Field_num *)field)->unsigned_flag)
1154 to[0] = from[0];
1155 else
1156 to[0] = (char)(from[0] ^ 128); // Reverse the sign bit.
1157 memcpy(to + 1, from + 1, length - 1);
1158 }
1159 #else
1160 {
1161 const int sign_byte = from[0];
1162 if (((Field_num *)field)->unsigned_flag)
1163 to[length - 1] = sign_byte;
1164 else
1165 to[length - 1] =
1166 static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
1167 for (int i = 0, j = length - 1; i < length - 1; ++i, --j)
1168 to[i] = from[j];
1169 }
1170 #endif
1171 return UNPACK_SUCCESS;
1172 }
1173
1174 #if !defined(WORDS_BIGENDIAN)
rdb_swap_double_bytes(uchar * const dst,const uchar * const src)1175 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
1176 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1177 // A few systems store the most-significant _word_ first on little-endian
1178 dst[0] = src[3];
1179 dst[1] = src[2];
1180 dst[2] = src[1];
1181 dst[3] = src[0];
1182 dst[4] = src[7];
1183 dst[5] = src[6];
1184 dst[6] = src[5];
1185 dst[7] = src[4];
1186 #else
1187 dst[0] = src[7];
1188 dst[1] = src[6];
1189 dst[2] = src[5];
1190 dst[3] = src[4];
1191 dst[4] = src[3];
1192 dst[5] = src[2];
1193 dst[6] = src[1];
1194 dst[7] = src[0];
1195 #endif
1196 }
1197
rdb_swap_float_bytes(uchar * const dst,const uchar * const src)1198 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
1199 dst[0] = src[3];
1200 dst[1] = src[2];
1201 dst[2] = src[1];
1202 dst[3] = src[0];
1203 }
1204 #else
1205 #define rdb_swap_double_bytes nullptr
1206 #define rdb_swap_float_bytes nullptr
1207 #endif
1208
rdb_unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t & size,const int & exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))1209 static int rdb_unpack_floating_point(
1210 uchar *const dst, Rdb_string_reader *const reader, const size_t &size,
1211 const int &exp_digit, const uchar *const zero_pattern,
1212 const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
1213 const uchar *const from = (const uchar *)reader->read(size);
1214 if (from == nullptr)
1215 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1216
1217 /* Check to see if the value is zero */
1218 if (memcmp(from, zero_pattern, size) == 0) {
1219 memcpy(dst, zero_val, size);
1220 return UNPACK_SUCCESS;
1221 }
1222
1223 #if defined(WORDS_BIGENDIAN)
1224 // On big-endian, output can go directly into result
1225 uchar *const tmp = dst;
1226 #else
1227 // Otherwise use a temporary buffer to make byte-swapping easier later
1228 uchar tmp[8];
1229 #endif
1230
1231 memcpy(tmp, from, size);
1232
1233 if (tmp[0] & 0x80) {
1234 // If the high bit is set the original value was positive so
1235 // remove the high bit and subtract one from the exponent.
1236 ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1237 exp_part &= 0x7FFF; // clear high bit;
1238 exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent
1239 tmp[0] = (uchar)(exp_part >> 8);
1240 tmp[1] = (uchar)exp_part;
1241 } else {
1242 // Otherwise the original value was negative and all bytes have been
1243 // negated.
1244 for (size_t ii = 0; ii < size; ii++)
1245 tmp[ii] ^= 0xFF;
1246 }
1247
1248 #if !defined(WORDS_BIGENDIAN)
1249 // On little-endian, swap the bytes around
1250 swap_func(dst, tmp);
1251 #else
1252 static_assert(swap_func == nullptr, "Assuming that no swapping is needed.");
1253 #endif
1254
1255 return UNPACK_SUCCESS;
1256 }
1257
1258 #if !defined(DBL_EXP_DIG)
1259 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
1260 #endif
1261
1262 /*
1263 Function of type rdb_index_field_unpack_t
1264
1265 Unpack a double by doing the reverse action of change_double_for_sort
1266 (sql/filesort.cc). Note that this only works on IEEE values.
1267 Note also that this code assumes that NaN and +/-Infinity are never
1268 allowed in the database.
1269 */
rdb_unpack_double(Rdb_field_packing * const fpi,Field * const field,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1270 static int rdb_unpack_double(Rdb_field_packing *const fpi
1271 __attribute__((__unused__)),
1272 Field *const field __attribute__((__unused__)),
1273 uchar *const field_ptr,
1274 Rdb_string_reader *const reader,
1275 Rdb_string_reader *const unp_reader
1276 __attribute__((__unused__))) {
1277 static double zero_val = 0.0;
1278 static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
1279
1280 return rdb_unpack_floating_point(
1281 field_ptr, reader, sizeof(double), DBL_EXP_DIG, zero_pattern,
1282 (const uchar *)&zero_val, rdb_swap_double_bytes);
1283 }
1284
1285 #if !defined(FLT_EXP_DIG)
1286 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
1287 #endif
1288
1289 /*
1290 Function of type rdb_index_field_unpack_t
1291
1292 Unpack a float by doing the reverse action of Field_float::make_sort_key
1293 (sql/field.cc). Note that this only works on IEEE values.
1294 Note also that this code assumes that NaN and +/-Infinity are never
1295 allowed in the database.
1296 */
rdb_unpack_float(Rdb_field_packing * const,Field * const field,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1297 static int rdb_unpack_float(Rdb_field_packing *const,
1298 Field *const field __attribute__((__unused__)),
1299 uchar *const field_ptr,
1300 Rdb_string_reader *const reader,
1301 Rdb_string_reader *const unp_reader
1302 __attribute__((__unused__))) {
1303 static float zero_val = 0.0;
1304 static const uchar zero_pattern[4] = {128, 0, 0, 0};
1305
1306 return rdb_unpack_floating_point(
1307 field_ptr, reader, sizeof(float), FLT_EXP_DIG, zero_pattern,
1308 (const uchar *)&zero_val, rdb_swap_float_bytes);
1309 }
1310
1311 /*
1312 Function of type rdb_index_field_unpack_t used to
1313 Unpack by doing the reverse action to Field_newdate::make_sort_key.
1314 */
1315
rdb_unpack_newdate(Rdb_field_packing * const fpi,Field * constfield,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1316 int rdb_unpack_newdate(Rdb_field_packing *const fpi, Field *constfield,
1317 uchar *const field_ptr, Rdb_string_reader *const reader,
1318 Rdb_string_reader *const unp_reader
1319 __attribute__((__unused__))) {
1320 const char *from;
1321 DBUG_ASSERT(fpi->m_max_image_len == 3);
1322
1323 if (!(from = reader->read(3)))
1324 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1325
1326 field_ptr[0] = from[2];
1327 field_ptr[1] = from[1];
1328 field_ptr[2] = from[0];
1329 return UNPACK_SUCCESS;
1330 }
1331
1332 /*
1333 Function of type rdb_index_field_unpack_t, used to
1334 Unpack the string by copying it over.
1335 This is for BINARY(n) where the value occupies the whole length.
1336 */
1337
rdb_unpack_binary_str(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1338 static int rdb_unpack_binary_str(Rdb_field_packing *const fpi,
1339 Field *const field, uchar *const to,
1340 Rdb_string_reader *const reader,
1341 Rdb_string_reader *const unp_reader
1342 __attribute__((__unused__))) {
1343 const char *from;
1344 if (!(from = reader->read(fpi->m_max_image_len)))
1345 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1346
1347 memcpy(to, from, fpi->m_max_image_len);
1348 return UNPACK_SUCCESS;
1349 }
1350
1351 /*
1352 Function of type rdb_index_field_unpack_t.
1353 For UTF-8, we need to convert 2-byte wide-character entities back into
1354 UTF8 sequences.
1355 */
1356
rdb_unpack_utf8_str(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1357 static int rdb_unpack_utf8_str(Rdb_field_packing *const fpi, Field *const field,
1358 uchar *dst, Rdb_string_reader *const reader,
1359 Rdb_string_reader *const unp_reader
1360 __attribute__((__unused__))) {
1361 my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
1362 const uchar *src;
1363 if (!(src = (const uchar *)reader->read(fpi->m_max_image_len)))
1364 return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1365
1366 const uchar *const src_end = src + fpi->m_max_image_len;
1367 uchar *const dst_end = dst + field->pack_length();
1368
1369 while (src < src_end) {
1370 my_wc_t wc = (src[0] << 8) | src[1];
1371 src += 2;
1372 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1373 DBUG_ASSERT(res > 0 && res <= 3);
1374 if (res < 0)
1375 return UNPACK_FAILURE;
1376 dst += res;
1377 }
1378
1379 cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
1380 cset->pad_char);
1381 return UNPACK_SUCCESS;
1382 }
1383
1384 /*
1385 Function of type rdb_index_field_pack_t
1386 */
1387
rdb_pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)1388 static void rdb_pack_with_varchar_encoding(
1389 Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
1390 Rdb_pack_field_context *const pack_ctx __attribute__((__unused__))) {
1391 /*
1392 Use a flag byte every Nth byte. Set it to (255 - #pad) where #pad is 0
1393 when the var length field filled all N-1 previous bytes and #pad is
1394 otherwise the number of padding bytes used.
1395
1396 If N=8 and the field is:
1397 * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
1398 * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
1399 And the 4 byte string compares as greater than the 3 byte string
1400 */
1401 const CHARSET_INFO *const charset = field->charset();
1402 Field_varstring *const field_var = (Field_varstring *)field;
1403
1404 const size_t value_length = (field_var->length_bytes == 1)
1405 ? (uint)*field->ptr
1406 : uint2korr(field->ptr);
1407 size_t xfrm_len = charset->coll->strnxfrm(
1408 charset, buf, fpi->m_max_image_len, field_var->char_length(),
1409 field_var->ptr + field_var->length_bytes, value_length, 0);
1410
1411 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
1412
1413 size_t encoded_size = 0;
1414 uchar *ptr = *dst;
1415 while (1) {
1416 const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, xfrm_len);
1417 const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
1418 memcpy(ptr, buf, copy_len);
1419 ptr += copy_len;
1420 buf += copy_len;
1421 // pad with zeros if necessary;
1422 for (size_t idx = 0; idx < padding_bytes; idx++)
1423 *(ptr++) = 0;
1424 *(ptr++) = 255 - padding_bytes;
1425
1426 xfrm_len -= copy_len;
1427 encoded_size += RDB_ESCAPE_LENGTH;
1428 if (padding_bytes != 0)
1429 break;
1430 }
1431 *dst += encoded_size;
1432 }
1433
1434 /*
1435 Compare the string in [buf..buf_end) with a string that is an infinite
1436 sequence of strings in space_xfrm
1437 */
1438
1439 static int
rdb_compare_string_with_spaces(const uchar * buf,const uchar * const buf_end,const std::vector<uchar> * const space_xfrm)1440 rdb_compare_string_with_spaces(const uchar *buf, const uchar *const buf_end,
1441 const std::vector<uchar> *const space_xfrm) {
1442 int cmp = 0;
1443 while (buf < buf_end) {
1444 size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
1445 if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0)
1446 break;
1447 buf += bytes;
1448 }
1449 return cmp;
1450 }
1451
/*
  Offset added to the trimmed/padded space count that
  rdb_pack_with_varchar_space_pad stores in unpack_info, so that the stored
  number is never negative. The unpacking side subtracts it again.
*/
static const int RDB_TRIMMED_CHARS_OFFSET = 8;
1453 /*
1454 Pack the data with Variable-Length Space-Padded Encoding.
1455
1456 The encoding is there to meet two goals:
1457
  Goal#1. Comparison. The SQL standard says

    "If the collation for the comparison has the PAD SPACE characteristic,
    for the purposes of the comparison, the shorter value is effectively
    extended to the length of the longer by concatenation of <space>s on the
    right."
1464
1465 At the moment, all MySQL collations except one have the PAD SPACE
1466 characteristic. The exception is the "binary" collation that is used by
1467 [VAR]BINARY columns. (Note that binary collations for specific charsets,
1468 like utf8_bin or latin1_bin are not the same as "binary" collation, they have
1469 the PAD SPACE characteristic).
1470
1471 Goal#2 is to preserve the number of trailing spaces in the original value.
1472
1473 This is achieved by using the following encoding:
1474 The key part:
1475 - Stores mem-comparable image of the column
1476 - It is stored in chunks of fpi->m_segment_size bytes (*)
1477 = If the remainder of the chunk is not occupied, it is padded with mem-
1478 comparable image of the space character (cs->pad_char to be precise).
1479 - The last byte of the chunk shows how the rest of column's mem-comparable
1480 image would compare to mem-comparable image of the column extended with
1481 spaces. There are three possible values.
1482 - VARCHAR_CMP_LESS_THAN_SPACES,
1483 - VARCHAR_CMP_EQUAL_TO_SPACES
1484 - VARCHAR_CMP_GREATER_THAN_SPACES
1485
1486 VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
1487 is spaces, or something that sorts as spaces, so there is no reason to store
1488 it).
1489
1490 Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
1491
1492 'abcd\0' => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0 ' <VARCHAR_CMP_EQUAL> ]
1493 'abcd' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
1494 'abcd ' => [ 'abcd' <VARCHAR_CMP_EQUAL>]
1495 'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
1496
1497 As mentioned above, the last chunk is padded with mem-comparable images of
1498 cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
1499
1500 fpi->m_segment_size depends on the used collation. It is chosen to be such
1501 that no mem-comparable image of space will ever stretch across the segments
1502 (see get_segment_size_from_collation).
1503
1504 == The value part (aka unpack_info) ==
1505 The value part stores the number of space characters that one needs to add
1506 when unpacking the string.
1507 - If the number is positive, it means add this many spaces at the end
1508 - If the number is negative, it means padding has added extra spaces which
1509 must be removed.
1510
1511 Storage considerations
1512 - depending on column's max size, the number may occupy 1 or 2 bytes
1513 - the number of spaces that need to be removed is not more than
1514 RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
1515 then store it as unsigned.
1516
1517 @seealso
1518 rdb_unpack_binary_or_utf8_varchar_space_pad
1519 rdb_unpack_simple_varchar_space_pad
1520 rdb_dummy_make_unpack_info
1521 rdb_skip_variable_space_pad
1522 */
1523
1524 static void
rdb_pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)1525 rdb_pack_with_varchar_space_pad(Rdb_field_packing *const fpi,
1526 Field *const field, uchar *buf, uchar **dst,
1527 Rdb_pack_field_context *const pack_ctx) {
1528 Rdb_string_writer *const unpack_info = pack_ctx->writer;
1529 const CHARSET_INFO *const charset = field->charset();
1530 const auto field_var = static_cast<Field_varstring *>(field);
1531
1532 const size_t value_length = (field_var->length_bytes == 1)
1533 ? (uint)*field->ptr
1534 : uint2korr(field->ptr);
1535
1536 const size_t trimmed_len = charset->cset->lengthsp(
1537 charset, (const char *)field_var->ptr + field_var->length_bytes,
1538 value_length);
1539 const size_t xfrm_len = charset->coll->strnxfrm(
1540 charset, buf, fpi->m_max_image_len, field_var->char_length(),
1541 field_var->ptr + field_var->length_bytes, trimmed_len, 0);
1542
1543 /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
1544 uchar *const buf_end = buf + xfrm_len;
1545
1546 size_t encoded_size = 0;
1547 uchar *ptr = *dst;
1548 size_t padding_bytes;
1549 while (true) {
1550 const size_t copy_len =
1551 std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
1552 padding_bytes = fpi->m_segment_size - 1 - copy_len;
1553 memcpy(ptr, buf, copy_len);
1554 ptr += copy_len;
1555 buf += copy_len;
1556
1557 if (padding_bytes) {
1558 memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
1559 ptr += padding_bytes;
1560 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment
1561 } else {
1562 // Compare the string suffix with a hypothetical infinite string of
1563 // spaces. It could be that the first difference is beyond the end of
1564 // current chunk.
1565 const int cmp =
1566 rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
1567
1568 if (cmp < 0)
1569 *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
1570 else if (cmp > 0)
1571 *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
1572 else {
1573 // It turns out all the rest are spaces.
1574 *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
1575 }
1576 }
1577 encoded_size += fpi->m_segment_size;
1578
1579 if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES)
1580 break;
1581 }
1582
1583 // m_unpack_info_stores_value means unpack_info stores the whole original
1584 // value. There is no need to store the number of trimmed/padded endspaces
1585 // in that case.
1586 if (unpack_info && !fpi->m_unpack_info_stores_value) {
1587 // (value_length - trimmed_len) is the number of trimmed space *characters*
1588 // then, padding_bytes is the number of *bytes* added as padding
1589 // then, we add 8, because we don't store negative values.
1590 DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
1591 DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
1592 const size_t removed_chars =
1593 RDB_TRIMMED_CHARS_OFFSET +
1594 (value_length - trimmed_len) / fpi->space_mb_len -
1595 padding_bytes / fpi->space_xfrm_len;
1596
1597 if (fpi->m_unpack_info_uses_two_bytes) {
1598 unpack_info->write_uint16(removed_chars);
1599 } else {
1600 DBUG_ASSERT(removed_chars < 0x100);
1601 unpack_info->write_uint8(removed_chars);
1602 }
1603 }
1604
1605 *dst += encoded_size;
1606 }
1607
1608 /*
1609 Function of type rdb_index_field_unpack_t
1610 */
1611
rdb_unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1612 static int rdb_unpack_binary_or_utf8_varchar(Rdb_field_packing *const fpi,
1613 Field *const field, uchar *dst,
1614 Rdb_string_reader *const reader,
1615 Rdb_string_reader *const unp_reader
1616 __attribute__((__unused__))) {
1617 const uchar *ptr;
1618 size_t len = 0;
1619 bool finished = false;
1620 uchar *d0 = dst;
1621 Field_varstring *const field_var = (Field_varstring *)field;
1622 dst += field_var->length_bytes;
1623 // How much we can unpack
1624 size_t dst_len = field_var->pack_length() - field_var->length_bytes;
1625 uchar *const dst_end = dst + dst_len;
1626
1627 /* Decode the length-emitted encoding here */
1628 while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1629 /* See rdb_pack_with_varchar_encoding. */
1630 uchar pad = 255 - ptr[RDB_ESCAPE_LENGTH - 1]; // number of padding bytes
1631 uchar used_bytes = RDB_ESCAPE_LENGTH - 1 - pad;
1632
1633 if (used_bytes > RDB_ESCAPE_LENGTH - 1) {
1634 return UNPACK_FAILURE; /* cannot store that much, invalid data */
1635 }
1636
1637 if (dst_len < used_bytes) {
1638 /* Encoded index tuple is longer than the size in the record buffer? */
1639 return UNPACK_FAILURE;
1640 }
1641
1642 /*
1643 Now, we need to decode used_bytes of data and append them to the value.
1644 */
1645 if (fpi->m_varchar_charset == &my_charset_utf8_bin) {
1646 if (used_bytes & 1) {
1647 /*
1648 UTF-8 characters are encoded into two-byte entities. There is no way
1649 we can have an odd number of bytes after encoding.
1650 */
1651 return UNPACK_FAILURE;
1652 }
1653
1654 const uchar *src = ptr;
1655 const uchar *src_end = ptr + used_bytes;
1656 while (src < src_end) {
1657 my_wc_t wc = (src[0] << 8) | src[1];
1658 src += 2;
1659 const CHARSET_INFO *cset = fpi->m_varchar_charset;
1660 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1661 DBUG_ASSERT(res > 0 && res <= 3);
1662 if (res < 0)
1663 return UNPACK_FAILURE;
1664 dst += res;
1665 len += res;
1666 dst_len -= res;
1667 }
1668 } else {
1669 memcpy(dst, ptr, used_bytes);
1670 dst += used_bytes;
1671 dst_len -= used_bytes;
1672 len += used_bytes;
1673 }
1674
1675 if (used_bytes < RDB_ESCAPE_LENGTH - 1) {
1676 finished = true;
1677 break;
1678 }
1679 }
1680
1681 if (!finished)
1682 return UNPACK_FAILURE;
1683
1684 /* Save the length */
1685 if (field_var->length_bytes == 1) {
1686 d0[0] = len;
1687 } else {
1688 DBUG_ASSERT(field_var->length_bytes == 2);
1689 int2store(d0, len);
1690 }
1691 return UNPACK_SUCCESS;
1692 }
1693
1694 /*
1695 @seealso
1696 rdb_pack_with_varchar_space_pad - packing function
1697 rdb_unpack_simple_varchar_space_pad - unpacking function for 'simple'
1698 charsets.
1699 rdb_skip_variable_space_pad - skip function
1700 */
rdb_unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1701 static int rdb_unpack_binary_or_utf8_varchar_space_pad(
1702 Rdb_field_packing *const fpi, Field *const field, uchar *dst,
1703 Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
1704 const uchar *ptr;
1705 size_t len = 0;
1706 bool finished = false;
1707 Field_varstring *const field_var = static_cast<Field_varstring *>(field);
1708 uchar *d0 = dst;
1709 uchar *dst_end = dst + field_var->pack_length();
1710 dst += field_var->length_bytes;
1711
1712 uint space_padding_bytes = 0;
1713 uint extra_spaces;
1714 if ((fpi->m_unpack_info_uses_two_bytes
1715 ? unp_reader->read_uint16(&extra_spaces)
1716 : unp_reader->read_uint8(&extra_spaces))) {
1717 return UNPACK_FAILURE;
1718 }
1719
1720 if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
1721 space_padding_bytes =
1722 -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
1723 extra_spaces = 0;
1724 } else
1725 extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
1726
1727 space_padding_bytes *= fpi->space_xfrm_len;
1728
1729 /* Decode the length-emitted encoding here */
1730 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1731 const char last_byte = ptr[fpi->m_segment_size - 1];
1732 size_t used_bytes;
1733 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment
1734 {
1735 if (space_padding_bytes > (fpi->m_segment_size - 1))
1736 return UNPACK_FAILURE; // Cannot happen, corrupted data
1737 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
1738 finished = true;
1739 } else {
1740 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
1741 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
1742 return UNPACK_FAILURE; // Invalid value
1743 }
1744 used_bytes = fpi->m_segment_size - 1;
1745 }
1746
1747 // Now, need to decode used_bytes of data and append them to the value.
1748 if (fpi->m_varchar_charset == &my_charset_utf8_bin) {
1749 if (used_bytes & 1) {
1750 /*
1751 UTF-8 characters are encoded into two-byte entities. There is no way
1752 we can have an odd number of bytes after encoding.
1753 */
1754 return UNPACK_FAILURE;
1755 }
1756
1757 const uchar *src = ptr;
1758 const uchar *const src_end = ptr + used_bytes;
1759 while (src < src_end) {
1760 my_wc_t wc = (src[0] << 8) | src[1];
1761 src += 2;
1762 const CHARSET_INFO *cset = fpi->m_varchar_charset;
1763 int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1764 DBUG_ASSERT(res <= 3);
1765 if (res <= 0)
1766 return UNPACK_FAILURE;
1767 dst += res;
1768 len += res;
1769 }
1770 } else {
1771 if (dst + used_bytes > dst_end)
1772 return UNPACK_FAILURE;
1773 memcpy(dst, ptr, used_bytes);
1774 dst += used_bytes;
1775 len += used_bytes;
1776 }
1777
1778 if (finished) {
1779 if (extra_spaces) {
1780 // Both binary and UTF-8 charset store space as ' ',
1781 // so the following is ok:
1782 if (dst + extra_spaces > dst_end)
1783 return UNPACK_FAILURE;
1784 memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
1785 len += extra_spaces;
1786 }
1787 break;
1788 }
1789 }
1790
1791 if (!finished)
1792 return UNPACK_FAILURE;
1793
1794 /* Save the length */
1795 if (field_var->length_bytes == 1) {
1796 d0[0] = len;
1797 } else {
1798 DBUG_ASSERT(field_var->length_bytes == 2);
1799 int2store(d0, len);
1800 }
1801 return UNPACK_SUCCESS;
1802 }
1803
1804 /////////////////////////////////////////////////////////////////////////
1805
1806 /*
1807 Function of type rdb_make_unpack_info_t
1808 */
1809
rdb_make_unpack_unknown(const Rdb_collation_codec * codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)1810 static void rdb_make_unpack_unknown(const Rdb_collation_codec *codec
1811 __attribute__((__unused__)),
1812 const Field *const field,
1813 Rdb_pack_field_context *const pack_ctx) {
1814 pack_ctx->writer->write(field->ptr, field->pack_length());
1815 }
1816
1817 /*
1818 This point of this function is only to indicate that unpack_info is
1819 available.
1820
1821 The actual unpack_info data is produced by the function that packs the key,
1822 that is, rdb_pack_with_varchar_space_pad.
1823 */
1824
rdb_dummy_make_unpack_info(const Rdb_collation_codec * codec,const Field * field,Rdb_pack_field_context * pack_ctx)1825 static void rdb_dummy_make_unpack_info(const Rdb_collation_codec *codec
1826 __attribute__((__unused__)),
1827 const Field *field
1828 __attribute__((__unused__)),
1829 Rdb_pack_field_context *pack_ctx
1830 __attribute__((__unused__))) {}
1831
1832 /*
1833 Function of type rdb_index_field_unpack_t
1834 */
1835
rdb_unpack_unknown(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1836 static int rdb_unpack_unknown(Rdb_field_packing *const fpi, Field *const field,
1837 uchar *const dst, Rdb_string_reader *const reader,
1838 Rdb_string_reader *const unp_reader) {
1839 const uchar *ptr;
1840 const uint len = fpi->m_unpack_data_len;
1841 // We don't use anything from the key, so skip over it.
1842 if (rdb_skip_max_length(fpi, field, reader)) {
1843 return UNPACK_FAILURE;
1844 }
1845
1846 DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
1847
1848 if ((ptr = (const uchar *)unp_reader->read(len))) {
1849 memcpy(dst, ptr, len);
1850 return UNPACK_SUCCESS;
1851 }
1852 return UNPACK_FAILURE;
1853 }
1854
1855 /*
1856 Function of type rdb_make_unpack_info_t
1857 */
1858
rdb_make_unpack_unknown_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)1859 static void rdb_make_unpack_unknown_varchar(
1860 const Rdb_collation_codec *const codec __attribute__((__unused__)),
1861 const Field *const field, Rdb_pack_field_context *const pack_ctx) {
1862 const auto f = static_cast<const Field_varstring *>(field);
1863 uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
1864 len += f->length_bytes;
1865 pack_ctx->writer->write(field->ptr, len);
1866 }
1867
1868 /*
1869 Function of type rdb_index_field_unpack_t
1870
1871 @detail
1872 Unpack a key part in an "unknown" collation from its
1873 (mem_comparable_form, unpack_info) form.
1874
1875 "Unknown" means we have no clue about how mem_comparable_form is made from
1876 the original string, so we keep the whole original string in the unpack_info.
1877
1878 @seealso
1879 rdb_make_unpack_unknown, rdb_unpack_unknown
1880 */
1881
rdb_unpack_unknown_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1882 static int rdb_unpack_unknown_varchar(Rdb_field_packing *const fpi,
1883 Field *const field, uchar *dst,
1884 Rdb_string_reader *const reader,
1885 Rdb_string_reader *const unp_reader) {
1886 const uchar *ptr;
1887 uchar *const d0 = dst;
1888 const auto f = static_cast<Field_varstring *>(field);
1889 dst += f->length_bytes;
1890 const uint len_bytes = f->length_bytes;
1891 // We don't use anything from the key, so skip over it.
1892 if (fpi->m_skip_func(fpi, field, reader)) {
1893 return UNPACK_FAILURE;
1894 }
1895
1896 DBUG_ASSERT(len_bytes > 0);
1897 DBUG_ASSERT(unp_reader != nullptr);
1898
1899 if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
1900 memcpy(d0, ptr, len_bytes);
1901 const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
1902 if ((ptr = (const uchar *)unp_reader->read(len))) {
1903 memcpy(dst, ptr, len);
1904 return UNPACK_SUCCESS;
1905 }
1906 }
1907 return UNPACK_FAILURE;
1908 }
1909
1910 /*
1911 Write unpack_data for a "simple" collation
1912 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)1913 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
1914 const Rdb_collation_codec *const codec,
1915 const uchar *const src,
1916 const size_t src_len) {
1917 for (uint i = 0; i < src_len; i++) {
1918 writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
1919 }
1920 }
1921
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t & src_len,uchar * const dst)1922 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
1923 const Rdb_collation_codec *const codec,
1924 const uchar *const src,
1925 const size_t &src_len, uchar *const dst) {
1926 for (uint i = 0; i < src_len; i++) {
1927 if (codec->m_dec_size[src[i]] > 0) {
1928 uint *ret;
1929 DBUG_ASSERT(reader != nullptr);
1930
1931 if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
1932 return UNPACK_FAILURE;
1933 }
1934 dst[i] = codec->m_dec_idx[*ret][src[i]];
1935 } else {
1936 dst[i] = codec->m_dec_idx[0][src[i]];
1937 }
1938 }
1939
1940 return UNPACK_SUCCESS;
1941 }
1942
1943 /*
1944 Function of type rdb_make_unpack_info_t
1945
1946 @detail
1947 Make unpack_data for VARCHAR(n) in a "simple" charset.
1948 */
1949
1950 static void
rdb_make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)1951 rdb_make_unpack_simple_varchar(const Rdb_collation_codec *const codec,
1952 const Field *const field,
1953 Rdb_pack_field_context *const pack_ctx) {
1954 const auto f = static_cast<const Field_varstring *>(field);
1955 uchar *const src = f->ptr + f->length_bytes;
1956 const size_t src_len =
1957 f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
1958 Rdb_bit_writer bit_writer(pack_ctx->writer);
1959 // The std::min compares characters with bytes, but for simple collations,
1960 // mbmaxlen = 1.
1961 rdb_write_unpack_simple(&bit_writer, codec, src,
1962 std::min((size_t)f->char_length(), src_len));
1963 }
1964
1965 /*
1966 Function of type rdb_index_field_unpack_t
1967
1968 @seealso
1969 rdb_pack_with_varchar_space_pad - packing function
1970 rdb_unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
1971 */
1972
rdb_unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1973 int rdb_unpack_simple_varchar_space_pad(Rdb_field_packing *const fpi,
1974 Field *const field, uchar *dst,
1975 Rdb_string_reader *const reader,
1976 Rdb_string_reader *const unp_reader) {
1977 const uchar *ptr;
1978 size_t len = 0;
1979 bool finished = false;
1980 uchar *d0 = dst;
1981 const Field_varstring *const field_var =
1982 static_cast<Field_varstring *>(field);
1983 // For simple collations, char_length is also number of bytes.
1984 DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
1985 uchar *dst_end = dst + field_var->pack_length();
1986 dst += field_var->length_bytes;
1987 Rdb_bit_reader bit_reader(unp_reader);
1988
1989 uint space_padding_bytes = 0;
1990 uint extra_spaces;
1991 DBUG_ASSERT(unp_reader != nullptr);
1992
1993 if ((fpi->m_unpack_info_uses_two_bytes
1994 ? unp_reader->read_uint16(&extra_spaces)
1995 : unp_reader->read_uint8(&extra_spaces))) {
1996 return UNPACK_FAILURE;
1997 }
1998
1999 if (extra_spaces <= 8) {
2000 space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2001 extra_spaces = 0;
2002 } else
2003 extra_spaces -= 8;
2004
2005 space_padding_bytes *= fpi->space_xfrm_len;
2006
2007 /* Decode the length-emitted encoding here */
2008 while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2009 const char last_byte =
2010 ptr[fpi->m_segment_size - 1]; // number of padding bytes
2011 size_t used_bytes;
2012 if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2013 // this is the last one
2014 if (space_padding_bytes > (fpi->m_segment_size - 1))
2015 return UNPACK_FAILURE; // Cannot happen, corrupted data
2016 used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2017 finished = true;
2018 } else {
2019 if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2020 last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2021 return UNPACK_FAILURE;
2022 }
2023 used_bytes = fpi->m_segment_size - 1;
2024 }
2025
2026 if (dst + used_bytes > dst_end) {
2027 // The value on disk is longer than the field definition allows?
2028 return UNPACK_FAILURE;
2029 }
2030
2031 uint ret;
2032 if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2033 used_bytes, dst)) != UNPACK_SUCCESS) {
2034 return ret;
2035 }
2036
2037 dst += used_bytes;
2038 len += used_bytes;
2039
2040 if (finished) {
2041 if (extra_spaces) {
2042 if (dst + extra_spaces > dst_end)
2043 return UNPACK_FAILURE;
2044 // pad_char has a 1-byte form in all charsets that
2045 // are handled by rdb_init_collation_mapping.
2046 memset(dst, field_var->charset()->pad_char, extra_spaces);
2047 len += extra_spaces;
2048 }
2049 break;
2050 }
2051 }
2052
2053 if (!finished)
2054 return UNPACK_FAILURE;
2055
2056 /* Save the length */
2057 if (field_var->length_bytes == 1) {
2058 d0[0] = len;
2059 } else {
2060 DBUG_ASSERT(field_var->length_bytes == 2);
2061 int2store(d0, len);
2062 }
2063 return UNPACK_SUCCESS;
2064 }
2065
2066 /*
2067 Function of type rdb_make_unpack_info_t
2068
2069 @detail
2070 Make unpack_data for CHAR(n) value in a "simple" charset.
2071 It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2072
2073 @seealso
2074 The VARCHAR variant is in rdb_make_unpack_simple_varchar
2075 */
2076
rdb_make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2077 static void rdb_make_unpack_simple(const Rdb_collation_codec *const codec,
2078 const Field *const field,
2079 Rdb_pack_field_context *const pack_ctx) {
2080 const uchar *const src = field->ptr;
2081 Rdb_bit_writer bit_writer(pack_ctx->writer);
2082 rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2083 }
2084
2085 /*
2086 Function of type rdb_index_field_unpack_t
2087 */
2088
rdb_unpack_simple(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2089 static int rdb_unpack_simple(Rdb_field_packing *const fpi,
2090 Field *const field __attribute__((__unused__)),
2091 uchar *const dst, Rdb_string_reader *const reader,
2092 Rdb_string_reader *const unp_reader) {
2093 const uchar *ptr;
2094 const uint len = fpi->m_max_image_len;
2095 Rdb_bit_reader bit_reader(unp_reader);
2096
2097 if (!(ptr = (const uchar *)reader->read(len))) {
2098 return UNPACK_FAILURE;
2099 }
2100
2101 return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2102 fpi->m_charset_codec, ptr, len, dst);
2103 }
2104
2105 // See Rdb_charset_space_info::spaces_xfrm
2106 const int RDB_SPACE_XFRM_SIZE = 32;
2107
2108 // A class holding information about how space character is represented in a
2109 // charset.
2110 class Rdb_charset_space_info {
2111 public:
2112 Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
2113 Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
2114 Rdb_charset_space_info() = default;
2115
2116 // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
2117 std::vector<uchar> spaces_xfrm;
2118
2119 // length(strxfrm(' '))
2120 size_t space_xfrm_len;
2121
2122 // length of the space character itself
2123 // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
2124 // (length=2)
2125 size_t space_mb_len;
2126 };
2127
2128 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
2129 rdb_mem_comparable_space;
2130
2131 /*
2132 @brief
2133 For a given charset, get
2134 - strxfrm(' '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
2135 - length of strxfrm(charset, ' ')
2136 - length of the space character in the charset
2137
2138 @param cs IN Charset to get the space for
2139 @param ptr OUT A few space characters
2140 @param len OUT Return length of the space (in bytes)
2141
2142 @detail
2143 It is tempting to pre-generate mem-comparable form of space character for
2144 every charset on server startup.
2145 One can't do that: some charsets are not initialized until somebody
2146 attempts to use them (e.g. create or open a table that has a field that
2147 uses the charset).
2148 */
2149
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)2150 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
2151 const std::vector<uchar> **xfrm,
2152 size_t *const xfrm_len,
2153 size_t *const mb_len) {
2154 DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
2155 if (!rdb_mem_comparable_space[cs->number].get()) {
2156 mysql_mutex_lock(&rdb_mem_cmp_space_mutex);
2157 if (!rdb_mem_comparable_space[cs->number].get()) {
2158 // Upper bound of how many bytes can be occupied by multi-byte form of a
2159 // character in any charset.
2160 const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
2161 DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
2162
2163 // multi-byte form of the ' ' (space) character
2164 uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
2165
2166 const size_t space_mb_len = cs->cset->wc_mb(
2167 cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
2168
2169 uchar space[20]; // mem-comparable image of the space character
2170
2171 const size_t space_len = cs->coll->strnxfrm(cs, space, sizeof(space), 1,
2172 space_mb, space_mb_len, 0);
2173 Rdb_charset_space_info *const info = new Rdb_charset_space_info;
2174 info->space_xfrm_len = space_len;
2175 info->space_mb_len = space_mb_len;
2176 while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
2177 info->spaces_xfrm.insert(info->spaces_xfrm.end(), space,
2178 space + space_len);
2179 }
2180 rdb_mem_comparable_space[cs->number].reset(info);
2181 }
2182 mysql_mutex_unlock(&rdb_mem_cmp_space_mutex);
2183 }
2184
2185 *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
2186 *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
2187 *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
2188 }
2189
2190 mysql_mutex_t rdb_mem_cmp_space_mutex;
2191
2192 std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
2193 rdb_collation_data;
2194 mysql_mutex_t rdb_collation_data_mutex;
2195
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)2196 static bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
2197 return (cs->coll == &my_collation_8bit_simple_ci_handler);
2198 }
2199
2200 static const Rdb_collation_codec *
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)2201 rdb_init_collation_mapping(const my_core::CHARSET_INFO *const cs) {
2202 DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
2203 const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
2204
2205 if (codec == nullptr && rdb_is_collation_supported(cs)) {
2206 mysql_mutex_lock(&rdb_collation_data_mutex);
2207 codec = rdb_collation_data[cs->number];
2208 if (codec == nullptr) {
2209 Rdb_collation_codec *cur = nullptr;
2210
2211 // Compute reverse mapping for simple collations.
2212 if (cs->coll == &my_collation_8bit_simple_ci_handler) {
2213 cur = new Rdb_collation_codec;
2214 std::map<uchar, std::vector<uchar>> rev_map;
2215 size_t max_conflict_size = 0;
2216 for (int src = 0; src < 256; src++) {
2217 uchar dst = cs->sort_order[src];
2218 rev_map[dst].push_back(src);
2219 max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
2220 }
2221 cur->m_dec_idx.resize(max_conflict_size);
2222
2223 for (auto const &p : rev_map) {
2224 uchar dst = p.first;
2225 for (uint idx = 0; idx < p.second.size(); idx++) {
2226 uchar src = p.second[idx];
2227 uchar bits =
2228 my_bit_log2(my_round_up_to_next_power(p.second.size()));
2229 cur->m_enc_idx[src] = idx;
2230 cur->m_enc_size[src] = bits;
2231 cur->m_dec_size[dst] = bits;
2232 cur->m_dec_idx[idx][dst] = src;
2233 }
2234 }
2235
2236 cur->m_make_unpack_info_func = {
2237 {rdb_make_unpack_simple_varchar, rdb_make_unpack_simple}};
2238 cur->m_unpack_func = {
2239 {rdb_unpack_simple_varchar_space_pad, rdb_unpack_simple}};
2240 } else {
2241 // Out of luck for now.
2242 }
2243
2244 if (cur != nullptr) {
2245 codec = cur;
2246 cur->m_cs = cs;
2247 rdb_collation_data[cs->number] = cur;
2248 }
2249 }
2250 mysql_mutex_unlock(&rdb_collation_data_mutex);
2251 }
2252
2253 return codec;
2254 }
2255
get_segment_size_from_collation(const CHARSET_INFO * const cs)2256 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
2257 int ret;
2258 if (cs == &my_charset_utf8mb4_bin || cs == &my_charset_utf16_bin ||
2259 cs == &my_charset_utf16le_bin || cs == &my_charset_utf32_bin) {
2260 /*
2261 In these collations, a character produces one weight, which is 3 bytes.
2262 Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
2263 get 3*3+1=10
2264 */
2265 ret = 10;
2266 } else {
2267 /*
2268 All other collations. There are two classes:
2269 - Unicode-based, except for collations mentioned in the if-condition.
2270 For these all weights are 2 bytes long, a character may produce 0..8
2271 weights.
2272 in any case, 8 bytes of payload in the segment guarantee that the last
2273 space character won't span across segments.
2274
2275 - Collations not based on unicode. These have length(strxfrm(' '))=1,
2276 there nothing to worry about.
2277
2278 In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
2279 */
2280 ret = 9;
2281 }
2282 DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
2283 return ret;
2284 }
2285
2286 /*
2287 @brief
2288 Setup packing of index field into its mem-comparable form
2289
2290 @detail
2291 - It is possible produce mem-comparable form for any datatype.
2292 - Some datatypes also allow to unpack the original value from its
2293 mem-comparable form.
2294 = Some of these require extra information to be stored in "unpack_info".
2295 unpack_info is not a part of mem-comparable form, it is only used to
2296 restore the original value
2297
2298 @param
2299 field IN field to be packed/un-packed
2300
2301 @return
2302 TRUE - Field can be read with index-only reads
2303 FALSE - Otherwise
2304 */
2305
setup(const Rdb_key_def * const key_descr,const Field * const field,const uint & keynr_arg,const uint & key_part_arg,const uint16 & key_length)2306 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
2307 const Field *const field, const uint &keynr_arg,
2308 const uint &key_part_arg,
2309 const uint16 &key_length) {
2310 int res = false;
2311 enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
2312
2313 m_keynr = keynr_arg;
2314 m_key_part = key_part_arg;
2315
2316 m_maybe_null = field ? field->real_maybe_null() : false;
2317 m_unpack_func = nullptr;
2318 m_make_unpack_info_func = nullptr;
2319 m_unpack_data_len = 0;
2320 space_xfrm = nullptr; // safety
2321
2322 /* Calculate image length. By default, is is pack_length() */
2323 m_max_image_len =
2324 field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
2325 m_skip_func = rdb_skip_max_length;
2326 m_pack_func = rdb_pack_with_make_sort_key;
2327
2328 switch (type) {
2329 case MYSQL_TYPE_LONGLONG:
2330 case MYSQL_TYPE_LONG:
2331 case MYSQL_TYPE_INT24:
2332 case MYSQL_TYPE_SHORT:
2333 case MYSQL_TYPE_TINY:
2334 m_unpack_func = rdb_unpack_integer;
2335 return true;
2336
2337 case MYSQL_TYPE_DOUBLE:
2338 m_unpack_func = rdb_unpack_double;
2339 return true;
2340
2341 case MYSQL_TYPE_FLOAT:
2342 m_unpack_func = rdb_unpack_float;
2343 return true;
2344
2345 case MYSQL_TYPE_NEWDECIMAL:
2346 /*
2347 Decimal is packed with Field_new_decimal::make_sort_key, which just
2348 does memcpy.
2349 Unpacking decimal values was supported only after fix for issue#253,
2350 because of that ha_rocksdb::get_storage_type() handles decimal values
2351 in a special way.
2352 */
2353 case MYSQL_TYPE_DATETIME2:
2354 case MYSQL_TYPE_TIMESTAMP2:
2355 /* These are packed with Field_temporal_with_date_and_timef::make_sort_key */
2356 case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
2357 case MYSQL_TYPE_YEAR: /* YEAR is packed with Field_tiny::make_sort_key */
2358 /* Everything that comes here is packed with just a memcpy(). */
2359 m_unpack_func = rdb_unpack_binary_str;
2360 return true;
2361
2362 case MYSQL_TYPE_NEWDATE:
2363 /*
2364 This is packed by Field_newdate::make_sort_key. It assumes the data is
2365 3 bytes, and packing is done by swapping the byte order (for both big-
2366 and little-endian)
2367 */
2368 m_unpack_func = rdb_unpack_newdate;
2369 return true;
2370 case MYSQL_TYPE_TINY_BLOB:
2371 case MYSQL_TYPE_MEDIUM_BLOB:
2372 case MYSQL_TYPE_LONG_BLOB:
2373 case MYSQL_TYPE_BLOB: {
2374 if (key_descr) {
2375 // The my_charset_bin collation is special in that it will consider
2376 // shorter strings sorting as less than longer strings.
2377 //
2378 // See Field_blob::make_sort_key for details.
2379 m_max_image_len =
2380 key_length + (field->charset() == &my_charset_bin
2381 ? reinterpret_cast<const Field_blob *>(field)
2382 ->pack_length_no_ptr()
2383 : 0);
2384 // Return false because indexes on text/blob will always require
2385 // a prefix. With a prefix, the optimizer will not be able to do an
2386 // index-only scan since there may be content occuring after the prefix
2387 // length.
2388 return false;
2389 }
2390 }
2391 default:
2392 break;
2393 }
2394
2395 m_unpack_info_stores_value = false;
2396 /* Handle [VAR](CHAR|BINARY) */
2397
2398 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
2399 /*
2400 For CHAR-based columns, check how strxfrm image will take.
2401 field->field_length = field->char_length() * cs->mbmaxlen.
2402 */
2403 const CHARSET_INFO *cs = field->charset();
2404 m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
2405 }
2406 const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
2407 const CHARSET_INFO *cs = field->charset();
2408 // max_image_len before chunking is taken into account
2409 const int max_image_len_before_chunks = m_max_image_len;
2410
2411 if (is_varchar) {
2412 // The default for varchar is variable-length, without space-padding for
2413 // comparisons
2414 m_varchar_charset = cs;
2415 m_skip_func = rdb_skip_variable_length;
2416 m_pack_func = rdb_pack_with_varchar_encoding;
2417 m_max_image_len =
2418 (m_max_image_len / (RDB_ESCAPE_LENGTH - 1) + 1) * RDB_ESCAPE_LENGTH;
2419
2420 const auto field_var = static_cast<const Field_varstring *>(field);
2421 m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
2422 }
2423
2424 if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
2425 // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
2426 // information about character-based datatypes are compared.
2427 bool use_unknown_collation = false;
2428 DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
2429 use_unknown_collation = true;);
2430
2431 if (cs == &my_charset_bin) {
2432 // - SQL layer pads BINARY(N) so that it always is N bytes long.
2433 // - For VARBINARY(N), values may have different lengths, so we're using
2434 // variable-length encoding. This is also the only charset where the
2435 // values are not space-padded for comparison.
2436 m_unpack_func = is_varchar ? rdb_unpack_binary_or_utf8_varchar
2437 : rdb_unpack_binary_str;
2438 res = true;
2439 } else if (cs == &my_charset_latin1_bin || cs == &my_charset_utf8_bin) {
2440 // For _bin collations, mem-comparable form of the string is the string
2441 // itself.
2442
2443 if (is_varchar) {
2444 // VARCHARs - are compared as if they were space-padded - but are
2445 // not actually space-padded (reading the value back produces the
2446 // original value, without the padding)
2447 m_unpack_func = rdb_unpack_binary_or_utf8_varchar_space_pad;
2448 m_skip_func = rdb_skip_variable_space_pad;
2449 m_pack_func = rdb_pack_with_varchar_space_pad;
2450 m_make_unpack_info_func = rdb_dummy_make_unpack_info;
2451 m_segment_size = get_segment_size_from_collation(cs);
2452 m_max_image_len =
2453 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
2454 m_segment_size;
2455 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
2456 &space_mb_len);
2457 } else {
2458 // SQL layer pads CHAR(N) values to their maximum length.
2459 // We just store that and restore it back.
2460 m_unpack_func = (cs == &my_charset_latin1_bin) ? rdb_unpack_binary_str
2461 : rdb_unpack_utf8_str;
2462 }
2463 res = true;
2464 } else {
2465 // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
2466
2467 res = true; // index-only scans are possible
2468 m_unpack_data_len = is_varchar ? 0 : field->field_length;
2469 const uint idx = is_varchar ? 0 : 1;
2470 const Rdb_collation_codec *codec = nullptr;
2471
2472 if (is_varchar) {
2473 // VARCHAR requires space-padding for doing comparisons
2474 //
2475 // The check for cs->levels_for_order is to catch
2476 // latin2_czech_cs and cp1250_czech_cs - multi-level collations
2477 // that Variable-Length Space Padded Encoding can't handle.
2478 // It is not expected to work for any other multi-level collations,
2479 // either.
2480 // Currently we handle these collations as NO_PAD, even if they have
2481 // PAD_SPACE attribute.
2482 if (cs->levels_for_order == 1) {
2483 m_pack_func = rdb_pack_with_varchar_space_pad;
2484 m_skip_func = rdb_skip_variable_space_pad;
2485 m_segment_size = get_segment_size_from_collation(cs);
2486 m_max_image_len =
2487 (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
2488 m_segment_size;
2489 rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
2490 &space_mb_len);
2491 } else {
2492 // NO_LINT_DEBUG
2493 sql_print_warning("RocksDB: you're trying to create an index "
2494 "with a multi-level collation %s",
2495 cs->name);
2496 // NO_LINT_DEBUG
2497 sql_print_warning("MyRocks will handle this collation internally "
2498 " as if it had a NO_PAD attribute.");
2499 m_pack_func = rdb_pack_with_varchar_encoding;
2500 m_skip_func = rdb_skip_variable_length;
2501 }
2502 }
2503
2504 if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
2505 // The collation allows to store extra information in the unpack_info
2506 // which can be used to restore the original value from the
2507 // mem-comparable form.
2508 m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
2509 m_unpack_func = codec->m_unpack_func[idx];
2510 m_charset_codec = codec;
2511 } else if (use_unknown_collation) {
2512 // We have no clue about how this collation produces mem-comparable
2513 // form. Our way of restoring the original value is to keep a copy of
2514 // the original value in unpack_info.
2515 m_unpack_info_stores_value = true;
2516 m_make_unpack_info_func = is_varchar ? rdb_make_unpack_unknown_varchar
2517 : rdb_make_unpack_unknown;
2518 m_unpack_func =
2519 is_varchar ? rdb_unpack_unknown_varchar : rdb_unpack_unknown;
2520 } else {
2521 // Same as above: we don't know how to restore the value from its
2522 // mem-comparable form.
2523 // Here, we just indicate to the SQL layer we can't do it.
2524 DBUG_ASSERT(m_unpack_func == nullptr);
2525 m_unpack_info_stores_value = false;
2526 res = false; // Indicate that index-only reads are not possible
2527 }
2528 }
2529
2530 // Make an adjustment: unpacking partially covered columns is not
2531 // possible. field->table is populated when called through
2532 // Rdb_key_def::setup, but not during ha_rocksdb::index_flags.
2533 if (field->table) {
2534 // Get the original Field object and compare lengths. If this key part is
2535 // a prefix of a column, then we can't do index-only scans.
2536 if (field->table->field[field->field_index]->field_length != key_length) {
2537 m_unpack_func = nullptr;
2538 m_make_unpack_info_func = nullptr;
2539 m_unpack_info_stores_value = true;
2540 res = false;
2541 }
2542 } else {
2543 if (field->field_length != key_length) {
2544 m_unpack_func = nullptr;
2545 m_make_unpack_info_func = nullptr;
2546 m_unpack_info_stores_value = true;
2547 res = false;
2548 }
2549 }
2550 }
2551 return res;
2552 }
2553
get_field_in_table(const TABLE * const tbl) const2554 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
2555 return tbl->key_info[m_keynr].key_part[m_key_part].field;
2556 }
2557
fill_hidden_pk_val(uchar ** dst,const longlong & hidden_pk_id) const2558 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
2559 const longlong &hidden_pk_id) const {
2560 DBUG_ASSERT(m_max_image_len == 8);
2561
2562 String to;
2563 rdb_netstr_append_uint64(&to, hidden_pk_id);
2564 memcpy(*dst, to.ptr(), m_max_image_len);
2565
2566 *dst += m_max_image_len;
2567 }
2568
2569 ///////////////////////////////////////////////////////////////////////////////////////////
2570 // Rdb_ddl_manager
2571 ///////////////////////////////////////////////////////////////////////////////////////////
2572
~Rdb_tbl_def()2573 Rdb_tbl_def::~Rdb_tbl_def() {
2574 auto ddl_manager = rdb_get_ddl_manager();
2575 /* Don't free key definitions */
2576 if (m_key_descr_arr) {
2577 for (uint i = 0; i < m_key_count; i++) {
2578 if (ddl_manager && m_key_descr_arr[i]) {
2579 ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
2580 }
2581
2582 m_key_descr_arr[i] = nullptr;
2583 }
2584
2585 delete[] m_key_descr_arr;
2586 m_key_descr_arr = nullptr;
2587 }
2588 }
2589
2590 /*
2591 Put table definition DDL entry. Actual write is done at
2592 Rdb_dict_manager::commit.
2593
2594 We write
2595 dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
2596
2597 Where key entries are a tuple of
2598 ( cf_id, index_nr )
2599 */
2600
put_dict(Rdb_dict_manager * const dict,rocksdb::WriteBatch * const batch,uchar * const key,const size_t & keylen)2601 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
2602 rocksdb::WriteBatch *const batch, uchar *const key,
2603 const size_t &keylen) {
2604 StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
2605 indexes.alloc(Rdb_key_def::VERSION_SIZE +
2606 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
2607 rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
2608
2609 for (uint i = 0; i < m_key_count; i++) {
2610 const Rdb_key_def &kd = *m_key_descr_arr[i];
2611
2612 const uchar flags =
2613 (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
2614 (kd.m_is_auto_cf ? Rdb_key_def::AUTO_CF_FLAG : 0);
2615
2616 const uint cf_id = kd.get_cf()->GetID();
2617 /*
2618 If cf_id already exists, cf_flags must be the same.
2619 To prevent race condition, reading/modifying/committing CF flags
2620 need to be protected by mutex (dict_manager->lock()).
2621 When RocksDB supports transaction with pessimistic concurrency
2622 control, we can switch to use it and removing mutex.
2623 */
2624 uint existing_cf_flags;
2625 if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
2626 if (existing_cf_flags != flags) {
2627 my_printf_error(ER_UNKNOWN_ERROR,
2628 "Column Family Flag is different from existing flag. "
2629 "Assign a new CF flag, or do not change existing "
2630 "CF flag.",
2631 MYF(0));
2632 return true;
2633 }
2634 } else {
2635 dict->add_cf_flags(batch, cf_id, flags);
2636 }
2637
2638 rdb_netstr_append_uint32(&indexes, cf_id);
2639 rdb_netstr_append_uint32(&indexes, kd.m_index_number);
2640 dict->add_or_update_index_cf_mapping(batch, kd.m_index_type,
2641 kd.m_kv_format_version,
2642 kd.m_index_number, cf_id);
2643 }
2644
2645 const rocksdb::Slice skey((char *)key, keylen);
2646 const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
2647
2648 dict->put_key(batch, skey, svalue);
2649 return false;
2650 }
2651
check_if_is_mysql_system_table()2652 void Rdb_tbl_def::check_if_is_mysql_system_table() {
2653 static const char *const system_dbs[] = {
2654 "mysql", "performance_schema", "information_schema",
2655 };
2656
2657 m_is_mysql_system_table = false;
2658 for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
2659 if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
2660 m_is_mysql_system_table = true;
2661 break;
2662 }
2663 }
2664 }
2665
set_name(const std::string & name)2666 void Rdb_tbl_def::set_name(const std::string &name) {
2667 int err __attribute__((__unused__));
2668
2669 m_dbname_tablename = name;
2670 err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
2671 &m_partition);
2672 DBUG_ASSERT(err == 0);
2673
2674 check_if_is_mysql_system_table();
2675 }
2676
2677 /*
2678 Static function of type my_hash_get_key that gets invoked by
2679 the m_ddl_hash object of type my_core::HASH.
2680 It manufactures a key (db+table name in our case) from a record
2681 (Rdb_tbl_def in our case).
2682 */
get_hash_key(Rdb_tbl_def * const rec,size_t * const length,my_bool not_used)2683 const uchar *Rdb_ddl_manager::get_hash_key(Rdb_tbl_def *const rec,
2684 size_t *const length,
2685 my_bool not_used
2686 __attribute__((__unused__))) {
2687 const std::string &dbname_tablename = rec->full_tablename();
2688 *length = dbname_tablename.size();
2689 return reinterpret_cast<const uchar *>(dbname_tablename.c_str());
2690 }
2691
2692 /*
2693 Static function of type void (*my_hash_free_element_func_t)(void*) that gets
2694 invoked by the m_ddl_hash object of type my_core::HASH.
2695 It deletes a record (Rdb_tbl_def in our case).
2696 */
free_hash_elem(void * const data)2697 void Rdb_ddl_manager::free_hash_elem(void *const data) {
2698 Rdb_tbl_def *elem = reinterpret_cast<Rdb_tbl_def *>(data);
2699 delete elem;
2700 }
2701
erase_index_num(const GL_INDEX_ID & gl_index_id)2702 void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
2703 m_index_num_to_keydef.erase(gl_index_id);
2704 }
2705
2706 namespace // anonymous namespace = not visible outside this source file
2707 {
2708 struct Rdb_validate_tbls : public Rdb_tables_scanner {
2709 using tbl_info_t = std::pair<std::string, bool>;
2710 using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;
2711
2712 tbl_list_t m_list;
2713
2714 int add_table(Rdb_tbl_def *tdef) override;
2715
2716 bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);
2717
2718 bool scan_for_frms(const std::string &datadir, const std::string &dbname,
2719 bool *has_errors);
2720
2721 bool check_frm_file(const std::string &fullpath, const std::string &dbname,
2722 const std::string &tablename, bool *has_errors);
2723 };
2724 } // anonymous namespace
2725
2726 /*
2727 Get a list of tables that we expect to have .frm files for. This will use the
2728 information just read from the RocksDB data dictionary.
2729 */
add_table(Rdb_tbl_def * tdef)2730 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
2731 DBUG_ASSERT(tdef != nullptr);
2732
2733 /* Add the database/table into the list */
2734 bool is_partition = tdef->base_partition().size() != 0;
2735 m_list[tdef->base_dbname()].insert(
2736 tbl_info_t(tdef->base_tablename(), is_partition));
2737
2738 return HA_EXIT_SUCCESS;
2739 }
2740
2741 /*
2742 Access the .frm file for this dbname/tablename and see if it is a RocksDB
2743 table (or partition table).
2744 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)2745 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
2746 const std::string &dbname,
2747 const std::string &tablename,
2748 bool *has_errors) {
2749 /* Check this .frm file to see what engine it uses */
2750 String fullfilename(fullpath.c_str(), &my_charset_bin);
2751 fullfilename.append(FN_DIRSEP);
2752 fullfilename.append(tablename.c_str());
2753 fullfilename.append(".frm");
2754
2755 /*
2756 This function will return the legacy_db_type of the table. Currently
2757 it does not reference the first parameter (THD* thd), but if it ever
2758 did in the future we would need to make a version that does it without
2759 the connection handle as we don't have one here.
2760 */
2761 enum legacy_db_type eng_type;
2762 frm_type_enum type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type);
2763 if (type == FRMTYPE_ERROR) {
2764 sql_print_warning("RocksDB: Failed to open/read .from file: %s",
2765 fullfilename.ptr());
2766 return false;
2767 }
2768
2769 if (type == FRMTYPE_TABLE) {
2770 /* For a RocksDB table do we have a reference in the data dictionary? */
2771 if (eng_type == DB_TYPE_ROCKSDB) {
2772 /*
2773 Attempt to remove the table entry from the list of tables. If this
2774 fails then we know we had a .frm file that wasn't registered in RocksDB.
2775 */
2776 tbl_info_t element(tablename, false);
2777 if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
2778 sql_print_warning("RocksDB: Schema mismatch - "
2779 "A .frm file exists for table %s.%s, "
2780 "but that table is not registered in RocksDB",
2781 dbname.c_str(), tablename.c_str());
2782 *has_errors = true;
2783 }
2784 } else if (eng_type == DB_TYPE_PARTITION_DB) {
2785 /*
2786 For partition tables, see if it is in the m_list as a partition,
2787 but don't generate an error if it isn't there - we don't know that the
2788 .frm is for RocksDB.
2789 */
2790 if (m_list.count(dbname) > 0) {
2791 m_list[dbname].erase(tbl_info_t(tablename, true));
2792 }
2793 }
2794 }
2795
2796 return true;
2797 }
2798
2799 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)2800 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
2801 const std::string &dbname,
2802 bool *has_errors) {
2803 bool result = true;
2804 std::string fullpath = datadir + dbname;
2805 struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
2806
2807 /* Access the directory */
2808 if (dir_info == nullptr) {
2809 sql_print_warning("RocksDB: Could not open database directory: %s",
2810 fullpath.c_str());
2811 return false;
2812 }
2813
2814 /* Scan through the files in the directory */
2815 struct fileinfo *file_info = dir_info->dir_entry;
2816 for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
2817 /* Find .frm files that are not temp files (those that start with '#') */
2818 const char *ext = strrchr(file_info->name, '.');
2819 if (ext != nullptr && !is_prefix(file_info->name, tmp_file_prefix) &&
2820 strcmp(ext, ".frm") == 0) {
2821 std::string tablename =
2822 std::string(file_info->name, ext - file_info->name);
2823
2824 /* Check to see if the .frm file is from RocksDB */
2825 if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
2826 result = false;
2827 break;
2828 }
2829 }
2830 }
2831
2832 /* Remove any databases who have no more tables listed */
2833 if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
2834 m_list.erase(dbname);
2835 }
2836
2837 /* Release the directory entry */
2838 my_dirend(dir_info);
2839
2840 return result;
2841 }
2842
2843 /*
2844 Scan the datadir for all databases (subdirectories) and get a list of .frm
2845 files they contain
2846 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)2847 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
2848 bool *has_errors) {
2849 bool result = true;
2850 struct st_my_dir *dir_info;
2851 struct fileinfo *file_info;
2852
2853 dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
2854 if (dir_info == nullptr) {
2855 sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
2856 return false;
2857 }
2858
2859 file_info = dir_info->dir_entry;
2860 for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
2861 /* Ignore files/dirs starting with '.' */
2862 if (file_info->name[0] == '.')
2863 continue;
2864
2865 /* Ignore all non-directory files */
2866 if (!MY_S_ISDIR(file_info->mystat->st_mode))
2867 continue;
2868
2869 /* Scan all the .frm files in the directory */
2870 if (!scan_for_frms(datadir, file_info->name, has_errors)) {
2871 result = false;
2872 break;
2873 }
2874 }
2875
2876 /* Release the directory info */
2877 my_dirend(dir_info);
2878
2879 return result;
2880 }
2881
2882 /*
2883 Validate that all the tables in the RocksDB database dictionary match the .frm
2884 files in the datdir
2885 */
validate_schemas(void)2886 bool Rdb_ddl_manager::validate_schemas(void) {
2887 bool has_errors = false;
2888 const std::string datadir = std::string(mysql_real_data_home);
2889 Rdb_validate_tbls table_list;
2890
2891 /* Get the list of tables from the database dictionary */
2892 if (scan_for_tables(&table_list) != 0) {
2893 return false;
2894 }
2895
2896 /* Compare that to the list of actual .frm files */
2897 if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
2898 return false;
2899 }
2900
2901 /*
2902 Any tables left in the tables list are ones that are registered in RocksDB
2903 but don't have .frm files.
2904 */
2905 for (const auto &db : table_list.m_list) {
2906 for (const auto &table : db.second) {
2907 sql_print_warning("RocksDB: Schema mismatch - "
2908 "Table %s.%s is registered in RocksDB "
2909 "but does not have a .frm file",
2910 db.first.c_str(), table.first.c_str());
2911 has_errors = true;
2912 }
2913 }
2914
2915 return !has_errors;
2916 }
2917
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t & validate_tables)2918 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
2919 Rdb_cf_manager *const cf_manager,
2920 const uint32_t &validate_tables) {
2921 const ulong TABLE_HASH_SIZE = 32;
2922 m_dict = dict_arg;
2923 mysql_rwlock_init(0, &m_rwlock);
2924 (void)my_hash_init(&m_ddl_hash,
2925 /*system_charset_info*/ &my_charset_bin, TABLE_HASH_SIZE,
2926 0, 0, (my_hash_get_key)Rdb_ddl_manager::get_hash_key,
2927 Rdb_ddl_manager::free_hash_elem, 0);
2928
2929 /* Read the data dictionary and populate the hash */
2930 uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
2931 rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
2932 const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
2933 Rdb_key_def::INDEX_NUMBER_SIZE);
2934
2935 /* Reading data dictionary should always skip bloom filter */
2936 rocksdb::Iterator *it = m_dict->new_iterator();
2937 int i = 0;
2938
2939 uint max_index_id_in_dict = 0;
2940 m_dict->get_max_index_id(&max_index_id_in_dict);
2941
2942 for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
2943 const uchar *ptr;
2944 const uchar *ptr_end;
2945 const rocksdb::Slice key = it->key();
2946 const rocksdb::Slice val = it->value();
2947
2948 if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
2949 memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE))
2950 break;
2951
2952 if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
2953 sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
2954 (int)key.size());
2955 return true;
2956 }
2957
2958 Rdb_tbl_def *const tdef =
2959 new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
2960
2961 // Now, read the DDLs.
2962 const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
2963 if (real_val_size % Rdb_key_def::PACKED_SIZE * 2) {
2964 sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
2965 tdef->full_tablename().c_str());
2966 return true;
2967 }
2968 tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
2969 tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
2970
2971 ptr = reinterpret_cast<const uchar *>(val.data());
2972 const int version = rdb_netbuf_read_uint16(&ptr);
2973 if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
2974 sql_print_error("RocksDB: DDL ENTRY Version was not expected."
2975 "Expected: %d, Actual: %d",
2976 Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
2977 return true;
2978 }
2979 ptr_end = ptr + real_val_size;
2980 for (uint keyno = 0; ptr < ptr_end; keyno++) {
2981 GL_INDEX_ID gl_index_id;
2982 rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
2983 uint16 m_index_dict_version = 0;
2984 uchar m_index_type = 0;
2985 uint16 kv_version = 0;
2986 uint flags = 0;
2987 if (!m_dict->get_index_info(gl_index_id, &m_index_dict_version,
2988 &m_index_type, &kv_version)) {
2989 sql_print_error("RocksDB: Could not get index information "
2990 "for Index Number (%u,%u), table %s",
2991 gl_index_id.cf_id, gl_index_id.index_id,
2992 tdef->full_tablename().c_str());
2993 return true;
2994 }
2995 if (max_index_id_in_dict < gl_index_id.index_id) {
2996 sql_print_error("RocksDB: Found max index id %u from data dictionary "
2997 "but also found larger index id %u from dictionary. "
2998 "This should never happen and possibly a bug.",
2999 max_index_id_in_dict, gl_index_id.index_id);
3000 return true;
3001 }
3002 if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
3003 sql_print_error("RocksDB: Could not get Column Family Flags "
3004 "for CF Number %d, table %s",
3005 gl_index_id.cf_id, tdef->full_tablename().c_str());
3006 return true;
3007 }
3008
3009 rocksdb::ColumnFamilyHandle *const cfh =
3010 cf_manager->get_cf(gl_index_id.cf_id);
3011 DBUG_ASSERT(cfh != nullptr);
3012
3013 /*
3014 We can't fully initialize Rdb_key_def object here, because full
3015 initialization requires that there is an open TABLE* where we could
3016 look at Field* objects and set max_length and other attributes
3017 */
3018 tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
3019 gl_index_id.index_id, keyno, cfh, m_index_dict_version, m_index_type,
3020 kv_version, flags & Rdb_key_def::REVERSE_CF_FLAG,
3021 flags & Rdb_key_def::AUTO_CF_FLAG, "",
3022 m_dict->get_stats(gl_index_id));
3023 }
3024 put(tdef);
3025 i++;
3026 }
3027
3028 /*
3029 If validate_tables is greater than 0 run the validation. Only fail the
3030 initialzation if the setting is 1. If the setting is 2 we continue.
3031 */
3032 if (validate_tables > 0 && !validate_schemas()) {
3033 if (validate_tables == 1) {
3034 sql_print_error("RocksDB: Problems validating data dictionary "
3035 "against .frm files, exiting");
3036 return true;
3037 }
3038 }
3039
3040 // index ids used by applications should not conflict with
3041 // data dictionary index ids
3042 if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
3043 max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
3044 }
3045
3046 m_sequence.init(max_index_id_in_dict + 1);
3047
3048 if (!it->status().ok()) {
3049 const std::string s = it->status().ToString();
3050 sql_print_error("RocksDB: Table_store: load error: %s", s.c_str());
3051 return true;
3052 }
3053 delete it;
3054 sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
3055 i);
3056 return false;
3057 }
3058
find(const std::string & table_name,const bool & lock)3059 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
3060 const bool &lock) {
3061 if (lock) {
3062 mysql_rwlock_rdlock(&m_rwlock);
3063 }
3064
3065 Rdb_tbl_def *const rec = reinterpret_cast<Rdb_tbl_def *>(my_hash_search(
3066 &m_ddl_hash, reinterpret_cast<const uchar *>(table_name.c_str()),
3067 table_name.size()));
3068
3069 if (lock) {
3070 mysql_rwlock_unlock(&m_rwlock);
3071 }
3072
3073 return rec;
3074 }
3075
3076 // this is a safe version of the find() function below. It acquires a read
3077 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
3078 // are finding it. Copying it into 'ret' increments the count making sure
3079 // that the object will not be discarded until we are finished with it.
3080 std::shared_ptr<const Rdb_key_def>
safe_find(GL_INDEX_ID gl_index_id)3081 Rdb_ddl_manager::safe_find(GL_INDEX_ID gl_index_id) {
3082 std::shared_ptr<const Rdb_key_def> ret(nullptr);
3083
3084 mysql_rwlock_rdlock(&m_rwlock);
3085
3086 auto it = m_index_num_to_keydef.find(gl_index_id);
3087 if (it != m_index_num_to_keydef.end()) {
3088 const auto table_def = find(it->second.first, false);
3089 if (table_def && it->second.second < table_def->m_key_count) {
3090 const auto &kd = table_def->m_key_descr_arr[it->second.second];
3091 if (kd->max_storage_fmt_length() != 0) {
3092 ret = kd;
3093 }
3094 }
3095 }
3096
3097 mysql_rwlock_unlock(&m_rwlock);
3098
3099 return ret;
3100 }
3101
3102 // this method assumes at least read-only lock on m_rwlock
3103 const std::shared_ptr<Rdb_key_def> &
find(GL_INDEX_ID gl_index_id)3104 Rdb_ddl_manager::find(GL_INDEX_ID gl_index_id) {
3105 auto it = m_index_num_to_keydef.find(gl_index_id);
3106 if (it != m_index_num_to_keydef.end()) {
3107 auto table_def = find(it->second.first, false);
3108 if (table_def) {
3109 if (it->second.second < table_def->m_key_count) {
3110 return table_def->m_key_descr_arr[it->second.second];
3111 }
3112 }
3113 }
3114
3115 static std::shared_ptr<Rdb_key_def> empty = nullptr;
3116
3117 return empty;
3118 }
3119
set_stats(const std::unordered_map<GL_INDEX_ID,Rdb_index_stats> & stats)3120 void Rdb_ddl_manager::set_stats(
3121 const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
3122 mysql_rwlock_wrlock(&m_rwlock);
3123 for (auto src : stats) {
3124 const auto &keydef = find(src.second.m_gl_index_id);
3125 if (keydef) {
3126 keydef->m_stats = src.second;
3127 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
3128 }
3129 }
3130 mysql_rwlock_unlock(&m_rwlock);
3131 }
3132
adjust_stats(const std::vector<Rdb_index_stats> & new_data,const std::vector<Rdb_index_stats> & deleted_data)3133 void Rdb_ddl_manager::adjust_stats(
3134 const std::vector<Rdb_index_stats> &new_data,
3135 const std::vector<Rdb_index_stats> &deleted_data) {
3136 mysql_rwlock_wrlock(&m_rwlock);
3137 int i = 0;
3138 for (const auto &data : {new_data, deleted_data}) {
3139 for (const auto &src : data) {
3140 const auto &keydef = find(src.m_gl_index_id);
3141 if (keydef) {
3142 keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
3143 m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
3144 }
3145 }
3146 i++;
3147 }
3148 const bool should_save_stats = !m_stats2store.empty();
3149 mysql_rwlock_unlock(&m_rwlock);
3150 if (should_save_stats) {
3151 // Queue an async persist_stats(false) call to the background thread.
3152 rdb_queue_save_stats_request();
3153 }
3154 }
3155
persist_stats(const bool & sync)3156 void Rdb_ddl_manager::persist_stats(const bool &sync) {
3157 mysql_rwlock_wrlock(&m_rwlock);
3158 const auto local_stats2store = std::move(m_stats2store);
3159 m_stats2store.clear();
3160 mysql_rwlock_unlock(&m_rwlock);
3161
3162 // Persist stats
3163 const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
3164 std::vector<Rdb_index_stats> stats;
3165 std::transform(local_stats2store.begin(), local_stats2store.end(),
3166 std::back_inserter(stats),
3167 [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
3168 return s.second;
3169 });
3170 m_dict->add_stats(wb.get(), stats);
3171 m_dict->commit(wb.get(), sync);
3172 }
3173
3174 /*
3175 Put table definition of `tbl` into the mapping, and also write it to the
3176 on-disk data dictionary.
3177 */
3178
put_and_write(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch)3179 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
3180 rocksdb::WriteBatch *const batch) {
3181 uchar buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
3182 uint pos = 0;
3183
3184 rdb_netbuf_store_index(buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3185 pos += Rdb_key_def::INDEX_NUMBER_SIZE;
3186
3187 const std::string &dbname_tablename = tbl->full_tablename();
3188 memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size());
3189 pos += dbname_tablename.size();
3190
3191 int res;
3192 if ((res = tbl->put_dict(m_dict, batch, buf, pos))) {
3193 return res;
3194 }
3195 if ((res = put(tbl))) {
3196 return res;
3197 }
3198 return HA_EXIT_SUCCESS;
3199 }
3200
3201 /* Return 0 - ok, other value - error */
3202 /* TODO:
3203 This function modifies m_ddl_hash and m_index_num_to_keydef.
3204 However, these changes need to be reversed if dict_manager.commit fails
3205 See the discussion here: https://reviews.facebook.net/D35925#inline-259167
3206 Tracked by https://github.com/facebook/mysql-5.6/issues/33
3207 */
put(Rdb_tbl_def * const tbl,const bool & lock)3208 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool &lock) {
3209 Rdb_tbl_def *rec;
3210 my_bool result;
3211 const std::string &dbname_tablename = tbl->full_tablename();
3212
3213 if (lock)
3214 mysql_rwlock_wrlock(&m_rwlock);
3215
3216 // We have to do this find because 'tbl' is not yet in the list. We need
3217 // to find the one we are replacing ('rec')
3218 rec = find(dbname_tablename, false);
3219 if (rec) {
3220 // this will free the old record.
3221 my_hash_delete(&m_ddl_hash, reinterpret_cast<uchar *>(rec));
3222 }
3223 result = my_hash_insert(&m_ddl_hash, reinterpret_cast<uchar *>(tbl));
3224
3225 for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
3226 m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
3227 std::make_pair(dbname_tablename, keyno);
3228 }
3229
3230 if (lock)
3231 mysql_rwlock_unlock(&m_rwlock);
3232 return result;
3233 }
3234
remove(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch,const bool & lock)3235 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
3236 rocksdb::WriteBatch *const batch,
3237 const bool &lock) {
3238 if (lock)
3239 mysql_rwlock_wrlock(&m_rwlock);
3240
3241 uchar buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
3242 uint pos = 0;
3243
3244 rdb_netbuf_store_index(buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3245 pos += Rdb_key_def::INDEX_NUMBER_SIZE;
3246
3247 const std::string &dbname_tablename = tbl->full_tablename();
3248 memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size());
3249 pos += dbname_tablename.size();
3250
3251 const rocksdb::Slice tkey((char *)buf, pos);
3252 m_dict->delete_key(batch, tkey);
3253
3254 /* The following will also delete the object: */
3255 my_hash_delete(&m_ddl_hash, reinterpret_cast<uchar *>(tbl));
3256
3257 if (lock)
3258 mysql_rwlock_unlock(&m_rwlock);
3259 }
3260
rename(const std::string & from,const std::string & to,rocksdb::WriteBatch * const batch)3261 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
3262 rocksdb::WriteBatch *const batch) {
3263 Rdb_tbl_def *rec;
3264 Rdb_tbl_def *new_rec;
3265 bool res = true;
3266 uchar new_buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
3267 uint new_pos = 0;
3268
3269 mysql_rwlock_wrlock(&m_rwlock);
3270 if (!(rec = find(from, false))) {
3271 mysql_rwlock_unlock(&m_rwlock);
3272 return true;
3273 }
3274
3275 new_rec = new Rdb_tbl_def(to);
3276
3277 new_rec->m_key_count = rec->m_key_count;
3278 new_rec->m_auto_incr_val =
3279 rec->m_auto_incr_val.load(std::memory_order_relaxed);
3280 new_rec->m_key_descr_arr = rec->m_key_descr_arr;
3281 // so that it's not free'd when deleting the old rec
3282 rec->m_key_descr_arr = nullptr;
3283
3284 // Create a new key
3285 rdb_netbuf_store_index(new_buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3286 new_pos += Rdb_key_def::INDEX_NUMBER_SIZE;
3287
3288 const std::string &dbname_tablename = new_rec->full_tablename();
3289 memcpy(new_buf + new_pos, dbname_tablename.c_str(), dbname_tablename.size());
3290 new_pos += dbname_tablename.size();
3291
3292 // Create a key to add
3293 if (!new_rec->put_dict(m_dict, batch, new_buf, new_pos)) {
3294 remove(rec, batch, false);
3295 put(new_rec, false);
3296 res = false; // ok
3297 }
3298
3299 mysql_rwlock_unlock(&m_rwlock);
3300 return res;
3301 }
3302
cleanup()3303 void Rdb_ddl_manager::cleanup() {
3304 my_hash_free(&m_ddl_hash);
3305 mysql_rwlock_destroy(&m_rwlock);
3306 m_sequence.cleanup();
3307 }
3308
scan_for_tables(Rdb_tables_scanner * const tables_scanner)3309 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
3310 int i, ret;
3311 Rdb_tbl_def *rec;
3312
3313 DBUG_ASSERT(tables_scanner != nullptr);
3314
3315 mysql_rwlock_rdlock(&m_rwlock);
3316
3317 ret = 0;
3318 i = 0;
3319
3320 while ((
3321 rec = reinterpret_cast<Rdb_tbl_def *>(my_hash_element(&m_ddl_hash, i)))) {
3322 ret = tables_scanner->add_table(rec);
3323 if (ret)
3324 break;
3325 i++;
3326 }
3327
3328 mysql_rwlock_unlock(&m_rwlock);
3329 return ret;
3330 }
3331
init(rocksdb::DB * const rdb_dict,Rdb_cf_manager * const cf_manager)3332 bool Rdb_dict_manager::init(rocksdb::DB *const rdb_dict,
3333 Rdb_cf_manager *const cf_manager) {
3334 DBUG_ASSERT(rdb_dict != nullptr);
3335 DBUG_ASSERT(cf_manager != nullptr);
3336
3337 mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
3338
3339 m_db = rdb_dict;
3340 bool is_automatic;
3341
3342 m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME, "",
3343 nullptr, &is_automatic);
3344
3345 rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
3346
3347 m_key_slice_max_index_id =
3348 rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
3349 Rdb_key_def::INDEX_NUMBER_SIZE);
3350
3351 resume_drop_indexes();
3352 rollback_ongoing_index_creation();
3353
3354 // If system CF was created then we need to set its flags as well to make
3355 // sure that CF is properly initialized.
3356 if (m_system_cfh != nullptr) {
3357 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
3358 rocksdb::WriteBatch *const batch = wb.get();
3359
3360 add_cf_flags(batch, m_system_cfh->GetID(), 0);
3361 commit(batch);
3362 }
3363
3364 return (m_system_cfh == nullptr);
3365 }
3366
begin() const3367 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
3368 return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
3369 }
3370
put_key(rocksdb::WriteBatchBase * const batch,const rocksdb::Slice & key,const rocksdb::Slice & value) const3371 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
3372 const rocksdb::Slice &key,
3373 const rocksdb::Slice &value) const {
3374 batch->Put(m_system_cfh, key, value);
3375 }
3376
/*
  Read the value stored under `key` in the system column family. The read
  is issued with total_order_seek enabled.

  @param key    dictionary key to look up
  @param value  [out] receives the stored value
  @return the status reported by the database read
*/
rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
                                            std::string *const value) const {
  rocksdb::ReadOptions read_opts;
  read_opts.total_order_seek = true;

  rocksdb::Status read_status = m_db->Get(read_opts, m_system_cfh, key, value);
  return read_status;
}
3383
delete_key(rocksdb::WriteBatchBase * batch,const rocksdb::Slice & key) const3384 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
3385 const rocksdb::Slice &key) const {
3386 batch->Delete(m_system_cfh, key);
3387 }
3388
new_iterator() const3389 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
3390 /* Reading data dictionary should always skip bloom filter */
3391 rocksdb::ReadOptions read_options;
3392 read_options.total_order_seek = true;
3393 return m_db->NewIterator(read_options, m_system_cfh);
3394 }
3395
commit(rocksdb::WriteBatch * const batch,const bool & sync) const3396 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
3397 const bool &sync) const {
3398 if (!batch)
3399 return HA_EXIT_FAILURE;
3400 int res = 0;
3401 rocksdb::WriteOptions options;
3402 options.sync = sync;
3403 rocksdb::Status s = m_db->Write(options, batch);
3404 res = !s.ok(); // we return true when something failed
3405 if (res) {
3406 rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
3407 }
3408 batch->Clear();
3409 return res;
3410 }
3411
dump_index_id(uchar * const netbuf,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id)3412 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
3413 Rdb_key_def::DATA_DICT_TYPE dict_type,
3414 const GL_INDEX_ID &gl_index_id) {
3415 rdb_netbuf_store_uint32(netbuf, dict_type);
3416 rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
3417 gl_index_id.cf_id);
3418 rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
3419 gl_index_id.index_id);
3420 }
3421
delete_with_prefix(rocksdb::WriteBatch * const batch,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id) const3422 void Rdb_dict_manager::delete_with_prefix(
3423 rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
3424 const GL_INDEX_ID &gl_index_id) const {
3425 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3426 dump_index_id(key_buf, dict_type, gl_index_id);
3427 rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3428
3429 delete_key(batch, key);
3430 }
3431
add_or_update_index_cf_mapping(rocksdb::WriteBatch * batch,const uchar m_index_type,const uint16_t kv_version,const uint32_t index_id,const uint32_t cf_id) const3432 void Rdb_dict_manager::add_or_update_index_cf_mapping(
3433 rocksdb::WriteBatch *batch, const uchar m_index_type,
3434 const uint16_t kv_version, const uint32_t index_id,
3435 const uint32_t cf_id) const {
3436 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3437 uchar value_buf[256] = {0};
3438 GL_INDEX_ID gl_index_id = {cf_id, index_id};
3439 dump_index_id(key_buf, Rdb_key_def::INDEX_INFO, gl_index_id);
3440 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3441
3442 uchar *ptr = value_buf;
3443 rdb_netbuf_store_uint16(ptr, Rdb_key_def::INDEX_INFO_VERSION_LATEST);
3444 ptr += 2;
3445 rdb_netbuf_store_byte(ptr, m_index_type);
3446 ptr += 1;
3447 rdb_netbuf_store_uint16(ptr, kv_version);
3448 ptr += 2;
3449
3450 const rocksdb::Slice value =
3451 rocksdb::Slice((char *)value_buf, ptr - value_buf);
3452 batch->Put(m_system_cfh, key, value);
3453 }
3454
add_cf_flags(rocksdb::WriteBatch * const batch,const uint32_t & cf_id,const uint32_t & cf_flags) const3455 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
3456 const uint32_t &cf_id,
3457 const uint32_t &cf_flags) const {
3458 DBUG_ASSERT(batch != nullptr);
3459
3460 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
3461 uchar value_buf[Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE] =
3462 {0};
3463 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
3464 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
3465 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3466
3467 rdb_netbuf_store_uint16(value_buf, Rdb_key_def::CF_DEFINITION_VERSION);
3468 rdb_netbuf_store_uint32(value_buf + Rdb_key_def::VERSION_SIZE, cf_flags);
3469 const rocksdb::Slice value =
3470 rocksdb::Slice((char *)value_buf, sizeof(value_buf));
3471 batch->Put(m_system_cfh, key, value);
3472 }
3473
delete_index_info(rocksdb::WriteBatch * batch,const GL_INDEX_ID & gl_index_id) const3474 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
3475 const GL_INDEX_ID &gl_index_id) const {
3476 delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
3477 }
3478
get_index_info(const GL_INDEX_ID & gl_index_id,uint16_t * m_index_dict_version,uchar * m_index_type,uint16_t * kv_version) const3479 bool Rdb_dict_manager::get_index_info(const GL_INDEX_ID &gl_index_id,
3480 uint16_t *m_index_dict_version,
3481 uchar *m_index_type,
3482 uint16_t *kv_version) const {
3483 bool found = false;
3484 bool error = false;
3485 std::string value;
3486 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3487 dump_index_id(key_buf, Rdb_key_def::INDEX_INFO, gl_index_id);
3488 const rocksdb::Slice &key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3489
3490 const rocksdb::Status &status = get_value(key, &value);
3491 if (status.ok()) {
3492 const uchar *const val = (const uchar *)value.c_str();
3493 const uchar *ptr = val;
3494 *m_index_dict_version = rdb_netbuf_to_uint16(val);
3495 *kv_version = 0;
3496 *m_index_type = 0;
3497 ptr += 2;
3498 switch (*m_index_dict_version) {
3499
3500 case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
3501 case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
3502 *m_index_type = rdb_netbuf_to_byte(ptr);
3503 ptr += 1;
3504 *kv_version = rdb_netbuf_to_uint16(ptr);
3505 found = true;
3506 break;
3507
3508 default:
3509 error = true;
3510 break;
3511 }
3512
3513 switch (*m_index_type) {
3514 case Rdb_key_def::INDEX_TYPE_PRIMARY:
3515 case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
3516 error = *kv_version > Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
3517 break;
3518 }
3519 case Rdb_key_def::INDEX_TYPE_SECONDARY:
3520 error = *kv_version > Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
3521 break;
3522 default:
3523 error = true;
3524 break;
3525 }
3526 }
3527
3528 if (error) {
3529 // NO_LINT_DEBUG
3530 sql_print_error("RocksDB: Found invalid key version number (%u, %u, %u) "
3531 "from data dictionary. This should never happen "
3532 "and it may be a bug.",
3533 *m_index_dict_version, *m_index_type, *kv_version);
3534 abort_with_stack_traces();
3535 }
3536
3537 return found;
3538 }
3539
get_cf_flags(const uint32_t & cf_id,uint32_t * const cf_flags) const3540 bool Rdb_dict_manager::get_cf_flags(const uint32_t &cf_id,
3541 uint32_t *const cf_flags) const {
3542 DBUG_ASSERT(cf_flags != nullptr);
3543
3544 bool found = false;
3545 std::string value;
3546 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
3547
3548 rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
3549 rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
3550
3551 const rocksdb::Slice key =
3552 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
3553 const rocksdb::Status status = get_value(key, &value);
3554
3555 if (status.ok()) {
3556 const uchar *val = (const uchar *)value.c_str();
3557 DBUG_ASSERT(val);
3558
3559 const uint16_t version = rdb_netbuf_to_uint16(val);
3560
3561 if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
3562 *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
3563 found = true;
3564 }
3565 }
3566
3567 return found;
3568 }
3569
3570 /*
3571 Returning index ids that were marked as deleted (via DROP TABLE) but
3572 still not removed by drop_index_thread yet, or indexes that are marked as
3573 ongoing creation.
3574 */
get_ongoing_index_operation(std::unordered_set<GL_INDEX_ID> * gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const3575 void Rdb_dict_manager::get_ongoing_index_operation(
3576 std::unordered_set<GL_INDEX_ID> *gl_index_ids,
3577 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3578 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3579 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3580
3581 uchar index_buf[Rdb_key_def::INDEX_NUMBER_SIZE];
3582 rdb_netbuf_store_uint32(index_buf, dd_type);
3583 const rocksdb::Slice index_slice(reinterpret_cast<char *>(index_buf),
3584 Rdb_key_def::INDEX_NUMBER_SIZE);
3585
3586 rocksdb::Iterator *it = new_iterator();
3587 for (it->Seek(index_slice); it->Valid(); it->Next()) {
3588 rocksdb::Slice key = it->key();
3589 const uchar *const ptr = (const uchar *)key.data();
3590
3591 /*
3592 Ongoing drop/create index operations require key to be of the form:
3593 dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
3594
3595 This may need to be changed in the future if we want to process a new
3596 ddl_type with different format.
3597 */
3598 if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
3599 rdb_netbuf_to_uint32(ptr) != dd_type) {
3600 break;
3601 }
3602
3603 // We don't check version right now since currently we always store only
3604 // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
3605 // If increasing version number, we need to add version check logic here.
3606 GL_INDEX_ID gl_index_id;
3607 gl_index_id.cf_id =
3608 rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
3609 gl_index_id.index_id =
3610 rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
3611 gl_index_ids->insert(gl_index_id);
3612 }
3613 delete it;
3614 }
3615
3616 /*
3617 Returning true if index_id is create/delete ongoing (undergoing creation or
3618 marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
3619 or not.
3620 */
is_index_operation_ongoing(const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const3621 bool Rdb_dict_manager::is_index_operation_ongoing(
3622 const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3623 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3624 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3625
3626 bool found = false;
3627 std::string value;
3628 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3629 dump_index_id(key_buf, dd_type, gl_index_id);
3630 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3631
3632 const rocksdb::Status status = get_value(key, &value);
3633 if (status.ok()) {
3634 found = true;
3635 }
3636 return found;
3637 }
3638
3639 /*
3640 Adding index_id to data dictionary so that the index id is removed
3641 by drop_index_thread, or to track online index creation.
3642 */
start_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const3643 void Rdb_dict_manager::start_ongoing_index_operation(
3644 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
3645 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3646 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3647 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3648
3649 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3650 uchar value_buf[Rdb_key_def::VERSION_SIZE] = {0};
3651 dump_index_id(key_buf, dd_type, gl_index_id);
3652
3653 // version as needed
3654 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
3655 rdb_netbuf_store_uint16(value_buf,
3656 Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
3657 } else {
3658 rdb_netbuf_store_uint16(value_buf,
3659 Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
3660 }
3661
3662 const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3663 const rocksdb::Slice value =
3664 rocksdb::Slice((char *)value_buf, sizeof(value_buf));
3665 batch->Put(m_system_cfh, key, value);
3666 }
3667
3668 /*
3669 Removing index_id from data dictionary to confirm drop_index_thread
3670 completed dropping entire key/values of the index_id
3671 */
end_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const3672 void Rdb_dict_manager::end_ongoing_index_operation(
3673 rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
3674 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3675 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3676 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3677
3678 delete_with_prefix(batch, dd_type, gl_index_id);
3679 }
3680
3681 /*
3682 Returning true if there is no target index ids to be removed
3683 by drop_index_thread
3684 */
is_drop_index_empty() const3685 bool Rdb_dict_manager::is_drop_index_empty() const {
3686 std::unordered_set<GL_INDEX_ID> gl_index_ids;
3687 get_ongoing_drop_indexes(&gl_index_ids);
3688 return gl_index_ids.empty();
3689 }
3690
3691 /*
3692 This function is supposed to be called by DROP TABLE. Logging messages
3693 that dropping indexes started, and adding data dictionary so that
3694 all associated indexes to be removed
3695 */
add_drop_table(std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 & n_keys,rocksdb::WriteBatch * const batch) const3696 void Rdb_dict_manager::add_drop_table(
3697 std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 &n_keys,
3698 rocksdb::WriteBatch *const batch) const {
3699 std::unordered_set<GL_INDEX_ID> dropped_index_ids;
3700 for (uint32 i = 0; i < n_keys; i++) {
3701 dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
3702 }
3703
3704 add_drop_index(dropped_index_ids, batch);
3705 }
3706
3707 /*
3708 Called during inplace index drop operations. Logging messages
3709 that dropping indexes started, and adding data dictionary so that
3710 all associated indexes to be removed
3711 */
add_drop_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const3712 void Rdb_dict_manager::add_drop_index(
3713 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
3714 rocksdb::WriteBatch *const batch) const {
3715 for (const auto &gl_index_id : gl_index_ids) {
3716 log_start_drop_index(gl_index_id, "Begin");
3717 start_drop_index(batch, gl_index_id);
3718 }
3719 }
3720
3721 /*
3722 Called during inplace index creation operations. Logging messages
3723 that adding indexes started, and updates data dictionary with all associated
3724 indexes to be added.
3725 */
add_create_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const3726 void Rdb_dict_manager::add_create_index(
3727 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
3728 rocksdb::WriteBatch *const batch) const {
3729 for (const auto &gl_index_id : gl_index_ids) {
3730 // NO_LINT_DEBUG
3731 sql_print_information("RocksDB: Begin index creation (%u,%u)",
3732 gl_index_id.cf_id, gl_index_id.index_id);
3733 start_create_index(batch, gl_index_id);
3734 }
3735 }
3736
3737 /*
3738 This function is supposed to be called by drop_index_thread, when it
3739 finished dropping any index, or at the completion of online index creation.
3740 */
finish_indexes_operation(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const3741 void Rdb_dict_manager::finish_indexes_operation(
3742 const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
3743 Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3744 DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3745 dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3746
3747 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
3748 rocksdb::WriteBatch *const batch = wb.get();
3749
3750 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
3751 get_ongoing_create_indexes(&incomplete_create_indexes);
3752
3753 for (const auto &gl_index_id : gl_index_ids) {
3754 if (is_index_operation_ongoing(gl_index_id, dd_type)) {
3755 // NO_LINT_DEBUG
3756 sql_print_information("RocksDB: Finished %s (%u,%u)",
3757 dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING
3758 ? "filtering dropped index"
3759 : "index creation",
3760 gl_index_id.cf_id, gl_index_id.index_id);
3761
3762 end_ongoing_index_operation(batch, gl_index_id, dd_type);
3763
3764 /*
3765 Remove the corresponding incomplete create indexes from data
3766 dictionary as well
3767 */
3768 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
3769 if (incomplete_create_indexes.count(gl_index_id)) {
3770 end_ongoing_index_operation(batch, gl_index_id,
3771 Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3772 }
3773 }
3774 }
3775
3776 if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
3777 delete_index_info(batch, gl_index_id);
3778 }
3779 }
3780 commit(batch);
3781 }
3782
3783 /*
3784 This function is supposed to be called when initializing
3785 Rdb_dict_manager (at startup). If there is any index ids that are
3786 drop ongoing, printing out messages for diagnostics purposes.
3787 */
resume_drop_indexes() const3788 void Rdb_dict_manager::resume_drop_indexes() const {
3789 std::unordered_set<GL_INDEX_ID> gl_index_ids;
3790 get_ongoing_drop_indexes(&gl_index_ids);
3791
3792 uint max_index_id_in_dict = 0;
3793 get_max_index_id(&max_index_id_in_dict);
3794
3795 for (const auto &gl_index_id : gl_index_ids) {
3796 log_start_drop_index(gl_index_id, "Resume");
3797 if (max_index_id_in_dict < gl_index_id.index_id) {
3798 sql_print_error("RocksDB: Found max index id %u from data dictionary "
3799 "but also found dropped index id (%u,%u) from drop_index "
3800 "dictionary. This should never happen and is possibly a "
3801 "bug.",
3802 max_index_id_in_dict, gl_index_id.cf_id,
3803 gl_index_id.index_id);
3804 abort_with_stack_traces();
3805 }
3806 }
3807 }
3808
rollback_ongoing_index_creation() const3809 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
3810 const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
3811 rocksdb::WriteBatch *const batch = wb.get();
3812
3813 std::unordered_set<GL_INDEX_ID> gl_index_ids;
3814 get_ongoing_create_indexes(&gl_index_ids);
3815
3816 for (const auto &gl_index_id : gl_index_ids) {
3817 // NO_LINT_DEBUG
3818 sql_print_information("RocksDB: Removing incomplete create index (%u,%u)",
3819 gl_index_id.cf_id, gl_index_id.index_id);
3820
3821 start_drop_index(batch, gl_index_id);
3822 }
3823
3824 commit(batch);
3825 }
3826
log_start_drop_table(const std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 & n_keys,const char * const log_action) const3827 void Rdb_dict_manager::log_start_drop_table(
3828 const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 &n_keys,
3829 const char *const log_action) const {
3830 for (uint32 i = 0; i < n_keys; i++) {
3831 log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
3832 }
3833 }
3834
log_start_drop_index(GL_INDEX_ID gl_index_id,const char * log_action) const3835 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
3836 const char *log_action) const {
3837 uint16 m_index_dict_version = 0;
3838 uchar m_index_type = 0;
3839 uint16 kv_version = 0;
3840
3841 if (!get_index_info(gl_index_id, &m_index_dict_version, &m_index_type,
3842 &kv_version)) {
3843 /*
3844 If we don't find the index info, it could be that it's because it was a
3845 partially created index that isn't in the data dictionary yet that needs
3846 to be rolled back.
3847 */
3848 std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
3849 get_ongoing_create_indexes(&incomplete_create_indexes);
3850
3851 if (!incomplete_create_indexes.count(gl_index_id)) {
3852 /* If it's not a partially created index, something is very wrong. */
3853 sql_print_error("RocksDB: Failed to get column family info "
3854 "from index id (%u,%u). MyRocks data dictionary may "
3855 "get corrupted.",
3856 gl_index_id.cf_id, gl_index_id.index_id);
3857 abort_with_stack_traces();
3858 }
3859 }
3860 sql_print_information("RocksDB: %s filtering dropped index (%u,%u)",
3861 log_action, gl_index_id.cf_id, gl_index_id.index_id);
3862 }
3863
get_max_index_id(uint32_t * const index_id) const3864 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
3865 bool found = false;
3866 std::string value;
3867
3868 const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
3869 if (status.ok()) {
3870 const uchar *const val = (const uchar *)value.c_str();
3871 const uint16_t &version = rdb_netbuf_to_uint16(val);
3872 if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
3873 *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
3874 found = true;
3875 }
3876 }
3877 return found;
3878 }
3879
update_max_index_id(rocksdb::WriteBatch * const batch,const uint32_t & index_id) const3880 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
3881 const uint32_t &index_id) const {
3882 DBUG_ASSERT(batch != nullptr);
3883
3884 uint32_t old_index_id = -1;
3885 if (get_max_index_id(&old_index_id)) {
3886 if (old_index_id > index_id) {
3887 sql_print_error("RocksDB: Found max index id %u from data dictionary "
3888 "but trying to update to older value %u. This should "
3889 "never happen and possibly a bug.",
3890 old_index_id, index_id);
3891 return true;
3892 }
3893 }
3894
3895 uchar value_buf[Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE] =
3896 {0};
3897 rdb_netbuf_store_uint16(value_buf, Rdb_key_def::MAX_INDEX_ID_VERSION);
3898 rdb_netbuf_store_uint32(value_buf + Rdb_key_def::VERSION_SIZE, index_id);
3899 const rocksdb::Slice value =
3900 rocksdb::Slice((char *)value_buf, sizeof(value_buf));
3901 batch->Put(m_system_cfh, m_key_slice_max_index_id, value);
3902 return false;
3903 }
3904
add_stats(rocksdb::WriteBatch * const batch,const std::vector<Rdb_index_stats> & stats) const3905 void Rdb_dict_manager::add_stats(
3906 rocksdb::WriteBatch *const batch,
3907 const std::vector<Rdb_index_stats> &stats) const {
3908 DBUG_ASSERT(batch != nullptr);
3909
3910 for (const auto &it : stats) {
3911 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3912 dump_index_id(key_buf, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
3913
3914 // IndexStats::materialize takes complete care of serialization including
3915 // storing the version
3916 const auto value =
3917 Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it}, 1.);
3918
3919 batch->Put(m_system_cfh, rocksdb::Slice((char *)key_buf, sizeof(key_buf)),
3920 value);
3921 }
3922 }
3923
get_stats(GL_INDEX_ID gl_index_id) const3924 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
3925 uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3926 dump_index_id(key_buf, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
3927
3928 std::string value;
3929 const rocksdb::Status status = get_value(
3930 rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf)),
3931 &value);
3932 if (status.ok()) {
3933 std::vector<Rdb_index_stats> v;
3934 // unmaterialize checks if the version matches
3935 if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
3936 return v[0];
3937 }
3938 }
3939
3940 return Rdb_index_stats();
3941 }
3942
get_and_update_next_number(Rdb_dict_manager * const dict)3943 uint Rdb_seq_generator::get_and_update_next_number(
3944 Rdb_dict_manager *const dict) {
3945 DBUG_ASSERT(dict != nullptr);
3946
3947 uint res;
3948 mysql_mutex_lock(&m_mutex);
3949
3950 res = m_next_number++;
3951
3952 const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
3953 rocksdb::WriteBatch *const batch = wb.get();
3954
3955 DBUG_ASSERT(batch != nullptr);
3956 dict->update_max_index_id(batch, res);
3957 dict->commit(batch);
3958
3959 mysql_mutex_unlock(&m_mutex);
3960
3961 return res;
3962 }
3963
3964 } // namespace myrocks
3965