1 /*
2    Copyright (c) 2012,2013 Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16 
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation // gcc: Class implementation
19 #endif
20 
21 /* This C++ file's header file */
22 #include "./rdb_datadic.h"
23 
24 /* C++ standard header files */
25 #include <algorithm>
26 #include <array>
27 #include <limits>
28 #include <map>
29 #include <set>
30 #include <utility>
31 #include <vector>
32 
33 /* MySQL header files */
34 #include "./key.h"
35 #include "./m_ctype.h"
36 #include "./my_bit.h"
37 
38 /* MyRocks header files */
39 #include "./ha_rocksdb_proto.h"
40 #include "./my_stacktrace.h"
41 #include "./rdb_cf_manager.h"
42 #include "./rdb_utils.h"
43 
44 namespace myrocks {
45 
46 void get_mem_comparable_space(const CHARSET_INFO *cs,
47                               const std::vector<uchar> **xfrm, size_t *xfrm_len,
48                               size_t *mb_len);
49 
50 /*
51   Rdb_key_def class implementation
52 */
53 
Rdb_key_def(uint indexnr_arg,uint keyno_arg,rocksdb::ColumnFamilyHandle * cf_handle_arg,uint16_t index_dict_version_arg,uchar index_type_arg,uint16_t kv_format_version_arg,bool is_reverse_cf_arg,bool is_auto_cf_arg,const char * _name,Rdb_index_stats _stats)54 Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
55                          rocksdb::ColumnFamilyHandle *cf_handle_arg,
56                          uint16_t index_dict_version_arg, uchar index_type_arg,
57                          uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
58                          bool is_auto_cf_arg, const char *_name,
59                          Rdb_index_stats _stats)
60     : m_index_number(indexnr_arg), m_cf_handle(cf_handle_arg),
61       m_index_dict_version(index_dict_version_arg),
62       m_index_type(index_type_arg), m_kv_format_version(kv_format_version_arg),
63       m_is_reverse_cf(is_reverse_cf_arg), m_is_auto_cf(is_auto_cf_arg),
64       m_name(_name), m_stats(_stats), m_pk_part_no(nullptr),
65       m_pack_info(nullptr), m_keyno(keyno_arg), m_key_parts(0),
66       m_prefix_extractor(nullptr), m_maxlength(0) // means 'not intialized'
67 {
68   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
69   rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
70   DBUG_ASSERT(m_cf_handle != nullptr);
71 }
72 
/*
  Copy constructor.

  Every descriptive attribute of the source key definition is copied,
  including the dictionary version, the index type, the KV format version
  and the number of primary key parts.  Leaving those out would hand the
  copy indeterminate values that setup() and get_primary_key_tuple() read.

  The two heap arrays (m_pack_info and m_pk_part_no) are duplicated so that
  each object frees its own copy in the destructor.  The mutex is not
  shared: the copy initializes a fresh one.
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number), m_cf_handle(k.m_cf_handle),
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf), m_is_auto_cf(k.m_is_auto_cf),
      m_name(k.m_name), m_stats(k.m_stats), m_pk_part_no(k.m_pk_part_no),
      m_pack_info(k.m_pack_info), m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts), m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  /*
    Assigned in the body rather than in the initializer list because the
    member's position in the class declaration is not visible from here.
  */
  m_pk_key_parts = k.m_pk_key_parts;

  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);

  /* Replace the shallow pointer copies made above with private arrays. */
  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
    m_pack_info =
        reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
    memcpy(m_pack_info, k.m_pack_info, size);
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
    m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0)));
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
95 
~Rdb_key_def()96 Rdb_key_def::~Rdb_key_def() {
97   mysql_mutex_destroy(&m_mutex);
98 
99   my_free(m_pk_part_no);
100   m_pk_part_no = nullptr;
101 
102   my_free(m_pack_info);
103   m_pack_info = nullptr;
104 }
105 
setup(const TABLE * const tbl,const Rdb_tbl_def * const tbl_def)106 void Rdb_key_def::setup(const TABLE *const tbl,
107                         const Rdb_tbl_def *const tbl_def) {
108   DBUG_ASSERT(tbl != nullptr);
109   DBUG_ASSERT(tbl_def != nullptr);
110 
111   /*
112     Set max_length based on the table.  This can be called concurrently from
113     multiple threads, so there is a mutex to protect this code.
114   */
115   const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
116   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
117   const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
118   if (!m_maxlength) {
119     mysql_mutex_lock(&m_mutex);
120     if (m_maxlength != 0) {
121       mysql_mutex_unlock(&m_mutex);
122       return;
123     }
124 
125     KEY *key_info = nullptr;
126     KEY *pk_info = nullptr;
127     if (!is_hidden_pk) {
128       key_info = &tbl->key_info[m_keyno];
129       if (!hidden_pk_exists)
130         pk_info = &tbl->key_info[tbl->s->primary_key];
131       m_name = std::string(key_info->name);
132     } else {
133       m_name = HIDDEN_PK_NAME;
134     }
135 
136     if (secondary_key)
137       m_pk_key_parts = hidden_pk_exists ? 1 : pk_info->actual_key_parts;
138     else {
139       pk_info = nullptr;
140       m_pk_key_parts = 0;
141     }
142 
143     // "unique" secondary keys support:
144     m_key_parts = is_hidden_pk ? 1 : key_info->actual_key_parts;
145 
146     if (secondary_key) {
147       /*
148         In most cases, SQL layer puts PK columns as invisible suffix at the
149         end of secondary key. There are cases where this doesn't happen:
150         - unique secondary indexes.
151         - partitioned tables.
152 
153         Internally, we always need PK columns as suffix (and InnoDB does,
154         too, if you were wondering).
155 
156         The loop below will attempt to put all PK columns at the end of key
157         definition.  Columns that are already included in the index (either
158         by the user or by "extended keys" feature) are not included for the
159         second time.
160       */
161       m_key_parts += m_pk_key_parts;
162     }
163 
164     if (secondary_key)
165       m_pk_part_no = reinterpret_cast<uint *>(
166           my_malloc(sizeof(uint) * m_key_parts, MYF(0)));
167     else
168       m_pk_part_no = nullptr;
169 
170     const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
171     m_pack_info =
172         reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
173 
174     size_t max_len = INDEX_NUMBER_SIZE;
175     int unpack_len = 0;
176     int max_part_len = 0;
177     bool simulating_extkey = false;
178     uint dst_i = 0;
179 
180     uint keyno_to_set = m_keyno;
181     uint keypart_to_set = 0;
182 
183     if (is_hidden_pk) {
184       Field *field = nullptr;
185       m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
186       m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
187       max_len += m_pack_info[dst_i].m_max_image_len;
188       max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
189       dst_i++;
190     } else {
191       KEY_PART_INFO *key_part = key_info->key_part;
192 
193       /* this loop also loops over the 'extended key' tail */
194       for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
195         Field *const field = key_part ? key_part->field : nullptr;
196 
197         if (simulating_extkey && !hidden_pk_exists) {
198           DBUG_ASSERT(secondary_key);
199           /* Check if this field is already present in the key definition */
200           bool found = false;
201           for (uint j = 0; j < key_info->actual_key_parts; j++) {
202             if (field->field_index ==
203                     key_info->key_part[j].field->field_index &&
204                 key_part->length == key_info->key_part[j].length) {
205               found = true;
206               break;
207             }
208           }
209 
210           if (found) {
211             key_part++;
212             continue;
213           }
214         }
215 
216         if (field && field->real_maybe_null())
217           max_len += 1; // NULL-byte
218 
219         m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
220                                  key_part ? key_part->length : 0);
221         m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
222 
223         if (pk_info) {
224           m_pk_part_no[dst_i] = -1;
225           for (uint j = 0; j < m_pk_key_parts; j++) {
226             if (field->field_index == pk_info->key_part[j].field->field_index) {
227               m_pk_part_no[dst_i] = j;
228               break;
229             }
230           }
231         } else if (secondary_key && hidden_pk_exists) {
232           /*
233             The hidden pk can never be part of the sk.  So it is always
234             appended to the end of the sk.
235           */
236           m_pk_part_no[dst_i] = -1;
237           if (simulating_extkey)
238             m_pk_part_no[dst_i] = 0;
239         }
240 
241         max_len += m_pack_info[dst_i].m_max_image_len;
242 
243         max_part_len =
244             std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
245 
246         key_part++;
247         /*
248           For "unique" secondary indexes, pretend they have
249           "index extensions"
250          */
251         if (secondary_key && src_i + 1 == key_info->actual_key_parts) {
252           simulating_extkey = true;
253           if (!hidden_pk_exists) {
254             keyno_to_set = tbl->s->primary_key;
255             key_part = pk_info->key_part;
256             keypart_to_set = (uint)-1;
257           } else {
258             keyno_to_set = tbl_def->m_key_count - 1;
259             key_part = nullptr;
260             keypart_to_set = 0;
261           }
262         }
263 
264         dst_i++;
265       }
266     }
267 
268     m_key_parts = dst_i;
269 
270     /* Initialize the memory needed by the stats structure */
271     m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
272 
273     /* Cache prefix extractor for bloom filter usage later */
274     rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
275     m_prefix_extractor = opt.prefix_extractor;
276 
277     /*
278       This should be the last member variable set before releasing the mutex
279       so that other threads can't see the object partially set up.
280      */
281     m_maxlength = max_len;
282 
283     mysql_mutex_unlock(&m_mutex);
284   }
285 }
286 
287 /**
288   Read a memcmp key part from a slice using the passed in reader.
289 
290   Returns -1 if field was null, 1 if error, 0 otherwise.
291 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const292 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
293                                       Rdb_string_reader *reader,
294                                       const uint part_num) const {
295   /* It is impossible to unpack the column. Skip it. */
296   if (m_pack_info[part_num].m_maybe_null) {
297     const char *nullp;
298     if (!(nullp = reader->read(1)))
299       return 1;
300     if (*nullp == 0) {
301       /* This is a NULL value */
302       return -1;
303     } else {
304       /* If NULL marker is not '0', it can be only '1'  */
305       if (*nullp != 1)
306         return 1;
307     }
308   }
309 
310   Rdb_field_packing *fpi = &m_pack_info[part_num];
311   DBUG_ASSERT(table_arg->s != nullptr);
312 
313   bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
314                            (table_arg->s->primary_key == MAX_INDEXES);
315   Field *field = nullptr;
316   if (!is_hidden_pk_part)
317     field = fpi->get_field_in_table(table_arg);
318   if (fpi->m_skip_func(fpi, field, reader))
319     return 1;
320 
321   return 0;
322 }
323 
324 /**
325   Get a mem-comparable form of Primary Key from mem-comparable form of this key
326 
327   @param
328     pk_descr        Primary Key descriptor
329     key             Index tuple from this key in mem-comparable form
330     pk_buffer  OUT  Put here mem-comparable form of the Primary Key.
331 
332   @note
333     It may or may not be possible to restore primary key columns to their
334     mem-comparable form.  To handle all cases, this function copies mem-
335     comparable forms directly.
336 
337     RocksDB SE supports "Extended keys". This means that PK columns are present
338     at the end of every key.  If the key already includes PK columns, then
339     these columns are not present at the end of the key.
340 
341     Because of the above, we copy each primary key column.
342 
343   @todo
344     If we checked crc32 checksums in this function, we would catch some CRC
345     violations that we currently don't. On the other hand, there is a broader
346     set of queries for which we would check the checksum twice.
347 */
348 
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const349 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
350                                         const Rdb_key_def &pk_descr,
351                                         const rocksdb::Slice *const key,
352                                         uchar *const pk_buffer) const {
353   DBUG_ASSERT(table != nullptr);
354   DBUG_ASSERT(key != nullptr);
355   DBUG_ASSERT(pk_buffer);
356 
357   uint size = 0;
358   uchar *buf = pk_buffer;
359   DBUG_ASSERT(m_pk_key_parts);
360 
361   /* Put the PK number */
362   rdb_netbuf_store_index(buf, pk_descr.m_index_number);
363   buf += INDEX_NUMBER_SIZE;
364   size += INDEX_NUMBER_SIZE;
365 
366   const char *start_offs[MAX_REF_PARTS];
367   const char *end_offs[MAX_REF_PARTS];
368   int pk_key_part;
369   uint i;
370   Rdb_string_reader reader(key);
371 
372   // Skip the index number
373   if ((!reader.read(INDEX_NUMBER_SIZE)))
374     return RDB_INVALID_KEY_LEN;
375 
376   for (i = 0; i < m_key_parts; i++) {
377     if ((pk_key_part = m_pk_part_no[i]) != -1) {
378       start_offs[pk_key_part] = reader.get_current_ptr();
379     }
380 
381     if (read_memcmp_key_part(table, &reader, i) > 0) {
382       return RDB_INVALID_KEY_LEN;
383     }
384 
385     if (pk_key_part != -1) {
386       end_offs[pk_key_part] = reader.get_current_ptr();
387     }
388   }
389 
390   for (i = 0; i < m_pk_key_parts; i++) {
391     const uint part_size = end_offs[i] - start_offs[i];
392     memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
393     buf += part_size;
394     size += part_size;
395   }
396 
397   return size;
398 }
399 
400 /**
401   Get a mem-comparable form of Secondary Key from mem-comparable form of this
402   key, without the extended primary key tail.
403 
404   @param
405     key                Index tuple from this key in mem-comparable form
406     sk_buffer     OUT  Put here mem-comparable form of the Secondary Key.
407     n_null_fields OUT  Put number of null fields contained within sk entry
408 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const409 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
410                                       const rocksdb::Slice &key,
411                                       uchar *sk_buffer,
412                                       uint *n_null_fields) const {
413   DBUG_ASSERT(table != nullptr);
414   DBUG_ASSERT(sk_buffer != nullptr);
415   DBUG_ASSERT(n_null_fields != nullptr);
416   DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
417 
418   uchar *buf = sk_buffer;
419 
420   int res;
421   Rdb_string_reader reader(&key);
422   const char *start = reader.get_current_ptr();
423 
424   // Skip the index number
425   if ((!reader.read(INDEX_NUMBER_SIZE)))
426     return RDB_INVALID_KEY_LEN;
427 
428   for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
429     if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
430       return RDB_INVALID_KEY_LEN;
431     } else if (res == -1) {
432       (*n_null_fields)++;
433     }
434   }
435 
436   uint sk_memcmp_len = reader.get_current_ptr() - start;
437   memcpy(buf, start, sk_memcmp_len);
438   return sk_memcmp_len;
439 }
440 
441 /**
442   Convert index tuple into storage (i.e. mem-comparable) format
443 
444   @detail
445     Currently this is done by unpacking into table->record[0] and then
446     packing index columns into storage format.
447 
448   @param pack_buffer Temporary area for packing varchar columns. Its
449                      size is at least max_storage_fmt_length() bytes.
450 */
451 
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,const uchar * const key_tuple,const key_part_map & keypart_map) const452 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
453                                    uchar *const packed_tuple,
454                                    const uchar *const key_tuple,
455                                    const key_part_map &keypart_map) const {
456   DBUG_ASSERT(tbl != nullptr);
457   DBUG_ASSERT(pack_buffer != nullptr);
458   DBUG_ASSERT(packed_tuple != nullptr);
459   DBUG_ASSERT(key_tuple != nullptr);
460 
461   /* We were given a record in KeyTupleFormat. First, save it to record */
462   const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
463   key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
464 
465   uint n_used_parts = my_count_bits(keypart_map);
466   if (keypart_map == HA_WHOLE_KEY)
467     n_used_parts = 0; // Full key is used
468 
469   /* Then, convert the record into a mem-comparable form */
470   return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
471                      false, 0, n_used_parts);
472 }
473 
474 /**
475   @brief
476     Check if "unpack info" data includes checksum.
477 
478   @detail
479     This is used only by CHECK TABLE to count the number of rows that have
480     checksums.
481 */
482 
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)483 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
484   const uchar *ptr = (const uchar *)unpack_info.data();
485   size_t size = unpack_info.size();
486 
487   // Skip unpack info if present.
488   if (size >= RDB_UNPACK_HEADER_SIZE && ptr[0] == RDB_UNPACK_DATA_TAG) {
489     const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
490     SHIP_ASSERT(size >= skip_len);
491 
492     size -= skip_len;
493     ptr += skip_len;
494   }
495 
496   return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
497 }
498 
499 /*
500   @return Number of bytes that were changed
501 */
successor(uchar * const packed_tuple,const uint & len)502 int Rdb_key_def::successor(uchar *const packed_tuple, const uint &len) {
503   DBUG_ASSERT(packed_tuple != nullptr);
504 
505   int changed = 0;
506   uchar *p = packed_tuple + len - 1;
507   for (; p > packed_tuple; p--) {
508     changed++;
509     if (*p != uchar(0xFF)) {
510       *p = *p + 1;
511       break;
512     }
513     *p = '\0';
514   }
515   return changed;
516 }
517 
518 /**
519   Get index columns from the record and pack them into mem-comparable form.
520 
521   @param
522     tbl                   Table we're working on
523     record           IN   Record buffer with fields in table->record format
524     pack_buffer      IN   Temporary area for packing varchars. The size is
525                           at least max_storage_fmt_length() bytes.
526     packed_tuple     OUT  Key in the mem-comparable form
527     unpack_info      OUT  Unpack data
528     unpack_info_len  OUT  Unpack data length
529     n_key_parts           Number of keyparts to process. 0 means all of them.
530     n_null_fields    OUT  Number of key fields with NULL value.
531 
532   @detail
533     Some callers do not need the unpack information, they can pass
534     unpack_info=nullptr, unpack_info_len=nullptr.
535 
536   @return
537     Length of the packed tuple
538 */
539 
pack_record(const TABLE * const tbl,uchar * const pack_buffer,const uchar * const record,uchar * const packed_tuple,Rdb_string_writer * const unpack_info,const bool & should_store_row_debug_checksums,const longlong & hidden_pk_id,uint n_key_parts,uint * const n_null_fields) const540 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
541                               const uchar *const record,
542                               uchar *const packed_tuple,
543                               Rdb_string_writer *const unpack_info,
544                               const bool &should_store_row_debug_checksums,
545                               const longlong &hidden_pk_id, uint n_key_parts,
546                               uint *const n_null_fields) const {
547   DBUG_ASSERT(tbl != nullptr);
548   DBUG_ASSERT(pack_buffer != nullptr);
549   DBUG_ASSERT(record != nullptr);
550   DBUG_ASSERT(packed_tuple != nullptr);
551   // Checksums for PKs are made when record is packed.
552   // We should never attempt to make checksum just from PK values
553   DBUG_ASSERT_IMP(should_store_row_debug_checksums,
554                   (m_index_type == INDEX_TYPE_SECONDARY));
555 
556   uchar *tuple = packed_tuple;
557   size_t unpack_len_pos = size_t(-1);
558   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
559 
560   rdb_netbuf_store_index(tuple, m_index_number);
561   tuple += INDEX_NUMBER_SIZE;
562 
563   // If n_key_parts is 0, it means all columns.
564   // The following includes the 'extended key' tail.
565   // The 'extended key' includes primary key. This is done to 'uniqify'
566   // non-unique indexes
567   const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
568 
569   // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
570   // hidden key part.  So we skip it (its always 1 part).
571   if (hidden_pk_exists && !hidden_pk_id && use_all_columns)
572     n_key_parts = m_key_parts - 1;
573   else if (use_all_columns)
574     n_key_parts = m_key_parts;
575 
576   if (n_null_fields)
577     *n_null_fields = 0;
578 
579   if (unpack_info) {
580     unpack_info->clear();
581     unpack_info->write_uint8(RDB_UNPACK_DATA_TAG);
582     unpack_len_pos = unpack_info->get_current_pos();
583     // we don't know the total length yet, so write a zero
584     unpack_info->write_uint16(0);
585   }
586 
587   for (uint i = 0; i < n_key_parts; i++) {
588     // Fill hidden pk id into the last key part for secondary keys for tables
589     // with no pk
590     if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
591       m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
592       break;
593     }
594 
595     Field *const field = m_pack_info[i].get_field_in_table(tbl);
596     DBUG_ASSERT(field != nullptr);
597 
598     // Old Field methods expected the record pointer to be at tbl->record[0].
599     // The quick and easy way to fix this was to pass along the offset
600     // for the pointer.
601     const my_ptrdiff_t ptr_diff = record - tbl->record[0];
602 
603     if (field->real_maybe_null()) {
604       DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
605       if (field->is_real_null(ptr_diff)) {
606         /* NULL value. store '\0' so that it sorts before non-NULL values */
607         *tuple++ = 0;
608         /* That's it, don't store anything else */
609         if (n_null_fields)
610           (*n_null_fields)++;
611         continue;
612       } else {
613         /* Not a NULL value. Store '1' */
614         *tuple++ = 1;
615       }
616     }
617 
618     const bool create_unpack_info =
619         (unpack_info && // we were requested to generate unpack_info
620          m_pack_info[i].uses_unpack_info()); // and this keypart uses it
621     Rdb_pack_field_context pack_ctx(unpack_info);
622 
623     // Set the offset for methods which do not take an offset as an argument
624     DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
625                                      m_pack_info[i].m_max_image_len));
626     field->move_field_offset(ptr_diff);
627 
628     m_pack_info[i].m_pack_func(&m_pack_info[i], field, pack_buffer, &tuple,
629                                &pack_ctx);
630 
631     /* Make "unpack info" to be stored in the value */
632     if (create_unpack_info) {
633       m_pack_info[i].m_make_unpack_info_func(m_pack_info[i].m_charset_codec,
634                                              field, &pack_ctx);
635     }
636     field->move_field_offset(-ptr_diff);
637   }
638 
639   if (unpack_info) {
640     const size_t len = unpack_info->get_current_pos();
641     DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());
642 
643     // Don't store the unpack_info if it has only the header (that is, there's
644     // no meaningful content).
645     // Primary Keys are special: for them, store the unpack_info even if it's
646     // empty (provided m_maybe_unpack_info==true, see
647     // ha_rocksdb::convert_record_to_storage_format)
648     if (len == RDB_UNPACK_HEADER_SIZE &&
649         m_index_type != Rdb_key_def::INDEX_TYPE_PRIMARY) {
650       unpack_info->clear();
651     } else {
652       unpack_info->write_uint16_at(unpack_len_pos, len);
653     }
654 
655     //
656     // Secondary keys have key and value checksums in the value part
657     // Primary key is a special case (the value part has non-indexed columns),
658     // so the checksums are computed and stored by
659     // ha_rocksdb::convert_record_to_storage_format
660     //
661     if (should_store_row_debug_checksums) {
662       const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple);
663       const uint32_t val_crc32 =
664           crc32(0, unpack_info->ptr(), unpack_info->get_current_pos());
665 
666       unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
667       unpack_info->write_uint32(key_crc32);
668       unpack_info->write_uint32(val_crc32);
669     }
670   }
671 
672   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
673 
674   return tuple - packed_tuple;
675 }
676 
677 /**
678   Pack the hidden primary key into mem-comparable form.
679 
680   @param
681     tbl                   Table we're working on
682     hidden_pk_id     IN   New value to be packed into key
683     packed_tuple     OUT  Key in the mem-comparable form
684 
685   @return
686     Length of the packed tuple
687 */
688 
pack_hidden_pk(const longlong & hidden_pk_id,uchar * const packed_tuple) const689 uint Rdb_key_def::pack_hidden_pk(const longlong &hidden_pk_id,
690                                  uchar *const packed_tuple) const {
691   DBUG_ASSERT(packed_tuple != nullptr);
692 
693   uchar *tuple = packed_tuple;
694   rdb_netbuf_store_index(tuple, m_index_number);
695   tuple += INDEX_NUMBER_SIZE;
696   DBUG_ASSERT(m_key_parts == 1);
697   DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
698                                    m_pack_info[0].m_max_image_len));
699 
700   m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
701 
702   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
703   return tuple - packed_tuple;
704 }
705 
706 /*
707   Function of type rdb_index_field_pack_t
708 */
709 
rdb_pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)710 void rdb_pack_with_make_sort_key(Rdb_field_packing *const fpi,
711                                  Field *const field,
712                                  uchar *const buf __attribute__((__unused__)),
713                                  uchar **dst,
714                                  Rdb_pack_field_context *const pack_ctx
715                                  __attribute__((__unused__))) {
716   DBUG_ASSERT(fpi != nullptr);
717   DBUG_ASSERT(field != nullptr);
718   DBUG_ASSERT(dst != nullptr);
719   DBUG_ASSERT(*dst != nullptr);
720 
721   const int max_len = fpi->m_max_image_len;
722   field->make_sort_key(*dst, max_len);
723   *dst += max_len;
724 }
725 
726 /*
727   Compares two keys without unpacking
728 
729   @detail
730   @return
731     0 - Ok. column_index is the index of the first column which is different.
732           -1 if two kes are equal
733     1 - Data format error.
734 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const735 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
736                               const rocksdb::Slice *key2,
737                               std::size_t *const column_index) const {
738   DBUG_ASSERT(key1 != nullptr);
739   DBUG_ASSERT(key2 != nullptr);
740   DBUG_ASSERT(column_index != nullptr);
741 
742   // the caller should check the return value and
743   // not rely on column_index being valid
744   *column_index = 0xbadf00d;
745 
746   Rdb_string_reader reader1(key1);
747   Rdb_string_reader reader2(key2);
748 
749   // Skip the index number
750   if ((!reader1.read(INDEX_NUMBER_SIZE)))
751     return HA_EXIT_FAILURE;
752 
753   if ((!reader2.read(INDEX_NUMBER_SIZE)))
754     return HA_EXIT_FAILURE;
755 
756   for (uint i = 0; i < m_key_parts; i++) {
757     const Rdb_field_packing *const fpi = &m_pack_info[i];
758     if (fpi->m_maybe_null) {
759       const auto nullp1 = reader1.read(1);
760       const auto nullp2 = reader2.read(1);
761 
762       if (nullp1 == nullptr || nullp2 == nullptr) {
763         return HA_EXIT_FAILURE;
764       }
765 
766       if (*nullp1 != *nullp2) {
767         *column_index = i;
768         return HA_EXIT_SUCCESS;
769       }
770 
771       if (*nullp1 == 0) {
772         /* This is a NULL value */
773         continue;
774       }
775     }
776 
777     const auto before_skip1 = reader1.get_current_ptr();
778     const auto before_skip2 = reader2.get_current_ptr();
779     DBUG_ASSERT(fpi->m_skip_func);
780     if (fpi->m_skip_func(fpi, nullptr, &reader1))
781       return HA_EXIT_FAILURE;
782     if (fpi->m_skip_func(fpi, nullptr, &reader2))
783       return HA_EXIT_FAILURE;
784     const auto size1 = reader1.get_current_ptr() - before_skip1;
785     const auto size2 = reader2.get_current_ptr() - before_skip2;
786     if (size1 != size2) {
787       *column_index = i;
788       return HA_EXIT_SUCCESS;
789     }
790 
791     if (memcmp(before_skip1, before_skip2, size1) != 0) {
792       *column_index = i;
793       return HA_EXIT_SUCCESS;
794     }
795   }
796 
797   *column_index = m_key_parts;
798   return HA_EXIT_SUCCESS;
799 }
800 
801 /*
802   @brief
803     Given a zero-padded key, determine its real key length
804 
805   @detail
806     Fixed-size skip functions just read.
807 */
808 
key_length(const TABLE * const table,const rocksdb::Slice & key) const809 size_t Rdb_key_def::key_length(const TABLE *const table,
810                                const rocksdb::Slice &key) const {
811   DBUG_ASSERT(table != nullptr);
812 
813   Rdb_string_reader reader(&key);
814 
815   if ((!reader.read(INDEX_NUMBER_SIZE)))
816     return size_t(-1);
817 
818   for (uint i = 0; i < m_key_parts; i++) {
819     const Rdb_field_packing *fpi = &m_pack_info[i];
820     const Field *field = nullptr;
821     if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY)
822       field = fpi->get_field_in_table(table);
823     if (fpi->m_skip_func(fpi, field, &reader))
824       return size_t(-1);
825   }
826   return key.size() - reader.remaining_bytes();
827 }
828 
829 /*
830   Take mem-comparable form and unpack_info and unpack it to Table->record
831 
832   @detail
833     not all indexes support this
834 
835   @return
836     UNPACK_SUCCESS - Ok
837     UNPACK_FAILURE - Data format error.
838 */
839 
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool & verify_row_debug_checksums) const840 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
841                                const rocksdb::Slice *const packed_key,
842                                const rocksdb::Slice *const unpack_info,
843                                const bool &verify_row_debug_checksums) const {
844   Rdb_string_reader reader(packed_key);
845   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
846 
847   const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
848   const bool hidden_pk_exists = table_has_hidden_pk(table);
849   const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
850   // There is no checksuming data after unpack_info for primary keys, because
851   // the layout there is different. The checksum is verified in
852   // ha_rocksdb::convert_record_from_storage_format instead.
853   DBUG_ASSERT_IMP(!secondary_key, !verify_row_debug_checksums);
854 
855   // Old Field methods expected the record pointer to be at tbl->record[0].
856   // The quick and easy way to fix this was to pass along the offset
857   // for the pointer.
858   const my_ptrdiff_t ptr_diff = buf - table->record[0];
859 
860   // Skip the index number
861   if ((!reader.read(INDEX_NUMBER_SIZE))) {
862     return HA_EXIT_FAILURE;
863   }
864 
865   // For secondary keys, we expect the value field to contain unpack data and
866   // checksum data in that order. One or both can be missing, but they cannot
867   // be reordered.
868   const bool has_unpack_info =
869       unp_reader.remaining_bytes() &&
870       *unp_reader.get_current_ptr() == RDB_UNPACK_DATA_TAG;
871   if (has_unpack_info && !unp_reader.read(RDB_UNPACK_HEADER_SIZE)) {
872     return HA_EXIT_FAILURE;
873   }
874 
875   for (uint i = 0; i < m_key_parts; i++) {
876     Rdb_field_packing *const fpi = &m_pack_info[i];
877 
878     /*
879       Hidden pk field is packed at the end of the secondary keys, but the SQL
880       layer does not know about it. Skip retrieving field if hidden pk.
881     */
882     if ((secondary_key && hidden_pk_exists && i + 1 == m_key_parts) ||
883         is_hidden_pk) {
884       DBUG_ASSERT(fpi->m_unpack_func);
885       if (fpi->m_skip_func(fpi, nullptr, &reader)) {
886         return HA_EXIT_FAILURE;
887       }
888       continue;
889     }
890 
891     Field *const field = fpi->get_field_in_table(table);
892 
893     if (fpi->m_unpack_func) {
894       /* It is possible to unpack this column. Do it. */
895 
896       if (fpi->m_maybe_null) {
897         const char *nullp;
898         if (!(nullp = reader.read(1)))
899           return HA_EXIT_FAILURE;
900         if (*nullp == 0) {
901           /* Set the NULL-bit of this field */
902           field->set_null(ptr_diff);
903           /* Also set the field to its default value */
904           uint field_offset = field->ptr - table->record[0];
905           memcpy(buf + field_offset, table->s->default_values + field_offset,
906                  field->pack_length());
907           continue;
908         } else if (*nullp == 1)
909           field->set_notnull(ptr_diff);
910         else
911           return HA_EXIT_FAILURE;
912       }
913 
914       // If we need unpack info, but there is none, tell the unpack function
915       // this by passing unp_reader as nullptr. If we never read unpack_info
916       // during unpacking anyway, then there won't an error.
917       const bool maybe_missing_unpack =
918           !has_unpack_info && fpi->uses_unpack_info();
919       const int res =
920           fpi->m_unpack_func(fpi, field, field->ptr + ptr_diff, &reader,
921                              maybe_missing_unpack ? nullptr : &unp_reader);
922 
923       if (res)
924         return res;
925     } else {
926       /* It is impossible to unpack the column. Skip it. */
927       if (fpi->m_maybe_null) {
928         const char *nullp;
929         if (!(nullp = reader.read(1)))
930           return HA_EXIT_FAILURE;
931         if (*nullp == 0) {
932           /* This is a NULL value */
933           continue;
934         }
935         /* If NULL marker is not '0', it can be only '1'  */
936         if (*nullp != 1)
937           return HA_EXIT_FAILURE;
938       }
939       if (fpi->m_skip_func(fpi, field, &reader))
940         return HA_EXIT_FAILURE;
941     }
942   }
943 
944   /*
945     Check checksum values if present
946   */
947   const char *ptr;
948   if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
949     if (verify_row_debug_checksums) {
950       uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
951           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
952       const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
953           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
954 
955       const uint32_t computed_key_chksum =
956           crc32(0, (const uchar *)packed_key->data(), packed_key->size());
957       const uint32_t computed_val_chksum =
958           crc32(0, (const uchar *)unpack_info->data(),
959                 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
960 
961       DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
962                       stored_key_chksum++;);
963 
964       if (stored_key_chksum != computed_key_chksum) {
965         report_checksum_mismatch(true, packed_key->data(), packed_key->size());
966         return HA_EXIT_FAILURE;
967       }
968 
969       if (stored_val_chksum != computed_val_chksum) {
970         report_checksum_mismatch(false, unpack_info->data(),
971                                  unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
972         return HA_EXIT_FAILURE;
973       }
974     } else {
975       /* The checksums are present but we are not checking checksums */
976     }
977   }
978 
979   if (reader.remaining_bytes())
980     return HA_EXIT_FAILURE;
981 
982   return HA_EXIT_SUCCESS;
983 }
984 
table_has_hidden_pk(const TABLE * const table)985 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
986   return table->s->primary_key == MAX_INDEXES;
987 }
988 
report_checksum_mismatch(const bool & is_key,const char * const data,const size_t data_size) const989 void Rdb_key_def::report_checksum_mismatch(const bool &is_key,
990                                            const char *const data,
991                                            const size_t data_size) const {
992   // NO_LINT_DEBUG
993   sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
994                   is_key ? "key" : "value", get_index_number());
995 
996   const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
997   // NO_LINT_DEBUG
998   sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
999                   (uint64_t)data_size, buf.c_str());
1000 
1001   my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1002 }
1003 
index_format_min_check(const int & pk_min,const int & sk_min) const1004 bool Rdb_key_def::index_format_min_check(const int &pk_min,
1005                                          const int &sk_min) const {
1006   switch (m_index_type) {
1007   case INDEX_TYPE_PRIMARY:
1008   case INDEX_TYPE_HIDDEN_PRIMARY:
1009     return (m_kv_format_version >= pk_min);
1010   case INDEX_TYPE_SECONDARY:
1011     return (m_kv_format_version >= sk_min);
1012   default:
1013     DBUG_ASSERT(0);
1014     return false;
1015   }
1016 }
1017 
1018 ///////////////////////////////////////////////////////////////////////////////////////////
1019 // Rdb_field_packing
1020 ///////////////////////////////////////////////////////////////////////////////////////////
1021 
1022 /*
1023   Function of type rdb_index_field_skip_t
1024 */
1025 
rdb_skip_max_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1026 int rdb_skip_max_length(const Rdb_field_packing *const fpi,
1027                         const Field *const field __attribute__((__unused__)),
1028                         Rdb_string_reader *const reader) {
1029   if (!reader->read(fpi->m_max_image_len))
1030     return HA_EXIT_FAILURE;
1031   return HA_EXIT_SUCCESS;
1032 }
1033 
1034 /*
1035   (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
1036   split in the middle of an UTF-8 character. See the implementation of
1037   rdb_unpack_binary_or_utf8_varchar.
1038 */
1039 
// Size in bytes of one chunk of the varchar encoding handled by
// rdb_pack_with_varchar_encoding / rdb_skip_variable_length:
// (RDB_ESCAPE_LENGTH - 1) data bytes followed by one flag byte.
const uint RDB_ESCAPE_LENGTH = 9;
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");
1043 
1044 /*
1045   Function of type rdb_index_field_skip_t
1046 */
1047 
rdb_skip_variable_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1048 static int rdb_skip_variable_length(const Rdb_field_packing *const fpi
1049                                     __attribute__((__unused__)),
1050                                     const Field *const field,
1051                                     Rdb_string_reader *const reader) {
1052   const uchar *ptr;
1053   bool finished = false;
1054 
1055   size_t dst_len; /* How much data can be there */
1056   if (field) {
1057     const Field_varstring *const field_var =
1058         static_cast<const Field_varstring *>(field);
1059     dst_len = field_var->pack_length() - field_var->length_bytes;
1060   } else {
1061     dst_len = UINT_MAX;
1062   }
1063 
1064   /* Decode the length-emitted encoding here */
1065   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1066     /* See rdb_pack_with_varchar_encoding. */
1067     const uchar pad =
1068         255 - ptr[RDB_ESCAPE_LENGTH - 1]; // number of padding bytes
1069     const uchar used_bytes = RDB_ESCAPE_LENGTH - 1 - pad;
1070 
1071     if (used_bytes > RDB_ESCAPE_LENGTH - 1 || used_bytes > dst_len) {
1072       return HA_EXIT_FAILURE; /* cannot store that much, invalid data */
1073     }
1074 
1075     if (used_bytes < RDB_ESCAPE_LENGTH - 1) {
1076       finished = true;
1077       break;
1078     }
1079     dst_len -= used_bytes;
1080   }
1081 
1082   if (!finished) {
1083     return HA_EXIT_FAILURE;
1084   }
1085 
1086   return HA_EXIT_SUCCESS;
1087 }
1088 
// Values of the last byte of each segment in the Variable-Length
// Space-Padded encoding (see rdb_pack_with_varchar_space_pad). They tell how
// the rest of the value compares to an endless run of spaces.
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
// The rest of the value is all spaces: this is the last segment.
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1092 
1093 /*
1094   Skip a keypart that uses Variable-Length Space-Padded encoding
1095 */
1096 
rdb_skip_variable_space_pad(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1097 static int rdb_skip_variable_space_pad(const Rdb_field_packing *const fpi,
1098                                        const Field *const field,
1099                                        Rdb_string_reader *const reader) {
1100   const uchar *ptr;
1101   bool finished = false;
1102 
1103   size_t dst_len = UINT_MAX; /* How much data can be there */
1104 
1105   if (field) {
1106     const Field_varstring *const field_var =
1107         static_cast<const Field_varstring *>(field);
1108     dst_len = field_var->pack_length() - field_var->length_bytes;
1109   }
1110 
1111   /* Decode the length-emitted encoding here */
1112   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1113     // See rdb_pack_with_varchar_space_pad
1114     const uchar c = ptr[fpi->m_segment_size - 1];
1115     if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1116       // This is the last segment
1117       finished = true;
1118       break;
1119     } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1120                c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1121       // This is not the last segment
1122       if ((fpi->m_segment_size - 1) > dst_len) {
1123         // The segment is full of data but the table field can't hold that
1124         // much! This must be data corruption.
1125         return HA_EXIT_FAILURE;
1126       }
1127       dst_len -= (fpi->m_segment_size - 1);
1128     } else {
1129       // Encountered a value that's none of the VARCHAR_CMP* constants
1130       // It's data corruption.
1131       return HA_EXIT_FAILURE;
1132     }
1133   }
1134   return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1135 }
1136 
1137 /*
1138   Function of type rdb_index_field_unpack_t
1139 */
1140 
rdb_unpack_integer(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1141 int rdb_unpack_integer(Rdb_field_packing *const fpi, Field *const field,
1142                        uchar *const to, Rdb_string_reader *const reader,
1143                        Rdb_string_reader *const unp_reader
1144                        __attribute__((__unused__))) {
1145   const int length = fpi->m_max_image_len;
1146 
1147   const uchar *from;
1148   if (!(from = (const uchar *)reader->read(length)))
1149     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1150 
1151 #ifdef WORDS_BIGENDIAN
1152   {
1153     if (((Field_num *)field)->unsigned_flag)
1154       to[0] = from[0];
1155     else
1156       to[0] = (char)(from[0] ^ 128); // Reverse the sign bit.
1157     memcpy(to + 1, from + 1, length - 1);
1158   }
1159 #else
1160   {
1161     const int sign_byte = from[0];
1162     if (((Field_num *)field)->unsigned_flag)
1163       to[length - 1] = sign_byte;
1164     else
1165       to[length - 1] =
1166           static_cast<char>(sign_byte ^ 128); // Reverse the sign bit.
1167     for (int i = 0, j = length - 1; i < length - 1; ++i, --j)
1168       to[i] = from[j];
1169   }
1170 #endif
1171   return UNPACK_SUCCESS;
1172 }
1173 
#if !defined(WORDS_BIGENDIAN)
/*
  Copy the 8 bytes at 'src' to 'dst' in swapped order. Normally the whole
  sequence is reversed; when __FLOAT_WORD_ORDER is big-endian each 4-byte
  word is reversed on its own.
*/
static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
  // A few systems store the most-significant _word_ first on little-endian
  for (int i = 0; i < 4; ++i)
    dst[i] = src[3 - i];
  for (int i = 4; i < 8; ++i)
    dst[i] = src[11 - i];
#else
  for (int i = 0; i < 8; ++i)
    dst[i] = src[7 - i];
#endif
}

/* Copy the 4 bytes at 'src' to 'dst' in reverse order. */
static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
  for (int i = 0; i < 4; ++i)
    dst[i] = src[3 - i];
}
#else
/* Big-endian builds pass nullptr as the swap function. */
#define rdb_swap_double_bytes nullptr
#define rdb_swap_float_bytes nullptr
#endif
1208 
rdb_unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t & size,const int & exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))1209 static int rdb_unpack_floating_point(
1210     uchar *const dst, Rdb_string_reader *const reader, const size_t &size,
1211     const int &exp_digit, const uchar *const zero_pattern,
1212     const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
1213   const uchar *const from = (const uchar *)reader->read(size);
1214   if (from == nullptr)
1215     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1216 
1217   /* Check to see if the value is zero */
1218   if (memcmp(from, zero_pattern, size) == 0) {
1219     memcpy(dst, zero_val, size);
1220     return UNPACK_SUCCESS;
1221   }
1222 
1223 #if defined(WORDS_BIGENDIAN)
1224   // On big-endian, output can go directly into result
1225   uchar *const tmp = dst;
1226 #else
1227   // Otherwise use a temporary buffer to make byte-swapping easier later
1228   uchar tmp[8];
1229 #endif
1230 
1231   memcpy(tmp, from, size);
1232 
1233   if (tmp[0] & 0x80) {
1234     // If the high bit is set the original value was positive so
1235     // remove the high bit and subtract one from the exponent.
1236     ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1237     exp_part &= 0x7FFF;                            // clear high bit;
1238     exp_part -= (ushort)1 << (16 - 1 - exp_digit); // subtract from exponent
1239     tmp[0] = (uchar)(exp_part >> 8);
1240     tmp[1] = (uchar)exp_part;
1241   } else {
1242     // Otherwise the original value was negative and all bytes have been
1243     // negated.
1244     for (size_t ii = 0; ii < size; ii++)
1245       tmp[ii] ^= 0xFF;
1246   }
1247 
1248 #if !defined(WORDS_BIGENDIAN)
1249   // On little-endian, swap the bytes around
1250   swap_func(dst, tmp);
1251 #else
1252   static_assert(swap_func == nullptr, "Assuming that no swapping is needed.");
1253 #endif
1254 
1255   return UNPACK_SUCCESS;
1256 }
1257 
// Fallback definition: bits in a double minus DBL_MANT_DIG. Passed as the
// exponent width ('exp_digit') to rdb_unpack_floating_point.
#if !defined(DBL_EXP_DIG)
#define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
#endif
1261 
1262 /*
1263   Function of type rdb_index_field_unpack_t
1264 
1265   Unpack a double by doing the reverse action of change_double_for_sort
1266   (sql/filesort.cc).  Note that this only works on IEEE values.
1267   Note also that this code assumes that NaN and +/-Infinity are never
1268   allowed in the database.
1269 */
rdb_unpack_double(Rdb_field_packing * const fpi,Field * const field,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1270 static int rdb_unpack_double(Rdb_field_packing *const fpi
1271                              __attribute__((__unused__)),
1272                              Field *const field __attribute__((__unused__)),
1273                              uchar *const field_ptr,
1274                              Rdb_string_reader *const reader,
1275                              Rdb_string_reader *const unp_reader
1276                              __attribute__((__unused__))) {
1277   static double zero_val = 0.0;
1278   static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
1279 
1280   return rdb_unpack_floating_point(
1281       field_ptr, reader, sizeof(double), DBL_EXP_DIG, zero_pattern,
1282       (const uchar *)&zero_val, rdb_swap_double_bytes);
1283 }
1284 
// Fallback definition: bits in a float minus FLT_MANT_DIG. Passed as the
// exponent width ('exp_digit') to rdb_unpack_floating_point.
#if !defined(FLT_EXP_DIG)
#define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
#endif
1288 
1289 /*
1290   Function of type rdb_index_field_unpack_t
1291 
1292   Unpack a float by doing the reverse action of Field_float::make_sort_key
1293   (sql/field.cc).  Note that this only works on IEEE values.
1294   Note also that this code assumes that NaN and +/-Infinity are never
1295   allowed in the database.
1296 */
rdb_unpack_float(Rdb_field_packing * const,Field * const field,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1297 static int rdb_unpack_float(Rdb_field_packing *const,
1298                             Field *const field __attribute__((__unused__)),
1299                             uchar *const field_ptr,
1300                             Rdb_string_reader *const reader,
1301                             Rdb_string_reader *const unp_reader
1302                             __attribute__((__unused__))) {
1303   static float zero_val = 0.0;
1304   static const uchar zero_pattern[4] = {128, 0, 0, 0};
1305 
1306   return rdb_unpack_floating_point(
1307       field_ptr, reader, sizeof(float), FLT_EXP_DIG, zero_pattern,
1308       (const uchar *)&zero_val, rdb_swap_float_bytes);
1309 }
1310 
1311 /*
1312   Function of type rdb_index_field_unpack_t used to
1313   Unpack by doing the reverse action to Field_newdate::make_sort_key.
1314 */
1315 
rdb_unpack_newdate(Rdb_field_packing * const fpi,Field * constfield,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1316 int rdb_unpack_newdate(Rdb_field_packing *const fpi, Field *constfield,
1317                        uchar *const field_ptr, Rdb_string_reader *const reader,
1318                        Rdb_string_reader *const unp_reader
1319                        __attribute__((__unused__))) {
1320   const char *from;
1321   DBUG_ASSERT(fpi->m_max_image_len == 3);
1322 
1323   if (!(from = reader->read(3)))
1324     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1325 
1326   field_ptr[0] = from[2];
1327   field_ptr[1] = from[1];
1328   field_ptr[2] = from[0];
1329   return UNPACK_SUCCESS;
1330 }
1331 
1332 /*
1333   Function of type rdb_index_field_unpack_t, used to
1334   Unpack the string by copying it over.
1335   This is for BINARY(n) where the value occupies the whole length.
1336 */
1337 
rdb_unpack_binary_str(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1338 static int rdb_unpack_binary_str(Rdb_field_packing *const fpi,
1339                                  Field *const field, uchar *const to,
1340                                  Rdb_string_reader *const reader,
1341                                  Rdb_string_reader *const unp_reader
1342                                  __attribute__((__unused__))) {
1343   const char *from;
1344   if (!(from = reader->read(fpi->m_max_image_len)))
1345     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1346 
1347   memcpy(to, from, fpi->m_max_image_len);
1348   return UNPACK_SUCCESS;
1349 }
1350 
1351 /*
1352   Function of type rdb_index_field_unpack_t.
1353   For UTF-8, we need to convert 2-byte wide-character entities back into
1354   UTF8 sequences.
1355 */
1356 
rdb_unpack_utf8_str(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1357 static int rdb_unpack_utf8_str(Rdb_field_packing *const fpi, Field *const field,
1358                                uchar *dst, Rdb_string_reader *const reader,
1359                                Rdb_string_reader *const unp_reader
1360                                __attribute__((__unused__))) {
1361   my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
1362   const uchar *src;
1363   if (!(src = (const uchar *)reader->read(fpi->m_max_image_len)))
1364     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1365 
1366   const uchar *const src_end = src + fpi->m_max_image_len;
1367   uchar *const dst_end = dst + field->pack_length();
1368 
1369   while (src < src_end) {
1370     my_wc_t wc = (src[0] << 8) | src[1];
1371     src += 2;
1372     int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1373     DBUG_ASSERT(res > 0 && res <= 3);
1374     if (res < 0)
1375       return UNPACK_FAILURE;
1376     dst += res;
1377   }
1378 
1379   cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
1380                    cset->pad_char);
1381   return UNPACK_SUCCESS;
1382 }
1383 
1384 /*
1385   Function of type rdb_index_field_pack_t
1386 */
1387 
rdb_pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)1388 static void rdb_pack_with_varchar_encoding(
1389     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
1390     Rdb_pack_field_context *const pack_ctx __attribute__((__unused__))) {
1391   /*
1392     Use a flag byte every Nth byte. Set it to (255 - #pad) where #pad is 0
1393     when the var length field filled all N-1 previous bytes and #pad is
1394     otherwise the number of padding bytes used.
1395 
1396     If N=8 and the field is:
1397     * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
1398     * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
1399     And the 4 byte string compares as greater than the 3 byte string
1400   */
1401   const CHARSET_INFO *const charset = field->charset();
1402   Field_varstring *const field_var = (Field_varstring *)field;
1403 
1404   const size_t value_length = (field_var->length_bytes == 1)
1405                                   ? (uint)*field->ptr
1406                                   : uint2korr(field->ptr);
1407   size_t xfrm_len = charset->coll->strnxfrm(
1408       charset, buf, fpi->m_max_image_len, field_var->char_length(),
1409       field_var->ptr + field_var->length_bytes, value_length, 0);
1410 
1411   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
1412 
1413   size_t encoded_size = 0;
1414   uchar *ptr = *dst;
1415   while (1) {
1416     const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, xfrm_len);
1417     const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
1418     memcpy(ptr, buf, copy_len);
1419     ptr += copy_len;
1420     buf += copy_len;
1421     // pad with zeros if necessary;
1422     for (size_t idx = 0; idx < padding_bytes; idx++)
1423       *(ptr++) = 0;
1424     *(ptr++) = 255 - padding_bytes;
1425 
1426     xfrm_len -= copy_len;
1427     encoded_size += RDB_ESCAPE_LENGTH;
1428     if (padding_bytes != 0)
1429       break;
1430   }
1431   *dst += encoded_size;
1432 }
1433 
1434 /*
1435   Compare the string in [buf..buf_end) with a string that is an infinite
1436   sequence of strings in space_xfrm
1437 */
1438 
/*
  @return 0 if [buf..buf_end) matches the repeated space_xfrm pattern (the
          last repetition may be cut short); otherwise the memcmp() result of
          the first chunk that differs.
*/
static int
rdb_compare_string_with_spaces(const uchar *buf, const uchar *const buf_end,
                               const std::vector<uchar> *const space_xfrm) {
  const uchar *const pattern = space_xfrm->data();
  const size_t pattern_len = space_xfrm->size();

  for (const uchar *cur = buf; cur < buf_end;) {
    const size_t left = (size_t)(buf_end - cur);
    const size_t chunk_len = std::min(left, pattern_len);

    const int res = memcmp(cur, pattern, chunk_len);
    if (res != 0)
      return res;

    cur += chunk_len;
  }
  return 0;
}
1451 
// Offset added to the trimmed/padded space count that
// rdb_pack_with_varchar_space_pad writes to unpack_info, so that the stored
// number is never negative.
static const int RDB_TRIMMED_CHARS_OFFSET = 8;
1453 /*
1454   Pack the data with Variable-Length Space-Padded Encoding.
1455 
1456   The encoding is there to meet two goals:
1457 
  Goal#1. Comparison. The SQL standard says

    "If the collation for the comparison has the PAD SPACE characteristic,
    for the purposes of the comparison, the shorter value is effectively
    extended to the length of the longer by concatenation of <space>s on the
    right."
1464 
1465   At the moment, all MySQL collations except one have the PAD SPACE
1466   characteristic.  The exception is the "binary" collation that is used by
1467   [VAR]BINARY columns. (Note that binary collations for specific charsets,
1468   like utf8_bin or latin1_bin are not the same as "binary" collation, they have
1469   the PAD SPACE characteristic).
1470 
1471   Goal#2 is to preserve the number of trailing spaces in the original value.
1472 
1473   This is achieved by using the following encoding:
1474   The key part:
1475   - Stores mem-comparable image of the column
1476   - It is stored in chunks of fpi->m_segment_size bytes (*)
1477     = If the remainder of the chunk is not occupied, it is padded with mem-
1478       comparable image of the space character (cs->pad_char to be precise).
1479   - The last byte of the chunk shows how the rest of column's mem-comparable
1480     image would compare to mem-comparable image of the column extended with
1481     spaces. There are three possible values.
1482      - VARCHAR_CMP_LESS_THAN_SPACES,
1483      - VARCHAR_CMP_EQUAL_TO_SPACES
1484      - VARCHAR_CMP_GREATER_THAN_SPACES
1485 
1486   VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
1487   is spaces, or something that sorts as spaces, so there is no reason to store
1488   it).
1489 
1490   Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
1491 
1492    'abcd\0'   => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0    ' <VARCHAR_CMP_EQUAL> ]
1493    'abcd'     => [ 'abcd' <VARCHAR_CMP_EQUAL>]
1494    'abcd   '  => [ 'abcd' <VARCHAR_CMP_EQUAL>]
1495    'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
1496 
1497   As mentioned above, the last chunk is padded with mem-comparable images of
1498   cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
1499 
1500   fpi->m_segment_size depends on the used collation. It is chosen to be such
1501   that no mem-comparable image of space will ever stretch across the segments
1502   (see get_segment_size_from_collation).
1503 
1504   == The value part (aka unpack_info) ==
1505   The value part stores the number of space characters that one needs to add
1506   when unpacking the string.
1507   - If the number is positive, it means add this many spaces at the end
1508   - If the number is negative, it means padding has added extra spaces which
1509     must be removed.
1510 
1511   Storage considerations
1512   - depending on column's max size, the number may occupy 1 or 2 bytes
1513   - the number of spaces that need to be removed is not more than
1514     RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
1515     then store it as unsigned.
1516 
1517   @seealso
1518     rdb_unpack_binary_or_utf8_varchar_space_pad
1519     rdb_unpack_simple_varchar_space_pad
1520     rdb_dummy_make_unpack_info
1521     rdb_skip_variable_space_pad
1522 */
1523 
1524 static void
rdb_pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)1525 rdb_pack_with_varchar_space_pad(Rdb_field_packing *const fpi,
1526                                 Field *const field, uchar *buf, uchar **dst,
1527                                 Rdb_pack_field_context *const pack_ctx) {
1528   Rdb_string_writer *const unpack_info = pack_ctx->writer;
1529   const CHARSET_INFO *const charset = field->charset();
1530   const auto field_var = static_cast<Field_varstring *>(field);
1531 
1532   const size_t value_length = (field_var->length_bytes == 1)
1533                                   ? (uint)*field->ptr
1534                                   : uint2korr(field->ptr);
1535 
1536   const size_t trimmed_len = charset->cset->lengthsp(
1537       charset, (const char *)field_var->ptr + field_var->length_bytes,
1538       value_length);
1539   const size_t xfrm_len = charset->coll->strnxfrm(
1540       charset, buf, fpi->m_max_image_len, field_var->char_length(),
1541       field_var->ptr + field_var->length_bytes, trimmed_len, 0);
1542 
1543   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
1544   uchar *const buf_end = buf + xfrm_len;
1545 
1546   size_t encoded_size = 0;
1547   uchar *ptr = *dst;
1548   size_t padding_bytes;
1549   while (true) {
1550     const size_t copy_len =
1551         std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
1552     padding_bytes = fpi->m_segment_size - 1 - copy_len;
1553     memcpy(ptr, buf, copy_len);
1554     ptr += copy_len;
1555     buf += copy_len;
1556 
1557     if (padding_bytes) {
1558       memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
1559       ptr += padding_bytes;
1560       *ptr = VARCHAR_CMP_EQUAL_TO_SPACES; // last segment
1561     } else {
1562       // Compare the string suffix with a hypothetical infinite string of
1563       // spaces. It could be that the first difference is beyond the end of
1564       // current chunk.
1565       const int cmp =
1566           rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
1567 
1568       if (cmp < 0)
1569         *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
1570       else if (cmp > 0)
1571         *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
1572       else {
1573         // It turns out all the rest are spaces.
1574         *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
1575       }
1576     }
1577     encoded_size += fpi->m_segment_size;
1578 
1579     if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES)
1580       break;
1581   }
1582 
1583   // m_unpack_info_stores_value means unpack_info stores the whole original
1584   // value. There is no need to store the number of trimmed/padded endspaces
1585   // in that case.
1586   if (unpack_info && !fpi->m_unpack_info_stores_value) {
1587     // (value_length - trimmed_len) is the number of trimmed space *characters*
1588     // then, padding_bytes is the number of *bytes* added as padding
1589     // then, we add 8, because we don't store negative values.
1590     DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
1591     DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
1592     const size_t removed_chars =
1593         RDB_TRIMMED_CHARS_OFFSET +
1594         (value_length - trimmed_len) / fpi->space_mb_len -
1595         padding_bytes / fpi->space_xfrm_len;
1596 
1597     if (fpi->m_unpack_info_uses_two_bytes) {
1598       unpack_info->write_uint16(removed_chars);
1599     } else {
1600       DBUG_ASSERT(removed_chars < 0x100);
1601       unpack_info->write_uint8(removed_chars);
1602     }
1603   }
1604 
1605   *dst += encoded_size;
1606 }
1607 
1608 /*
1609   Function of type rdb_index_field_unpack_t
1610 */
1611 
rdb_unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1612 static int rdb_unpack_binary_or_utf8_varchar(Rdb_field_packing *const fpi,
1613                                              Field *const field, uchar *dst,
1614                                              Rdb_string_reader *const reader,
1615                                              Rdb_string_reader *const unp_reader
1616                                              __attribute__((__unused__))) {
1617   const uchar *ptr;
1618   size_t len = 0;
1619   bool finished = false;
1620   uchar *d0 = dst;
1621   Field_varstring *const field_var = (Field_varstring *)field;
1622   dst += field_var->length_bytes;
1623   // How much we can unpack
1624   size_t dst_len = field_var->pack_length() - field_var->length_bytes;
1625   uchar *const dst_end = dst + dst_len;
1626 
1627   /* Decode the length-emitted encoding here */
1628   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1629     /* See rdb_pack_with_varchar_encoding. */
1630     uchar pad = 255 - ptr[RDB_ESCAPE_LENGTH - 1]; // number of padding bytes
1631     uchar used_bytes = RDB_ESCAPE_LENGTH - 1 - pad;
1632 
1633     if (used_bytes > RDB_ESCAPE_LENGTH - 1) {
1634       return UNPACK_FAILURE; /* cannot store that much, invalid data */
1635     }
1636 
1637     if (dst_len < used_bytes) {
1638       /* Encoded index tuple is longer than the size in the record buffer? */
1639       return UNPACK_FAILURE;
1640     }
1641 
1642     /*
1643       Now, we need to decode used_bytes of data and append them to the value.
1644     */
1645     if (fpi->m_varchar_charset == &my_charset_utf8_bin) {
1646       if (used_bytes & 1) {
1647         /*
1648           UTF-8 characters are encoded into two-byte entities. There is no way
1649           we can have an odd number of bytes after encoding.
1650         */
1651         return UNPACK_FAILURE;
1652       }
1653 
1654       const uchar *src = ptr;
1655       const uchar *src_end = ptr + used_bytes;
1656       while (src < src_end) {
1657         my_wc_t wc = (src[0] << 8) | src[1];
1658         src += 2;
1659         const CHARSET_INFO *cset = fpi->m_varchar_charset;
1660         int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1661         DBUG_ASSERT(res > 0 && res <= 3);
1662         if (res < 0)
1663           return UNPACK_FAILURE;
1664         dst += res;
1665         len += res;
1666         dst_len -= res;
1667       }
1668     } else {
1669       memcpy(dst, ptr, used_bytes);
1670       dst += used_bytes;
1671       dst_len -= used_bytes;
1672       len += used_bytes;
1673     }
1674 
1675     if (used_bytes < RDB_ESCAPE_LENGTH - 1) {
1676       finished = true;
1677       break;
1678     }
1679   }
1680 
1681   if (!finished)
1682     return UNPACK_FAILURE;
1683 
1684   /* Save the length */
1685   if (field_var->length_bytes == 1) {
1686     d0[0] = len;
1687   } else {
1688     DBUG_ASSERT(field_var->length_bytes == 2);
1689     int2store(d0, len);
1690   }
1691   return UNPACK_SUCCESS;
1692 }
1693 
1694 /*
1695   @seealso
1696     rdb_pack_with_varchar_space_pad - packing function
1697     rdb_unpack_simple_varchar_space_pad - unpacking function for 'simple'
1698     charsets.
1699     rdb_skip_variable_space_pad - skip function
1700 */
rdb_unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1701 static int rdb_unpack_binary_or_utf8_varchar_space_pad(
1702     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
1703     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
1704   const uchar *ptr;
1705   size_t len = 0;
1706   bool finished = false;
1707   Field_varstring *const field_var = static_cast<Field_varstring *>(field);
1708   uchar *d0 = dst;
1709   uchar *dst_end = dst + field_var->pack_length();
1710   dst += field_var->length_bytes;
1711 
1712   uint space_padding_bytes = 0;
1713   uint extra_spaces;
1714   if ((fpi->m_unpack_info_uses_two_bytes
1715            ? unp_reader->read_uint16(&extra_spaces)
1716            : unp_reader->read_uint8(&extra_spaces))) {
1717     return UNPACK_FAILURE;
1718   }
1719 
1720   if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
1721     space_padding_bytes =
1722         -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
1723     extra_spaces = 0;
1724   } else
1725     extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
1726 
1727   space_padding_bytes *= fpi->space_xfrm_len;
1728 
1729   /* Decode the length-emitted encoding here */
1730   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1731     const char last_byte = ptr[fpi->m_segment_size - 1];
1732     size_t used_bytes;
1733     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) // this is the last segment
1734     {
1735       if (space_padding_bytes > (fpi->m_segment_size - 1))
1736         return UNPACK_FAILURE; // Cannot happen, corrupted data
1737       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
1738       finished = true;
1739     } else {
1740       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
1741           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
1742         return UNPACK_FAILURE; // Invalid value
1743       }
1744       used_bytes = fpi->m_segment_size - 1;
1745     }
1746 
1747     // Now, need to decode used_bytes of data and append them to the value.
1748     if (fpi->m_varchar_charset == &my_charset_utf8_bin) {
1749       if (used_bytes & 1) {
1750         /*
1751           UTF-8 characters are encoded into two-byte entities. There is no way
1752           we can have an odd number of bytes after encoding.
1753         */
1754         return UNPACK_FAILURE;
1755       }
1756 
1757       const uchar *src = ptr;
1758       const uchar *const src_end = ptr + used_bytes;
1759       while (src < src_end) {
1760         my_wc_t wc = (src[0] << 8) | src[1];
1761         src += 2;
1762         const CHARSET_INFO *cset = fpi->m_varchar_charset;
1763         int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
1764         DBUG_ASSERT(res <= 3);
1765         if (res <= 0)
1766           return UNPACK_FAILURE;
1767         dst += res;
1768         len += res;
1769       }
1770     } else {
1771       if (dst + used_bytes > dst_end)
1772         return UNPACK_FAILURE;
1773       memcpy(dst, ptr, used_bytes);
1774       dst += used_bytes;
1775       len += used_bytes;
1776     }
1777 
1778     if (finished) {
1779       if (extra_spaces) {
1780         // Both binary and UTF-8 charset store space as ' ',
1781         // so the following is ok:
1782         if (dst + extra_spaces > dst_end)
1783           return UNPACK_FAILURE;
1784         memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
1785         len += extra_spaces;
1786       }
1787       break;
1788     }
1789   }
1790 
1791   if (!finished)
1792     return UNPACK_FAILURE;
1793 
1794   /* Save the length */
1795   if (field_var->length_bytes == 1) {
1796     d0[0] = len;
1797   } else {
1798     DBUG_ASSERT(field_var->length_bytes == 2);
1799     int2store(d0, len);
1800   }
1801   return UNPACK_SUCCESS;
1802 }
1803 
1804 /////////////////////////////////////////////////////////////////////////
1805 
1806 /*
1807   Function of type rdb_make_unpack_info_t
1808 */
1809 
rdb_make_unpack_unknown(const Rdb_collation_codec * codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)1810 static void rdb_make_unpack_unknown(const Rdb_collation_codec *codec
1811                                     __attribute__((__unused__)),
1812                                     const Field *const field,
1813                                     Rdb_pack_field_context *const pack_ctx) {
1814   pack_ctx->writer->write(field->ptr, field->pack_length());
1815 }
1816 
/*
  The point of this function is only to indicate that unpack_info is
  available. It intentionally does nothing and ignores all of its arguments.

  The actual unpack_info data is produced by the function that packs the key,
  that is, rdb_pack_with_varchar_space_pad.
*/

static void rdb_dummy_make_unpack_info(const Rdb_collation_codec *codec
                                       __attribute__((__unused__)),
                                       const Field *field
                                       __attribute__((__unused__)),
                                       Rdb_pack_field_context *pack_ctx
                                       __attribute__((__unused__))) {}
1831 
1832 /*
1833   Function of type rdb_index_field_unpack_t
1834 */
1835 
rdb_unpack_unknown(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1836 static int rdb_unpack_unknown(Rdb_field_packing *const fpi, Field *const field,
1837                               uchar *const dst, Rdb_string_reader *const reader,
1838                               Rdb_string_reader *const unp_reader) {
1839   const uchar *ptr;
1840   const uint len = fpi->m_unpack_data_len;
1841   // We don't use anything from the key, so skip over it.
1842   if (rdb_skip_max_length(fpi, field, reader)) {
1843     return UNPACK_FAILURE;
1844   }
1845 
1846   DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
1847 
1848   if ((ptr = (const uchar *)unp_reader->read(len))) {
1849     memcpy(dst, ptr, len);
1850     return UNPACK_SUCCESS;
1851   }
1852   return UNPACK_FAILURE;
1853 }
1854 
1855 /*
1856   Function of type rdb_make_unpack_info_t
1857 */
1858 
rdb_make_unpack_unknown_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)1859 static void rdb_make_unpack_unknown_varchar(
1860     const Rdb_collation_codec *const codec __attribute__((__unused__)),
1861     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
1862   const auto f = static_cast<const Field_varstring *>(field);
1863   uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
1864   len += f->length_bytes;
1865   pack_ctx->writer->write(field->ptr, len);
1866 }
1867 
1868 /*
1869   Function of type rdb_index_field_unpack_t
1870 
1871   @detail
1872   Unpack a key part in an "unknown" collation from its
1873   (mem_comparable_form, unpack_info) form.
1874 
1875   "Unknown" means we have no clue about how mem_comparable_form is made from
1876   the original string, so we keep the whole original string in the unpack_info.
1877 
1878   @seealso
1879     rdb_make_unpack_unknown, rdb_unpack_unknown
1880 */
1881 
rdb_unpack_unknown_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1882 static int rdb_unpack_unknown_varchar(Rdb_field_packing *const fpi,
1883                                       Field *const field, uchar *dst,
1884                                       Rdb_string_reader *const reader,
1885                                       Rdb_string_reader *const unp_reader) {
1886   const uchar *ptr;
1887   uchar *const d0 = dst;
1888   const auto f = static_cast<Field_varstring *>(field);
1889   dst += f->length_bytes;
1890   const uint len_bytes = f->length_bytes;
1891   // We don't use anything from the key, so skip over it.
1892   if (fpi->m_skip_func(fpi, field, reader)) {
1893     return UNPACK_FAILURE;
1894   }
1895 
1896   DBUG_ASSERT(len_bytes > 0);
1897   DBUG_ASSERT(unp_reader != nullptr);
1898 
1899   if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
1900     memcpy(d0, ptr, len_bytes);
1901     const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
1902     if ((ptr = (const uchar *)unp_reader->read(len))) {
1903       memcpy(dst, ptr, len);
1904       return UNPACK_SUCCESS;
1905     }
1906   }
1907   return UNPACK_FAILURE;
1908 }
1909 
1910 /*
1911   Write unpack_data for a "simple" collation
1912 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)1913 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
1914                                     const Rdb_collation_codec *const codec,
1915                                     const uchar *const src,
1916                                     const size_t src_len) {
1917   for (uint i = 0; i < src_len; i++) {
1918     writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
1919   }
1920 }
1921 
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t & src_len,uchar * const dst)1922 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
1923                                    const Rdb_collation_codec *const codec,
1924                                    const uchar *const src,
1925                                    const size_t &src_len, uchar *const dst) {
1926   for (uint i = 0; i < src_len; i++) {
1927     if (codec->m_dec_size[src[i]] > 0) {
1928       uint *ret;
1929       DBUG_ASSERT(reader != nullptr);
1930 
1931       if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
1932         return UNPACK_FAILURE;
1933       }
1934       dst[i] = codec->m_dec_idx[*ret][src[i]];
1935     } else {
1936       dst[i] = codec->m_dec_idx[0][src[i]];
1937     }
1938   }
1939 
1940   return UNPACK_SUCCESS;
1941 }
1942 
1943 /*
1944   Function of type rdb_make_unpack_info_t
1945 
1946   @detail
1947     Make unpack_data for VARCHAR(n) in a "simple" charset.
1948 */
1949 
1950 static void
rdb_make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)1951 rdb_make_unpack_simple_varchar(const Rdb_collation_codec *const codec,
1952                                const Field *const field,
1953                                Rdb_pack_field_context *const pack_ctx) {
1954   const auto f = static_cast<const Field_varstring *>(field);
1955   uchar *const src = f->ptr + f->length_bytes;
1956   const size_t src_len =
1957       f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
1958   Rdb_bit_writer bit_writer(pack_ctx->writer);
1959   // The std::min compares characters with bytes, but for simple collations,
1960   // mbmaxlen = 1.
1961   rdb_write_unpack_simple(&bit_writer, codec, src,
1962                           std::min((size_t)f->char_length(), src_len));
1963 }
1964 
1965 /*
1966   Function of type rdb_index_field_unpack_t
1967 
1968   @seealso
1969     rdb_pack_with_varchar_space_pad - packing function
1970     rdb_unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
1971 */
1972 
rdb_unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)1973 int rdb_unpack_simple_varchar_space_pad(Rdb_field_packing *const fpi,
1974                                         Field *const field, uchar *dst,
1975                                         Rdb_string_reader *const reader,
1976                                         Rdb_string_reader *const unp_reader) {
1977   const uchar *ptr;
1978   size_t len = 0;
1979   bool finished = false;
1980   uchar *d0 = dst;
1981   const Field_varstring *const field_var =
1982       static_cast<Field_varstring *>(field);
1983   // For simple collations, char_length is also number of bytes.
1984   DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
1985   uchar *dst_end = dst + field_var->pack_length();
1986   dst += field_var->length_bytes;
1987   Rdb_bit_reader bit_reader(unp_reader);
1988 
1989   uint space_padding_bytes = 0;
1990   uint extra_spaces;
1991   DBUG_ASSERT(unp_reader != nullptr);
1992 
1993   if ((fpi->m_unpack_info_uses_two_bytes
1994            ? unp_reader->read_uint16(&extra_spaces)
1995            : unp_reader->read_uint8(&extra_spaces))) {
1996     return UNPACK_FAILURE;
1997   }
1998 
1999   if (extra_spaces <= 8) {
2000     space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2001     extra_spaces = 0;
2002   } else
2003     extra_spaces -= 8;
2004 
2005   space_padding_bytes *= fpi->space_xfrm_len;
2006 
2007   /* Decode the length-emitted encoding here */
2008   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2009     const char last_byte =
2010         ptr[fpi->m_segment_size - 1]; // number of padding bytes
2011     size_t used_bytes;
2012     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2013       // this is the last one
2014       if (space_padding_bytes > (fpi->m_segment_size - 1))
2015         return UNPACK_FAILURE; // Cannot happen, corrupted data
2016       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2017       finished = true;
2018     } else {
2019       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2020           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2021         return UNPACK_FAILURE;
2022       }
2023       used_bytes = fpi->m_segment_size - 1;
2024     }
2025 
2026     if (dst + used_bytes > dst_end) {
2027       // The value on disk is longer than the field definition allows?
2028       return UNPACK_FAILURE;
2029     }
2030 
2031     uint ret;
2032     if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2033                                       used_bytes, dst)) != UNPACK_SUCCESS) {
2034       return ret;
2035     }
2036 
2037     dst += used_bytes;
2038     len += used_bytes;
2039 
2040     if (finished) {
2041       if (extra_spaces) {
2042         if (dst + extra_spaces > dst_end)
2043           return UNPACK_FAILURE;
2044         // pad_char has a 1-byte form in all charsets that
2045         // are handled by rdb_init_collation_mapping.
2046         memset(dst, field_var->charset()->pad_char, extra_spaces);
2047         len += extra_spaces;
2048       }
2049       break;
2050     }
2051   }
2052 
2053   if (!finished)
2054     return UNPACK_FAILURE;
2055 
2056   /* Save the length */
2057   if (field_var->length_bytes == 1) {
2058     d0[0] = len;
2059   } else {
2060     DBUG_ASSERT(field_var->length_bytes == 2);
2061     int2store(d0, len);
2062   }
2063   return UNPACK_SUCCESS;
2064 }
2065 
2066 /*
2067   Function of type rdb_make_unpack_info_t
2068 
2069   @detail
2070     Make unpack_data for CHAR(n) value in a "simple" charset.
2071     It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2072 
2073   @seealso
2074     The VARCHAR variant is in rdb_make_unpack_simple_varchar
2075 */
2076 
rdb_make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2077 static void rdb_make_unpack_simple(const Rdb_collation_codec *const codec,
2078                                    const Field *const field,
2079                                    Rdb_pack_field_context *const pack_ctx) {
2080   const uchar *const src = field->ptr;
2081   Rdb_bit_writer bit_writer(pack_ctx->writer);
2082   rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2083 }
2084 
2085 /*
2086   Function of type rdb_index_field_unpack_t
2087 */
2088 
rdb_unpack_simple(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2089 static int rdb_unpack_simple(Rdb_field_packing *const fpi,
2090                              Field *const field __attribute__((__unused__)),
2091                              uchar *const dst, Rdb_string_reader *const reader,
2092                              Rdb_string_reader *const unp_reader) {
2093   const uchar *ptr;
2094   const uint len = fpi->m_max_image_len;
2095   Rdb_bit_reader bit_reader(unp_reader);
2096 
2097   if (!(ptr = (const uchar *)reader->read(len))) {
2098     return UNPACK_FAILURE;
2099   }
2100 
2101   return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2102                                 fpi->m_charset_codec, ptr, len, dst);
2103 }
2104 
// Minimum number of bytes kept in Rdb_charset_space_info::spaces_xfrm.
constexpr int RDB_SPACE_XFRM_SIZE = 32;
2107 
2108 // A class holding information about how space character is represented in a
2109 // charset.
2110 class Rdb_charset_space_info {
2111 public:
2112   Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
2113   Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
2114   Rdb_charset_space_info() = default;
2115 
2116   // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
2117   std::vector<uchar> spaces_xfrm;
2118 
2119   // length(strxfrm(' '))
2120   size_t space_xfrm_len;
2121 
2122   // length of the space character itself
2123   // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
2124   // (length=2)
2125   size_t space_mb_len;
2126 };
2127 
// Per-charset space information, indexed by CHARSET_INFO::number. Entries
// start out empty and are created on first use by
// rdb_get_mem_comparable_space() while it holds rdb_mem_cmp_space_mutex.
static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
    rdb_mem_comparable_space;
2130 
2131 /*
2132   @brief
2133   For a given charset, get
2134    - strxfrm('    '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
2135    - length of strxfrm(charset, ' ')
2136    - length of the space character in the charset
2137 
2138   @param cs  IN    Charset to get the space for
2139   @param ptr OUT   A few space characters
2140   @param len OUT   Return length of the space (in bytes)
2141 
2142   @detail
2143     It is tempting to pre-generate mem-comparable form of space character for
2144     every charset on server startup.
2145     One can't do that: some charsets are not initialized until somebody
2146     attempts to use them (e.g. create or open a table that has a field that
2147     uses the charset).
2148 */
2149 
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)2150 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
2151                                          const std::vector<uchar> **xfrm,
2152                                          size_t *const xfrm_len,
2153                                          size_t *const mb_len) {
2154   DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
2155   if (!rdb_mem_comparable_space[cs->number].get()) {
2156     mysql_mutex_lock(&rdb_mem_cmp_space_mutex);
2157     if (!rdb_mem_comparable_space[cs->number].get()) {
2158       // Upper bound of how many bytes can be occupied by multi-byte form of a
2159       // character in any charset.
2160       const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
2161       DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
2162 
2163       // multi-byte form of the ' ' (space) character
2164       uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
2165 
2166       const size_t space_mb_len = cs->cset->wc_mb(
2167           cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
2168 
2169       uchar space[20]; // mem-comparable image of the space character
2170 
2171       const size_t space_len = cs->coll->strnxfrm(cs, space, sizeof(space), 1,
2172                                                   space_mb, space_mb_len, 0);
2173       Rdb_charset_space_info *const info = new Rdb_charset_space_info;
2174       info->space_xfrm_len = space_len;
2175       info->space_mb_len = space_mb_len;
2176       while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
2177         info->spaces_xfrm.insert(info->spaces_xfrm.end(), space,
2178                                  space + space_len);
2179       }
2180       rdb_mem_comparable_space[cs->number].reset(info);
2181     }
2182     mysql_mutex_unlock(&rdb_mem_cmp_space_mutex);
2183   }
2184 
2185   *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
2186   *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
2187   *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
2188 }
2189 
// Held by rdb_get_mem_comparable_space() while it creates an entry of
// rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Collation codecs, indexed by CHARSET_INFO::number. An entry stays nullptr
// until rdb_init_collation_mapping() builds a codec for that collation.
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
// Held by rdb_init_collation_mapping() while it fills in rdb_collation_data.
mysql_mutex_t rdb_collation_data_mutex;
2195 
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)2196 static bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
2197   return (cs->coll == &my_collation_8bit_simple_ci_handler);
2198 }
2199 
2200 static const Rdb_collation_codec *
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)2201 rdb_init_collation_mapping(const my_core::CHARSET_INFO *const cs) {
2202   DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
2203   const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
2204 
2205   if (codec == nullptr && rdb_is_collation_supported(cs)) {
2206     mysql_mutex_lock(&rdb_collation_data_mutex);
2207     codec = rdb_collation_data[cs->number];
2208     if (codec == nullptr) {
2209       Rdb_collation_codec *cur = nullptr;
2210 
2211       // Compute reverse mapping for simple collations.
2212       if (cs->coll == &my_collation_8bit_simple_ci_handler) {
2213         cur = new Rdb_collation_codec;
2214         std::map<uchar, std::vector<uchar>> rev_map;
2215         size_t max_conflict_size = 0;
2216         for (int src = 0; src < 256; src++) {
2217           uchar dst = cs->sort_order[src];
2218           rev_map[dst].push_back(src);
2219           max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
2220         }
2221         cur->m_dec_idx.resize(max_conflict_size);
2222 
2223         for (auto const &p : rev_map) {
2224           uchar dst = p.first;
2225           for (uint idx = 0; idx < p.second.size(); idx++) {
2226             uchar src = p.second[idx];
2227             uchar bits =
2228                 my_bit_log2(my_round_up_to_next_power(p.second.size()));
2229             cur->m_enc_idx[src] = idx;
2230             cur->m_enc_size[src] = bits;
2231             cur->m_dec_size[dst] = bits;
2232             cur->m_dec_idx[idx][dst] = src;
2233           }
2234         }
2235 
2236         cur->m_make_unpack_info_func = {
2237             {rdb_make_unpack_simple_varchar, rdb_make_unpack_simple}};
2238         cur->m_unpack_func = {
2239             {rdb_unpack_simple_varchar_space_pad, rdb_unpack_simple}};
2240       } else {
2241         // Out of luck for now.
2242       }
2243 
2244       if (cur != nullptr) {
2245         codec = cur;
2246         cur->m_cs = cs;
2247         rdb_collation_data[cs->number] = cur;
2248       }
2249     }
2250     mysql_mutex_unlock(&rdb_collation_data_mutex);
2251   }
2252 
2253   return codec;
2254 }
2255 
get_segment_size_from_collation(const CHARSET_INFO * const cs)2256 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
2257   int ret;
2258   if (cs == &my_charset_utf8mb4_bin || cs == &my_charset_utf16_bin ||
2259       cs == &my_charset_utf16le_bin || cs == &my_charset_utf32_bin) {
2260     /*
2261       In these collations, a character produces one weight, which is 3 bytes.
2262       Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
2263       get 3*3+1=10
2264     */
2265     ret = 10;
2266   } else {
2267     /*
2268       All other collations. There are two classes:
2269       - Unicode-based, except for collations mentioned in the if-condition.
2270         For these all weights are 2 bytes long, a character may produce 0..8
2271         weights.
2272         in any case, 8 bytes of payload in the segment guarantee that the last
2273         space character won't span across segments.
2274 
2275       - Collations not based on unicode. These have length(strxfrm(' '))=1,
2276         there nothing to worry about.
2277 
2278       In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
2279     */
2280     ret = 9;
2281   }
2282   DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
2283   return ret;
2284 }
2285 
2286 /*
2287   @brief
2288     Setup packing of index field into its mem-comparable form
2289 
2290   @detail
2291     - It is possible produce mem-comparable form for any datatype.
2292     - Some datatypes also allow to unpack the original value from its
2293       mem-comparable form.
2294       = Some of these require extra information to be stored in "unpack_info".
2295         unpack_info is not a part of mem-comparable form, it is only used to
2296         restore the original value
2297 
2298   @param
2299     field  IN  field to be packed/un-packed
2300 
2301   @return
2302     TRUE  -  Field can be read with index-only reads
2303     FALSE -  Otherwise
2304 */
2305 
bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
                              const Field *const field, const uint &keynr_arg,
                              const uint &key_part_arg,
                              const uint16 &key_length) {
  // Chooses the pack/skip/unpack callbacks and the maximum mem-comparable
  // image length for one key part, based on the field's real type and (for
  // character types) its collation. `res` is the value returned on the
  // [VAR]CHAR/[VAR]BINARY path; the fixed-size types return directly from
  // the switch below.
  int res = false;
  // A null `field` is handled as a LONGLONG column whose image length is
  // ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN (see m_max_image_len below).
  enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;

  m_keynr = keynr_arg;
  m_key_part = key_part_arg;

  // Reset everything to the defaults; the type-specific code below only
  // overrides what differs.
  m_maybe_null = field ? field->real_maybe_null() : false;
  m_unpack_func = nullptr;
  m_make_unpack_info_func = nullptr;
  m_unpack_data_len = 0;
  space_xfrm = nullptr; // safety

  /* Calculate image length. By default, it is pack_length() */
  m_max_image_len =
      field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
  m_skip_func = rdb_skip_max_length;
  m_pack_func = rdb_pack_with_make_sort_key;

  // Fixed-size types: keep the default pack/skip functions, pick the matching
  // unpack function and report that index-only reads are possible.
  switch (type) {
  case MYSQL_TYPE_LONGLONG:
  case MYSQL_TYPE_LONG:
  case MYSQL_TYPE_INT24:
  case MYSQL_TYPE_SHORT:
  case MYSQL_TYPE_TINY:
    m_unpack_func = rdb_unpack_integer;
    return true;

  case MYSQL_TYPE_DOUBLE:
    m_unpack_func = rdb_unpack_double;
    return true;

  case MYSQL_TYPE_FLOAT:
    m_unpack_func = rdb_unpack_float;
    return true;

  case MYSQL_TYPE_NEWDECIMAL:
  /*
    Decimal is packed with Field_new_decimal::make_sort_key, which just
    does memcpy.
    Unpacking decimal values was supported only after fix for issue#253,
    because of that ha_rocksdb::get_storage_type() handles decimal values
    in a special way.
  */
  case MYSQL_TYPE_DATETIME2:
  case MYSQL_TYPE_TIMESTAMP2:
  /* These are packed with Field_temporal_with_date_and_timef::make_sort_key */
  case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
  case MYSQL_TYPE_YEAR:  /* YEAR is packed with  Field_tiny::make_sort_key */
    /* Everything that comes here is packed with just a memcpy(). */
    m_unpack_func = rdb_unpack_binary_str;
    return true;

  case MYSQL_TYPE_NEWDATE:
    /*
      This is packed by Field_newdate::make_sort_key. It assumes the data is
      3 bytes, and packing is done by swapping the byte order (for both big-
      and little-endian)
    */
    m_unpack_func = rdb_unpack_newdate;
    return true;
  case MYSQL_TYPE_TINY_BLOB:
  case MYSQL_TYPE_MEDIUM_BLOB:
  case MYSQL_TYPE_LONG_BLOB:
  case MYSQL_TYPE_BLOB: {
    if (key_descr) {
      // The my_charset_bin collation is special in that it will consider
      // shorter strings sorting as less than longer strings.
      //
      // See Field_blob::make_sort_key for details.
      m_max_image_len =
          key_length + (field->charset() == &my_charset_bin
                            ? reinterpret_cast<const Field_blob *>(field)
                                  ->pack_length_no_ptr()
                            : 0);
      // Return false because indexes on text/blob will always require
      // a prefix. With a prefix, the optimizer will not be able to do an
      // index-only scan since there may be content occurring after the prefix
      // length.
      return false;
    }
    // NOTE(review): with a null key_descr control falls through to `default`
    // and leaves the switch with the default image length - presumably
    // intentional, confirm.
  }
  default:
    break;
  }

  m_unpack_info_stores_value = false;
  /* Handle [VAR](CHAR|BINARY) */

  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    /*
      For CHAR-based columns, check how strxfrm image will take.
      field->field_length = field->char_length() * cs->mbmaxlen.
    */
    const CHARSET_INFO *cs = field->charset();
    m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
  }
  const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
  // Same charset as the block-local `cs` above; this one is used by the rest
  // of the function. `field` is dereferenced unconditionally from here on.
  const CHARSET_INFO *cs = field->charset();
  // max_image_len before chunking is taken into account
  const int max_image_len_before_chunks = m_max_image_len;

  if (is_varchar) {
    // The default for varchar is variable-length, without space-padding for
    // comparisons
    m_varchar_charset = cs;
    m_skip_func = rdb_skip_variable_length;
    m_pack_func = rdb_pack_with_varchar_encoding;
    // Round the image up to whole RDB_ESCAPE_LENGTH-sized chunks; presumably
    // each chunk carries RDB_ESCAPE_LENGTH - 1 bytes of payload.
    m_max_image_len =
        (m_max_image_len / (RDB_ESCAPE_LENGTH - 1) + 1) * RDB_ESCAPE_LENGTH;

    const auto field_var = static_cast<const Field_varstring *>(field);
    m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
  }

  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
    // information about how character-based datatypes are compared.
    bool use_unknown_collation = false;
    DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
                    use_unknown_collation = true;);

    if (cs == &my_charset_bin) {
      // - SQL layer pads BINARY(N) so that it always is N bytes long.
      // - For VARBINARY(N), values may have different lengths, so we're using
      //   variable-length encoding. This is also the only charset where the
      //   values are not space-padded for comparison.
      m_unpack_func = is_varchar ? rdb_unpack_binary_or_utf8_varchar
                                 : rdb_unpack_binary_str;
      res = true;
    } else if (cs == &my_charset_latin1_bin || cs == &my_charset_utf8_bin) {
      // For _bin collations, mem-comparable form of the string is the string
      // itself.

      if (is_varchar) {
        // VARCHARs - are compared as if they were space-padded - but are
        // not actually space-padded (reading the value back produces the
        // original value, without the padding)
        m_unpack_func = rdb_unpack_binary_or_utf8_varchar_space_pad;
        m_skip_func = rdb_skip_variable_space_pad;
        m_pack_func = rdb_pack_with_varchar_space_pad;
        m_make_unpack_info_func = rdb_dummy_make_unpack_info;
        m_segment_size = get_segment_size_from_collation(cs);
        // Recompute the image length from the pre-chunking value, this time
        // using the collation-specific segment size.
        m_max_image_len =
            (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
            m_segment_size;
        rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
                                     &space_mb_len);
      } else {
        // SQL layer pads CHAR(N) values to their maximum length.
        // We just store that and restore it back.
        m_unpack_func = (cs == &my_charset_latin1_bin) ? rdb_unpack_binary_str
                                                       : rdb_unpack_utf8_str;
      }
      res = true;
    } else {
      // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin

      res = true; // index-only scans are possible
      m_unpack_data_len = is_varchar ? 0 : field->field_length;
      // Index into the codec's per-kind callback arrays: 0 for VARCHAR,
      // 1 for CHAR.
      const uint idx = is_varchar ? 0 : 1;
      const Rdb_collation_codec *codec = nullptr;

      if (is_varchar) {
        // VARCHAR requires space-padding for doing comparisons
        //
        // The check for cs->levels_for_order is to catch
        // latin2_czech_cs and cp1250_czech_cs - multi-level collations
        // that Variable-Length Space Padded Encoding can't handle.
        // It is not expected to work for any other multi-level collations,
        // either.
        // Currently we handle these collations as NO_PAD, even if they have
        // PAD_SPACE attribute.
        if (cs->levels_for_order == 1) {
          m_pack_func = rdb_pack_with_varchar_space_pad;
          m_skip_func = rdb_skip_variable_space_pad;
          m_segment_size = get_segment_size_from_collation(cs);
          m_max_image_len =
              (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
              m_segment_size;
          rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
                                       &space_mb_len);
        } else {
          //  NO_LINT_DEBUG
          sql_print_warning("RocksDB: you're trying to create an index "
                            "with a multi-level collation %s",
                            cs->name);
          //  NO_LINT_DEBUG
          sql_print_warning("MyRocks will handle this collation internally "
                            " as if it had a NO_PAD attribute.");
          m_pack_func = rdb_pack_with_varchar_encoding;
          m_skip_func = rdb_skip_variable_length;
        }
      }

      if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
        // The collation allows to store extra information in the unpack_info
        // which can be used to restore the original value from the
        // mem-comparable form.
        m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
        m_unpack_func = codec->m_unpack_func[idx];
        m_charset_codec = codec;
      } else if (use_unknown_collation) {
        // We have no clue about how this collation produces mem-comparable
        // form. Our way of restoring the original value is to keep a copy of
        // the original value in unpack_info.
        m_unpack_info_stores_value = true;
        m_make_unpack_info_func = is_varchar ? rdb_make_unpack_unknown_varchar
                                             : rdb_make_unpack_unknown;
        m_unpack_func =
            is_varchar ? rdb_unpack_unknown_varchar : rdb_unpack_unknown;
      } else {
        // Same as above: we don't know how to restore the value from its
        // mem-comparable form.
        // Here, we just indicate to the SQL layer we can't do it.
        DBUG_ASSERT(m_unpack_func == nullptr);
        m_unpack_info_stores_value = false;
        res = false; // Indicate that index-only reads are not possible
      }
    }

    // Make an adjustment: unpacking partially covered columns is not
    // possible. field->table is populated when called through
    // Rdb_key_def::setup, but not during ha_rocksdb::index_flags.
    // NOTE(review): both branches below set m_unpack_info_stores_value to
    // true while clearing the unpack callbacks - confirm this is intended.
    if (field->table) {
      // Get the original Field object and compare lengths. If this key part is
      // a prefix of a column, then we can't do index-only scans.
      if (field->table->field[field->field_index]->field_length != key_length) {
        m_unpack_func = nullptr;
        m_make_unpack_info_func = nullptr;
        m_unpack_info_stores_value = true;
        res = false;
      }
    } else {
      // No TABLE is attached: compare against the length of the field that
      // was passed in.
      if (field->field_length != key_length) {
        m_unpack_func = nullptr;
        m_make_unpack_info_func = nullptr;
        m_unpack_info_stores_value = true;
        res = false;
      }
    }
  }
  return res;
}
2553 
get_field_in_table(const TABLE * const tbl) const2554 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
2555   return tbl->key_info[m_keynr].key_part[m_key_part].field;
2556 }
2557 
fill_hidden_pk_val(uchar ** dst,const longlong & hidden_pk_id) const2558 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
2559                                            const longlong &hidden_pk_id) const {
2560   DBUG_ASSERT(m_max_image_len == 8);
2561 
2562   String to;
2563   rdb_netstr_append_uint64(&to, hidden_pk_id);
2564   memcpy(*dst, to.ptr(), m_max_image_len);
2565 
2566   *dst += m_max_image_len;
2567 }
2568 
2569 ///////////////////////////////////////////////////////////////////////////////////////////
2570 // Rdb_ddl_manager
2571 ///////////////////////////////////////////////////////////////////////////////////////////
2572 
~Rdb_tbl_def()2573 Rdb_tbl_def::~Rdb_tbl_def() {
2574   auto ddl_manager = rdb_get_ddl_manager();
2575   /* Don't free key definitions */
2576   if (m_key_descr_arr) {
2577     for (uint i = 0; i < m_key_count; i++) {
2578       if (ddl_manager && m_key_descr_arr[i]) {
2579         ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
2580       }
2581 
2582       m_key_descr_arr[i] = nullptr;
2583     }
2584 
2585     delete[] m_key_descr_arr;
2586     m_key_descr_arr = nullptr;
2587   }
2588 }
2589 
2590 /*
2591   Put table definition DDL entry. Actual write is done at
2592   Rdb_dict_manager::commit.
2593 
2594   We write
2595     dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
2596 
2597   Where key entries are a tuple of
2598     ( cf_id, index_nr )
2599 */
2600 
put_dict(Rdb_dict_manager * const dict,rocksdb::WriteBatch * const batch,uchar * const key,const size_t & keylen)2601 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
2602                            rocksdb::WriteBatch *const batch, uchar *const key,
2603                            const size_t &keylen) {
2604   StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
2605   indexes.alloc(Rdb_key_def::VERSION_SIZE +
2606                 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
2607   rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
2608 
2609   for (uint i = 0; i < m_key_count; i++) {
2610     const Rdb_key_def &kd = *m_key_descr_arr[i];
2611 
2612     const uchar flags =
2613         (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
2614         (kd.m_is_auto_cf ? Rdb_key_def::AUTO_CF_FLAG : 0);
2615 
2616     const uint cf_id = kd.get_cf()->GetID();
2617     /*
2618       If cf_id already exists, cf_flags must be the same.
2619       To prevent race condition, reading/modifying/committing CF flags
2620       need to be protected by mutex (dict_manager->lock()).
2621       When RocksDB supports transaction with pessimistic concurrency
2622       control, we can switch to use it and removing mutex.
2623     */
2624     uint existing_cf_flags;
2625     if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
2626       if (existing_cf_flags != flags) {
2627         my_printf_error(ER_UNKNOWN_ERROR,
2628                         "Column Family Flag is different from existing flag. "
2629                         "Assign a new CF flag, or do not change existing "
2630                         "CF flag.",
2631                         MYF(0));
2632         return true;
2633       }
2634     } else {
2635       dict->add_cf_flags(batch, cf_id, flags);
2636     }
2637 
2638     rdb_netstr_append_uint32(&indexes, cf_id);
2639     rdb_netstr_append_uint32(&indexes, kd.m_index_number);
2640     dict->add_or_update_index_cf_mapping(batch, kd.m_index_type,
2641                                          kd.m_kv_format_version,
2642                                          kd.m_index_number, cf_id);
2643   }
2644 
2645   const rocksdb::Slice skey((char *)key, keylen);
2646   const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
2647 
2648   dict->put_key(batch, skey, svalue);
2649   return false;
2650 }
2651 
check_if_is_mysql_system_table()2652 void Rdb_tbl_def::check_if_is_mysql_system_table() {
2653   static const char *const system_dbs[] = {
2654       "mysql", "performance_schema", "information_schema",
2655   };
2656 
2657   m_is_mysql_system_table = false;
2658   for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
2659     if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
2660       m_is_mysql_system_table = true;
2661       break;
2662     }
2663   }
2664 }
2665 
set_name(const std::string & name)2666 void Rdb_tbl_def::set_name(const std::string &name) {
2667   int err __attribute__((__unused__));
2668 
2669   m_dbname_tablename = name;
2670   err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
2671                                        &m_partition);
2672   DBUG_ASSERT(err == 0);
2673 
2674   check_if_is_mysql_system_table();
2675 }
2676 
2677 /*
2678   Static function of type my_hash_get_key that gets invoked by
2679   the m_ddl_hash object of type my_core::HASH.
2680   It manufactures a key (db+table name in our case) from a record
2681   (Rdb_tbl_def in our case).
2682 */
get_hash_key(Rdb_tbl_def * const rec,size_t * const length,my_bool not_used)2683 const uchar *Rdb_ddl_manager::get_hash_key(Rdb_tbl_def *const rec,
2684                                            size_t *const length,
2685                                            my_bool not_used
2686                                            __attribute__((__unused__))) {
2687   const std::string &dbname_tablename = rec->full_tablename();
2688   *length = dbname_tablename.size();
2689   return reinterpret_cast<const uchar *>(dbname_tablename.c_str());
2690 }
2691 
2692 /*
2693   Static function of type void (*my_hash_free_element_func_t)(void*) that gets
2694   invoked by the m_ddl_hash object of type my_core::HASH.
2695   It deletes a record (Rdb_tbl_def in our case).
2696 */
free_hash_elem(void * const data)2697 void Rdb_ddl_manager::free_hash_elem(void *const data) {
2698   Rdb_tbl_def *elem = reinterpret_cast<Rdb_tbl_def *>(data);
2699   delete elem;
2700 }
2701 
/*
  Remove the entry keyed by gl_index_id from m_index_num_to_keydef.
  No lock is taken here.
*/
void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
  m_index_num_to_keydef.erase(gl_index_id);
}
2705 
namespace // anonymous namespace = not visible outside this source file
{
/*
  Table scanner that collects the tables known to the RocksDB data dictionary
  and then checks them off against the .frm files found under the datadir.
*/
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  // (table name, is_partition) pair
  using tbl_info_t = std::pair<std::string, bool>;
  // database name -> set of tables registered for that database
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  // Tables still waiting to be matched with a .frm file
  tbl_list_t m_list;

  // Scanner callback: records one table definition in m_list
  int add_table(Rdb_tbl_def *tdef) override;

  // Walks the sub-directories of datadir; returns false on an I/O problem
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  // Checks every non-temporary .frm file in datadir + dbname
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  // Checks a single .frm file and removes the matching entry from m_list
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
} // anonymous namespace
2725 
2726 /*
2727   Get a list of tables that we expect to have .frm files for.  This will use the
2728   information just read from the RocksDB data dictionary.
2729 */
add_table(Rdb_tbl_def * tdef)2730 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
2731   DBUG_ASSERT(tdef != nullptr);
2732 
2733   /* Add the database/table into the list */
2734   bool is_partition = tdef->base_partition().size() != 0;
2735   m_list[tdef->base_dbname()].insert(
2736       tbl_info_t(tdef->base_tablename(), is_partition));
2737 
2738   return HA_EXIT_SUCCESS;
2739 }
2740 
2741 /*
2742   Access the .frm file for this dbname/tablename and see if it is a RocksDB
2743   table (or partition table).
2744 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)2745 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
2746                                        const std::string &dbname,
2747                                        const std::string &tablename,
2748                                        bool *has_errors) {
2749   /* Check this .frm file to see what engine it uses */
2750   String fullfilename(fullpath.c_str(), &my_charset_bin);
2751   fullfilename.append(FN_DIRSEP);
2752   fullfilename.append(tablename.c_str());
2753   fullfilename.append(".frm");
2754 
2755   /*
2756     This function will return the legacy_db_type of the table.  Currently
2757     it does not reference the first parameter (THD* thd), but if it ever
2758     did in the future we would need to make a version that does it without
2759     the connection handle as we don't have one here.
2760   */
2761   enum legacy_db_type eng_type;
2762   frm_type_enum type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type);
2763   if (type == FRMTYPE_ERROR) {
2764     sql_print_warning("RocksDB: Failed to open/read .from file: %s",
2765                       fullfilename.ptr());
2766     return false;
2767   }
2768 
2769   if (type == FRMTYPE_TABLE) {
2770     /* For a RocksDB table do we have a reference in the data dictionary? */
2771     if (eng_type == DB_TYPE_ROCKSDB) {
2772       /*
2773         Attempt to remove the table entry from the list of tables.  If this
2774         fails then we know we had a .frm file that wasn't registered in RocksDB.
2775       */
2776       tbl_info_t element(tablename, false);
2777       if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
2778         sql_print_warning("RocksDB: Schema mismatch - "
2779                           "A .frm file exists for table %s.%s, "
2780                           "but that table is not registered in RocksDB",
2781                           dbname.c_str(), tablename.c_str());
2782         *has_errors = true;
2783       }
2784     } else if (eng_type == DB_TYPE_PARTITION_DB) {
2785       /*
2786         For partition tables, see if it is in the m_list as a partition,
2787         but don't generate an error if it isn't there - we don't know that the
2788         .frm is for RocksDB.
2789       */
2790       if (m_list.count(dbname) > 0) {
2791         m_list[dbname].erase(tbl_info_t(tablename, true));
2792       }
2793     }
2794   }
2795 
2796   return true;
2797 }
2798 
2799 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)2800 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
2801                                       const std::string &dbname,
2802                                       bool *has_errors) {
2803   bool result = true;
2804   std::string fullpath = datadir + dbname;
2805   struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
2806 
2807   /* Access the directory */
2808   if (dir_info == nullptr) {
2809     sql_print_warning("RocksDB: Could not open database directory: %s",
2810                       fullpath.c_str());
2811     return false;
2812   }
2813 
2814   /* Scan through the files in the directory */
2815   struct fileinfo *file_info = dir_info->dir_entry;
2816   for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
2817     /* Find .frm files that are not temp files (those that start with '#') */
2818     const char *ext = strrchr(file_info->name, '.');
2819     if (ext != nullptr && !is_prefix(file_info->name, tmp_file_prefix) &&
2820         strcmp(ext, ".frm") == 0) {
2821       std::string tablename =
2822           std::string(file_info->name, ext - file_info->name);
2823 
2824       /* Check to see if the .frm file is from RocksDB */
2825       if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
2826         result = false;
2827         break;
2828       }
2829     }
2830   }
2831 
2832   /* Remove any databases who have no more tables listed */
2833   if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
2834     m_list.erase(dbname);
2835   }
2836 
2837   /* Release the directory entry */
2838   my_dirend(dir_info);
2839 
2840   return result;
2841 }
2842 
2843 /*
2844   Scan the datadir for all databases (subdirectories) and get a list of .frm
2845   files they contain
2846 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)2847 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
2848                                                  bool *has_errors) {
2849   bool result = true;
2850   struct st_my_dir *dir_info;
2851   struct fileinfo *file_info;
2852 
2853   dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
2854   if (dir_info == nullptr) {
2855     sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
2856     return false;
2857   }
2858 
2859   file_info = dir_info->dir_entry;
2860   for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
2861     /* Ignore files/dirs starting with '.' */
2862     if (file_info->name[0] == '.')
2863       continue;
2864 
2865     /* Ignore all non-directory files */
2866     if (!MY_S_ISDIR(file_info->mystat->st_mode))
2867       continue;
2868 
2869     /* Scan all the .frm files in the directory */
2870     if (!scan_for_frms(datadir, file_info->name, has_errors)) {
2871       result = false;
2872       break;
2873     }
2874   }
2875 
2876   /* Release the directory info */
2877   my_dirend(dir_info);
2878 
2879   return result;
2880 }
2881 
2882 /*
2883   Validate that all the tables in the RocksDB database dictionary match the .frm
2884   files in the datdir
2885 */
validate_schemas(void)2886 bool Rdb_ddl_manager::validate_schemas(void) {
2887   bool has_errors = false;
2888   const std::string datadir = std::string(mysql_real_data_home);
2889   Rdb_validate_tbls table_list;
2890 
2891   /* Get the list of tables from the database dictionary */
2892   if (scan_for_tables(&table_list) != 0) {
2893     return false;
2894   }
2895 
2896   /* Compare that to the list of actual .frm files */
2897   if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
2898     return false;
2899   }
2900 
2901   /*
2902     Any tables left in the tables list are ones that are registered in RocksDB
2903     but don't have .frm files.
2904   */
2905   for (const auto &db : table_list.m_list) {
2906     for (const auto &table : db.second) {
2907       sql_print_warning("RocksDB: Schema mismatch - "
2908                         "Table %s.%s is registered in RocksDB "
2909                         "but does not have a .frm file",
2910                         db.first.c_str(), table.first.c_str());
2911       has_errors = true;
2912     }
2913   }
2914 
2915   return !has_errors;
2916 }
2917 
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t & validate_tables)2918 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
2919                            Rdb_cf_manager *const cf_manager,
2920                            const uint32_t &validate_tables) {
2921   const ulong TABLE_HASH_SIZE = 32;
2922   m_dict = dict_arg;
2923   mysql_rwlock_init(0, &m_rwlock);
2924   (void)my_hash_init(&m_ddl_hash,
2925                      /*system_charset_info*/ &my_charset_bin, TABLE_HASH_SIZE,
2926                      0, 0, (my_hash_get_key)Rdb_ddl_manager::get_hash_key,
2927                      Rdb_ddl_manager::free_hash_elem, 0);
2928 
2929   /* Read the data dictionary and populate the hash */
2930   uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
2931   rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
2932   const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
2933                                        Rdb_key_def::INDEX_NUMBER_SIZE);
2934 
2935   /* Reading data dictionary should always skip bloom filter */
2936   rocksdb::Iterator *it = m_dict->new_iterator();
2937   int i = 0;
2938 
2939   uint max_index_id_in_dict = 0;
2940   m_dict->get_max_index_id(&max_index_id_in_dict);
2941 
2942   for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
2943     const uchar *ptr;
2944     const uchar *ptr_end;
2945     const rocksdb::Slice key = it->key();
2946     const rocksdb::Slice val = it->value();
2947 
2948     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
2949         memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE))
2950       break;
2951 
2952     if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
2953       sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
2954                       (int)key.size());
2955       return true;
2956     }
2957 
2958     Rdb_tbl_def *const tdef =
2959         new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
2960 
2961     // Now, read the DDLs.
2962     const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
2963     if (real_val_size % Rdb_key_def::PACKED_SIZE * 2) {
2964       sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
2965                       tdef->full_tablename().c_str());
2966       return true;
2967     }
2968     tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
2969     tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
2970 
2971     ptr = reinterpret_cast<const uchar *>(val.data());
2972     const int version = rdb_netbuf_read_uint16(&ptr);
2973     if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
2974       sql_print_error("RocksDB: DDL ENTRY Version was not expected."
2975                       "Expected: %d, Actual: %d",
2976                       Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
2977       return true;
2978     }
2979     ptr_end = ptr + real_val_size;
2980     for (uint keyno = 0; ptr < ptr_end; keyno++) {
2981       GL_INDEX_ID gl_index_id;
2982       rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
2983       uint16 m_index_dict_version = 0;
2984       uchar m_index_type = 0;
2985       uint16 kv_version = 0;
2986       uint flags = 0;
2987       if (!m_dict->get_index_info(gl_index_id, &m_index_dict_version,
2988                                   &m_index_type, &kv_version)) {
2989         sql_print_error("RocksDB: Could not get index information "
2990                         "for Index Number (%u,%u), table %s",
2991                         gl_index_id.cf_id, gl_index_id.index_id,
2992                         tdef->full_tablename().c_str());
2993         return true;
2994       }
2995       if (max_index_id_in_dict < gl_index_id.index_id) {
2996         sql_print_error("RocksDB: Found max index id %u from data dictionary "
2997                         "but also found larger index id %u from dictionary. "
2998                         "This should never happen and possibly a bug.",
2999                         max_index_id_in_dict, gl_index_id.index_id);
3000         return true;
3001       }
3002       if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
3003         sql_print_error("RocksDB: Could not get Column Family Flags "
3004                         "for CF Number %d, table %s",
3005                         gl_index_id.cf_id, tdef->full_tablename().c_str());
3006         return true;
3007       }
3008 
3009       rocksdb::ColumnFamilyHandle *const cfh =
3010           cf_manager->get_cf(gl_index_id.cf_id);
3011       DBUG_ASSERT(cfh != nullptr);
3012 
3013       /*
3014         We can't fully initialize Rdb_key_def object here, because full
3015         initialization requires that there is an open TABLE* where we could
3016         look at Field* objects and set max_length and other attributes
3017       */
3018       tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
3019           gl_index_id.index_id, keyno, cfh, m_index_dict_version, m_index_type,
3020           kv_version, flags & Rdb_key_def::REVERSE_CF_FLAG,
3021           flags & Rdb_key_def::AUTO_CF_FLAG, "",
3022           m_dict->get_stats(gl_index_id));
3023     }
3024     put(tdef);
3025     i++;
3026   }
3027 
3028   /*
3029     If validate_tables is greater than 0 run the validation.  Only fail the
3030     initialzation if the setting is 1.  If the setting is 2 we continue.
3031   */
3032   if (validate_tables > 0 && !validate_schemas()) {
3033     if (validate_tables == 1) {
3034       sql_print_error("RocksDB: Problems validating data dictionary "
3035                       "against .frm files, exiting");
3036       return true;
3037     }
3038   }
3039 
3040   // index ids used by applications should not conflict with
3041   // data dictionary index ids
3042   if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
3043     max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
3044   }
3045 
3046   m_sequence.init(max_index_id_in_dict + 1);
3047 
3048   if (!it->status().ok()) {
3049     const std::string s = it->status().ToString();
3050     sql_print_error("RocksDB: Table_store: load error: %s", s.c_str());
3051     return true;
3052   }
3053   delete it;
3054   sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
3055                         i);
3056   return false;
3057 }
3058 
find(const std::string & table_name,const bool & lock)3059 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
3060                                    const bool &lock) {
3061   if (lock) {
3062     mysql_rwlock_rdlock(&m_rwlock);
3063   }
3064 
3065   Rdb_tbl_def *const rec = reinterpret_cast<Rdb_tbl_def *>(my_hash_search(
3066       &m_ddl_hash, reinterpret_cast<const uchar *>(table_name.c_str()),
3067       table_name.size()));
3068 
3069   if (lock) {
3070     mysql_rwlock_unlock(&m_rwlock);
3071   }
3072 
3073   return rec;
3074 }
3075 
3076 // this is a safe version of the find() function below.  It acquires a read
3077 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
3078 // are finding it.  Copying it into 'ret' increments the count making sure
3079 // that the object will not be discarded until we are finished with it.
3080 std::shared_ptr<const Rdb_key_def>
safe_find(GL_INDEX_ID gl_index_id)3081 Rdb_ddl_manager::safe_find(GL_INDEX_ID gl_index_id) {
3082   std::shared_ptr<const Rdb_key_def> ret(nullptr);
3083 
3084   mysql_rwlock_rdlock(&m_rwlock);
3085 
3086   auto it = m_index_num_to_keydef.find(gl_index_id);
3087   if (it != m_index_num_to_keydef.end()) {
3088     const auto table_def = find(it->second.first, false);
3089     if (table_def && it->second.second < table_def->m_key_count) {
3090       const auto &kd = table_def->m_key_descr_arr[it->second.second];
3091       if (kd->max_storage_fmt_length() != 0) {
3092         ret = kd;
3093       }
3094     }
3095   }
3096 
3097   mysql_rwlock_unlock(&m_rwlock);
3098 
3099   return ret;
3100 }
3101 
3102 // this method assumes at least read-only lock on m_rwlock
3103 const std::shared_ptr<Rdb_key_def> &
find(GL_INDEX_ID gl_index_id)3104 Rdb_ddl_manager::find(GL_INDEX_ID gl_index_id) {
3105   auto it = m_index_num_to_keydef.find(gl_index_id);
3106   if (it != m_index_num_to_keydef.end()) {
3107     auto table_def = find(it->second.first, false);
3108     if (table_def) {
3109       if (it->second.second < table_def->m_key_count) {
3110         return table_def->m_key_descr_arr[it->second.second];
3111       }
3112     }
3113   }
3114 
3115   static std::shared_ptr<Rdb_key_def> empty = nullptr;
3116 
3117   return empty;
3118 }
3119 
set_stats(const std::unordered_map<GL_INDEX_ID,Rdb_index_stats> & stats)3120 void Rdb_ddl_manager::set_stats(
3121     const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
3122   mysql_rwlock_wrlock(&m_rwlock);
3123   for (auto src : stats) {
3124     const auto &keydef = find(src.second.m_gl_index_id);
3125     if (keydef) {
3126       keydef->m_stats = src.second;
3127       m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
3128     }
3129   }
3130   mysql_rwlock_unlock(&m_rwlock);
3131 }
3132 
adjust_stats(const std::vector<Rdb_index_stats> & new_data,const std::vector<Rdb_index_stats> & deleted_data)3133 void Rdb_ddl_manager::adjust_stats(
3134     const std::vector<Rdb_index_stats> &new_data,
3135     const std::vector<Rdb_index_stats> &deleted_data) {
3136   mysql_rwlock_wrlock(&m_rwlock);
3137   int i = 0;
3138   for (const auto &data : {new_data, deleted_data}) {
3139     for (const auto &src : data) {
3140       const auto &keydef = find(src.m_gl_index_id);
3141       if (keydef) {
3142         keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
3143         m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
3144       }
3145     }
3146     i++;
3147   }
3148   const bool should_save_stats = !m_stats2store.empty();
3149   mysql_rwlock_unlock(&m_rwlock);
3150   if (should_save_stats) {
3151     // Queue an async persist_stats(false) call to the background thread.
3152     rdb_queue_save_stats_request();
3153   }
3154 }
3155 
persist_stats(const bool & sync)3156 void Rdb_ddl_manager::persist_stats(const bool &sync) {
3157   mysql_rwlock_wrlock(&m_rwlock);
3158   const auto local_stats2store = std::move(m_stats2store);
3159   m_stats2store.clear();
3160   mysql_rwlock_unlock(&m_rwlock);
3161 
3162   // Persist stats
3163   const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
3164   std::vector<Rdb_index_stats> stats;
3165   std::transform(local_stats2store.begin(), local_stats2store.end(),
3166                  std::back_inserter(stats),
3167                  [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
3168                    return s.second;
3169                  });
3170   m_dict->add_stats(wb.get(), stats);
3171   m_dict->commit(wb.get(), sync);
3172 }
3173 
3174 /*
3175   Put table definition of `tbl` into the mapping, and also write it to the
3176   on-disk data dictionary.
3177 */
3178 
put_and_write(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch)3179 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
3180                                    rocksdb::WriteBatch *const batch) {
3181   uchar buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
3182   uint pos = 0;
3183 
3184   rdb_netbuf_store_index(buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3185   pos += Rdb_key_def::INDEX_NUMBER_SIZE;
3186 
3187   const std::string &dbname_tablename = tbl->full_tablename();
3188   memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size());
3189   pos += dbname_tablename.size();
3190 
3191   int res;
3192   if ((res = tbl->put_dict(m_dict, batch, buf, pos))) {
3193     return res;
3194   }
3195   if ((res = put(tbl))) {
3196     return res;
3197   }
3198   return HA_EXIT_SUCCESS;
3199 }
3200 
3201 /* Return 0 - ok, other value - error */
3202 /* TODO:
3203   This function modifies m_ddl_hash and m_index_num_to_keydef.
3204   However, these changes need to be reversed if dict_manager.commit fails
3205   See the discussion here: https://reviews.facebook.net/D35925#inline-259167
3206   Tracked by https://github.com/facebook/mysql-5.6/issues/33
3207 */
put(Rdb_tbl_def * const tbl,const bool & lock)3208 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool &lock) {
3209   Rdb_tbl_def *rec;
3210   my_bool result;
3211   const std::string &dbname_tablename = tbl->full_tablename();
3212 
3213   if (lock)
3214     mysql_rwlock_wrlock(&m_rwlock);
3215 
3216   // We have to do this find because 'tbl' is not yet in the list.  We need
3217   // to find the one we are replacing ('rec')
3218   rec = find(dbname_tablename, false);
3219   if (rec) {
3220     // this will free the old record.
3221     my_hash_delete(&m_ddl_hash, reinterpret_cast<uchar *>(rec));
3222   }
3223   result = my_hash_insert(&m_ddl_hash, reinterpret_cast<uchar *>(tbl));
3224 
3225   for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
3226     m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
3227         std::make_pair(dbname_tablename, keyno);
3228   }
3229 
3230   if (lock)
3231     mysql_rwlock_unlock(&m_rwlock);
3232   return result;
3233 }
3234 
remove(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch,const bool & lock)3235 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
3236                              rocksdb::WriteBatch *const batch,
3237                              const bool &lock) {
3238   if (lock)
3239     mysql_rwlock_wrlock(&m_rwlock);
3240 
3241   uchar buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
3242   uint pos = 0;
3243 
3244   rdb_netbuf_store_index(buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3245   pos += Rdb_key_def::INDEX_NUMBER_SIZE;
3246 
3247   const std::string &dbname_tablename = tbl->full_tablename();
3248   memcpy(buf + pos, dbname_tablename.c_str(), dbname_tablename.size());
3249   pos += dbname_tablename.size();
3250 
3251   const rocksdb::Slice tkey((char *)buf, pos);
3252   m_dict->delete_key(batch, tkey);
3253 
3254   /* The following will also delete the object: */
3255   my_hash_delete(&m_ddl_hash, reinterpret_cast<uchar *>(tbl));
3256 
3257   if (lock)
3258     mysql_rwlock_unlock(&m_rwlock);
3259 }
3260 
rename(const std::string & from,const std::string & to,rocksdb::WriteBatch * const batch)3261 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
3262                              rocksdb::WriteBatch *const batch) {
3263   Rdb_tbl_def *rec;
3264   Rdb_tbl_def *new_rec;
3265   bool res = true;
3266   uchar new_buf[FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE];
3267   uint new_pos = 0;
3268 
3269   mysql_rwlock_wrlock(&m_rwlock);
3270   if (!(rec = find(from, false))) {
3271     mysql_rwlock_unlock(&m_rwlock);
3272     return true;
3273   }
3274 
3275   new_rec = new Rdb_tbl_def(to);
3276 
3277   new_rec->m_key_count = rec->m_key_count;
3278   new_rec->m_auto_incr_val =
3279       rec->m_auto_incr_val.load(std::memory_order_relaxed);
3280   new_rec->m_key_descr_arr = rec->m_key_descr_arr;
3281   // so that it's not free'd when deleting the old rec
3282   rec->m_key_descr_arr = nullptr;
3283 
3284   // Create a new key
3285   rdb_netbuf_store_index(new_buf, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
3286   new_pos += Rdb_key_def::INDEX_NUMBER_SIZE;
3287 
3288   const std::string &dbname_tablename = new_rec->full_tablename();
3289   memcpy(new_buf + new_pos, dbname_tablename.c_str(), dbname_tablename.size());
3290   new_pos += dbname_tablename.size();
3291 
3292   // Create a key to add
3293   if (!new_rec->put_dict(m_dict, batch, new_buf, new_pos)) {
3294     remove(rec, batch, false);
3295     put(new_rec, false);
3296     res = false; // ok
3297   }
3298 
3299   mysql_rwlock_unlock(&m_rwlock);
3300   return res;
3301 }
3302 
cleanup()3303 void Rdb_ddl_manager::cleanup() {
3304   my_hash_free(&m_ddl_hash);
3305   mysql_rwlock_destroy(&m_rwlock);
3306   m_sequence.cleanup();
3307 }
3308 
scan_for_tables(Rdb_tables_scanner * const tables_scanner)3309 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
3310   int i, ret;
3311   Rdb_tbl_def *rec;
3312 
3313   DBUG_ASSERT(tables_scanner != nullptr);
3314 
3315   mysql_rwlock_rdlock(&m_rwlock);
3316 
3317   ret = 0;
3318   i = 0;
3319 
3320   while ((
3321       rec = reinterpret_cast<Rdb_tbl_def *>(my_hash_element(&m_ddl_hash, i)))) {
3322     ret = tables_scanner->add_table(rec);
3323     if (ret)
3324       break;
3325     i++;
3326   }
3327 
3328   mysql_rwlock_unlock(&m_rwlock);
3329   return ret;
3330 }
3331 
init(rocksdb::DB * const rdb_dict,Rdb_cf_manager * const cf_manager)3332 bool Rdb_dict_manager::init(rocksdb::DB *const rdb_dict,
3333                             Rdb_cf_manager *const cf_manager) {
3334   DBUG_ASSERT(rdb_dict != nullptr);
3335   DBUG_ASSERT(cf_manager != nullptr);
3336 
3337   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
3338 
3339   m_db = rdb_dict;
3340   bool is_automatic;
3341 
3342   m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME, "",
3343                                               nullptr, &is_automatic);
3344 
3345   rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
3346 
3347   m_key_slice_max_index_id =
3348       rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
3349                      Rdb_key_def::INDEX_NUMBER_SIZE);
3350 
3351   resume_drop_indexes();
3352   rollback_ongoing_index_creation();
3353 
3354   // If system CF was created then we need to set its flags as well to make
3355   // sure that CF is properly initialized.
3356   if (m_system_cfh != nullptr) {
3357     const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
3358     rocksdb::WriteBatch *const batch = wb.get();
3359 
3360     add_cf_flags(batch, m_system_cfh->GetID(), 0);
3361     commit(batch);
3362   }
3363 
3364   return (m_system_cfh == nullptr);
3365 }
3366 
begin() const3367 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
3368   return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
3369 }
3370 
put_key(rocksdb::WriteBatchBase * const batch,const rocksdb::Slice & key,const rocksdb::Slice & value) const3371 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
3372                                const rocksdb::Slice &key,
3373                                const rocksdb::Slice &value) const {
3374   batch->Put(m_system_cfh, key, value);
3375 }
3376 
/*
  Read the value stored under `key` in the system column family.

  The read is issued with total_order_seek enabled.

  @param value  receives the stored value
  @return the status returned by rocksdb::DB::Get()
*/
rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
                                            std::string *const value) const {
  rocksdb::ReadOptions read_opts;
  read_opts.total_order_seek = true;

  return m_db->Get(read_opts, m_system_cfh, key, value);
}
3383 
delete_key(rocksdb::WriteBatchBase * batch,const rocksdb::Slice & key) const3384 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
3385                                   const rocksdb::Slice &key) const {
3386   batch->Delete(m_system_cfh, key);
3387 }
3388 
new_iterator() const3389 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
3390   /* Reading data dictionary should always skip bloom filter */
3391   rocksdb::ReadOptions read_options;
3392   read_options.total_order_seek = true;
3393   return m_db->NewIterator(read_options, m_system_cfh);
3394 }
3395 
commit(rocksdb::WriteBatch * const batch,const bool & sync) const3396 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
3397                              const bool &sync) const {
3398   if (!batch)
3399     return HA_EXIT_FAILURE;
3400   int res = 0;
3401   rocksdb::WriteOptions options;
3402   options.sync = sync;
3403   rocksdb::Status s = m_db->Write(options, batch);
3404   res = !s.ok(); // we return true when something failed
3405   if (res) {
3406     rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
3407   }
3408   batch->Clear();
3409   return res;
3410 }
3411 
dump_index_id(uchar * const netbuf,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id)3412 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
3413                                      Rdb_key_def::DATA_DICT_TYPE dict_type,
3414                                      const GL_INDEX_ID &gl_index_id) {
3415   rdb_netbuf_store_uint32(netbuf, dict_type);
3416   rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
3417                           gl_index_id.cf_id);
3418   rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
3419                           gl_index_id.index_id);
3420 }
3421 
delete_with_prefix(rocksdb::WriteBatch * const batch,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id) const3422 void Rdb_dict_manager::delete_with_prefix(
3423     rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
3424     const GL_INDEX_ID &gl_index_id) const {
3425   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3426   dump_index_id(key_buf, dict_type, gl_index_id);
3427   rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3428 
3429   delete_key(batch, key);
3430 }
3431 
add_or_update_index_cf_mapping(rocksdb::WriteBatch * batch,const uchar m_index_type,const uint16_t kv_version,const uint32_t index_id,const uint32_t cf_id) const3432 void Rdb_dict_manager::add_or_update_index_cf_mapping(
3433     rocksdb::WriteBatch *batch, const uchar m_index_type,
3434     const uint16_t kv_version, const uint32_t index_id,
3435     const uint32_t cf_id) const {
3436   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3437   uchar value_buf[256] = {0};
3438   GL_INDEX_ID gl_index_id = {cf_id, index_id};
3439   dump_index_id(key_buf, Rdb_key_def::INDEX_INFO, gl_index_id);
3440   const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3441 
3442   uchar *ptr = value_buf;
3443   rdb_netbuf_store_uint16(ptr, Rdb_key_def::INDEX_INFO_VERSION_LATEST);
3444   ptr += 2;
3445   rdb_netbuf_store_byte(ptr, m_index_type);
3446   ptr += 1;
3447   rdb_netbuf_store_uint16(ptr, kv_version);
3448   ptr += 2;
3449 
3450   const rocksdb::Slice value =
3451       rocksdb::Slice((char *)value_buf, ptr - value_buf);
3452   batch->Put(m_system_cfh, key, value);
3453 }
3454 
add_cf_flags(rocksdb::WriteBatch * const batch,const uint32_t & cf_id,const uint32_t & cf_flags) const3455 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
3456                                     const uint32_t &cf_id,
3457                                     const uint32_t &cf_flags) const {
3458   DBUG_ASSERT(batch != nullptr);
3459 
3460   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
3461   uchar value_buf[Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE] =
3462       {0};
3463   rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
3464   rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
3465   const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3466 
3467   rdb_netbuf_store_uint16(value_buf, Rdb_key_def::CF_DEFINITION_VERSION);
3468   rdb_netbuf_store_uint32(value_buf + Rdb_key_def::VERSION_SIZE, cf_flags);
3469   const rocksdb::Slice value =
3470       rocksdb::Slice((char *)value_buf, sizeof(value_buf));
3471   batch->Put(m_system_cfh, key, value);
3472 }
3473 
delete_index_info(rocksdb::WriteBatch * batch,const GL_INDEX_ID & gl_index_id) const3474 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
3475                                          const GL_INDEX_ID &gl_index_id) const {
3476   delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
3477 }
3478 
get_index_info(const GL_INDEX_ID & gl_index_id,uint16_t * m_index_dict_version,uchar * m_index_type,uint16_t * kv_version) const3479 bool Rdb_dict_manager::get_index_info(const GL_INDEX_ID &gl_index_id,
3480                                       uint16_t *m_index_dict_version,
3481                                       uchar *m_index_type,
3482                                       uint16_t *kv_version) const {
3483   bool found = false;
3484   bool error = false;
3485   std::string value;
3486   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3487   dump_index_id(key_buf, Rdb_key_def::INDEX_INFO, gl_index_id);
3488   const rocksdb::Slice &key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3489 
3490   const rocksdb::Status &status = get_value(key, &value);
3491   if (status.ok()) {
3492     const uchar *const val = (const uchar *)value.c_str();
3493     const uchar *ptr = val;
3494     *m_index_dict_version = rdb_netbuf_to_uint16(val);
3495     *kv_version = 0;
3496     *m_index_type = 0;
3497     ptr += 2;
3498     switch (*m_index_dict_version) {
3499 
3500     case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
3501     case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
3502       *m_index_type = rdb_netbuf_to_byte(ptr);
3503       ptr += 1;
3504       *kv_version = rdb_netbuf_to_uint16(ptr);
3505       found = true;
3506       break;
3507 
3508     default:
3509       error = true;
3510       break;
3511     }
3512 
3513     switch (*m_index_type) {
3514     case Rdb_key_def::INDEX_TYPE_PRIMARY:
3515     case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
3516       error = *kv_version > Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
3517       break;
3518     }
3519     case Rdb_key_def::INDEX_TYPE_SECONDARY:
3520       error = *kv_version > Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
3521       break;
3522     default:
3523       error = true;
3524       break;
3525     }
3526   }
3527 
3528   if (error) {
3529     // NO_LINT_DEBUG
3530     sql_print_error("RocksDB: Found invalid key version number (%u, %u, %u) "
3531                     "from data dictionary. This should never happen "
3532                     "and it may be a bug.",
3533                     *m_index_dict_version, *m_index_type, *kv_version);
3534     abort_with_stack_traces();
3535   }
3536 
3537   return found;
3538 }
3539 
get_cf_flags(const uint32_t & cf_id,uint32_t * const cf_flags) const3540 bool Rdb_dict_manager::get_cf_flags(const uint32_t &cf_id,
3541                                     uint32_t *const cf_flags) const {
3542   DBUG_ASSERT(cf_flags != nullptr);
3543 
3544   bool found = false;
3545   std::string value;
3546   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
3547 
3548   rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
3549   rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
3550 
3551   const rocksdb::Slice key =
3552       rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
3553   const rocksdb::Status status = get_value(key, &value);
3554 
3555   if (status.ok()) {
3556     const uchar *val = (const uchar *)value.c_str();
3557     DBUG_ASSERT(val);
3558 
3559     const uint16_t version = rdb_netbuf_to_uint16(val);
3560 
3561     if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
3562       *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
3563       found = true;
3564     }
3565   }
3566 
3567   return found;
3568 }
3569 
3570 /*
3571   Returning index ids that were marked as deleted (via DROP TABLE) but
3572   still not removed by drop_index_thread yet, or indexes that are marked as
3573   ongoing creation.
3574  */
get_ongoing_index_operation(std::unordered_set<GL_INDEX_ID> * gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const3575 void Rdb_dict_manager::get_ongoing_index_operation(
3576     std::unordered_set<GL_INDEX_ID> *gl_index_ids,
3577     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3578   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3579               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3580 
3581   uchar index_buf[Rdb_key_def::INDEX_NUMBER_SIZE];
3582   rdb_netbuf_store_uint32(index_buf, dd_type);
3583   const rocksdb::Slice index_slice(reinterpret_cast<char *>(index_buf),
3584                                    Rdb_key_def::INDEX_NUMBER_SIZE);
3585 
3586   rocksdb::Iterator *it = new_iterator();
3587   for (it->Seek(index_slice); it->Valid(); it->Next()) {
3588     rocksdb::Slice key = it->key();
3589     const uchar *const ptr = (const uchar *)key.data();
3590 
3591     /*
3592       Ongoing drop/create index operations require key to be of the form:
3593       dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
3594 
3595       This may need to be changed in the future if we want to process a new
3596       ddl_type with different format.
3597     */
3598     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
3599         rdb_netbuf_to_uint32(ptr) != dd_type) {
3600       break;
3601     }
3602 
3603     // We don't check version right now since currently we always store only
3604     // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
3605     // If increasing version number, we need to add version check logic here.
3606     GL_INDEX_ID gl_index_id;
3607     gl_index_id.cf_id =
3608         rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
3609     gl_index_id.index_id =
3610         rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
3611     gl_index_ids->insert(gl_index_id);
3612   }
3613   delete it;
3614 }
3615 
3616 /*
3617   Returning true if index_id is create/delete ongoing (undergoing creation or
3618   marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
3619   or not.
3620  */
is_index_operation_ongoing(const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const3621 bool Rdb_dict_manager::is_index_operation_ongoing(
3622     const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3623   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3624               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3625 
3626   bool found = false;
3627   std::string value;
3628   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3629   dump_index_id(key_buf, dd_type, gl_index_id);
3630   const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3631 
3632   const rocksdb::Status status = get_value(key, &value);
3633   if (status.ok()) {
3634     found = true;
3635   }
3636   return found;
3637 }
3638 
3639 /*
3640   Adding index_id to data dictionary so that the index id is removed
3641   by drop_index_thread, or to track online index creation.
3642  */
start_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const3643 void Rdb_dict_manager::start_ongoing_index_operation(
3644     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
3645     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3646   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3647               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3648 
3649   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3650   uchar value_buf[Rdb_key_def::VERSION_SIZE] = {0};
3651   dump_index_id(key_buf, dd_type, gl_index_id);
3652 
3653   // version as needed
3654   if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
3655     rdb_netbuf_store_uint16(value_buf,
3656                             Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
3657   } else {
3658     rdb_netbuf_store_uint16(value_buf,
3659                             Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
3660   }
3661 
3662   const rocksdb::Slice key = rocksdb::Slice((char *)key_buf, sizeof(key_buf));
3663   const rocksdb::Slice value =
3664       rocksdb::Slice((char *)value_buf, sizeof(value_buf));
3665   batch->Put(m_system_cfh, key, value);
3666 }
3667 
3668 /*
3669   Removing index_id from data dictionary to confirm drop_index_thread
3670   completed dropping entire key/values of the index_id
3671  */
end_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const3672 void Rdb_dict_manager::end_ongoing_index_operation(
3673     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
3674     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3675   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3676               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3677 
3678   delete_with_prefix(batch, dd_type, gl_index_id);
3679 }
3680 
3681 /*
3682   Returning true if there is no target index ids to be removed
3683   by drop_index_thread
3684  */
is_drop_index_empty() const3685 bool Rdb_dict_manager::is_drop_index_empty() const {
3686   std::unordered_set<GL_INDEX_ID> gl_index_ids;
3687   get_ongoing_drop_indexes(&gl_index_ids);
3688   return gl_index_ids.empty();
3689 }
3690 
3691 /*
3692   This function is supposed to be called by DROP TABLE. Logging messages
3693   that dropping indexes started, and adding data dictionary so that
3694   all associated indexes to be removed
3695  */
add_drop_table(std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 & n_keys,rocksdb::WriteBatch * const batch) const3696 void Rdb_dict_manager::add_drop_table(
3697     std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 &n_keys,
3698     rocksdb::WriteBatch *const batch) const {
3699   std::unordered_set<GL_INDEX_ID> dropped_index_ids;
3700   for (uint32 i = 0; i < n_keys; i++) {
3701     dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
3702   }
3703 
3704   add_drop_index(dropped_index_ids, batch);
3705 }
3706 
3707 /*
3708   Called during inplace index drop operations. Logging messages
3709   that dropping indexes started, and adding data dictionary so that
3710   all associated indexes to be removed
3711  */
add_drop_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const3712 void Rdb_dict_manager::add_drop_index(
3713     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
3714     rocksdb::WriteBatch *const batch) const {
3715   for (const auto &gl_index_id : gl_index_ids) {
3716     log_start_drop_index(gl_index_id, "Begin");
3717     start_drop_index(batch, gl_index_id);
3718   }
3719 }
3720 
3721 /*
3722   Called during inplace index creation operations. Logging messages
3723   that adding indexes started, and updates data dictionary with all associated
3724   indexes to be added.
3725  */
add_create_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const3726 void Rdb_dict_manager::add_create_index(
3727     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
3728     rocksdb::WriteBatch *const batch) const {
3729   for (const auto &gl_index_id : gl_index_ids) {
3730     // NO_LINT_DEBUG
3731     sql_print_information("RocksDB: Begin index creation (%u,%u)",
3732                           gl_index_id.cf_id, gl_index_id.index_id);
3733     start_create_index(batch, gl_index_id);
3734   }
3735 }
3736 
3737 /*
3738   This function is supposed to be called by drop_index_thread, when it
3739   finished dropping any index, or at the completion of online index creation.
3740  */
finish_indexes_operation(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const3741 void Rdb_dict_manager::finish_indexes_operation(
3742     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
3743     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
3744   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
3745               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3746 
3747   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
3748   rocksdb::WriteBatch *const batch = wb.get();
3749 
3750   std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
3751   get_ongoing_create_indexes(&incomplete_create_indexes);
3752 
3753   for (const auto &gl_index_id : gl_index_ids) {
3754     if (is_index_operation_ongoing(gl_index_id, dd_type)) {
3755       // NO_LINT_DEBUG
3756       sql_print_information("RocksDB: Finished %s (%u,%u)",
3757                             dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING
3758                                 ? "filtering dropped index"
3759                                 : "index creation",
3760                             gl_index_id.cf_id, gl_index_id.index_id);
3761 
3762       end_ongoing_index_operation(batch, gl_index_id, dd_type);
3763 
3764       /*
3765         Remove the corresponding incomplete create indexes from data
3766         dictionary as well
3767       */
3768       if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
3769         if (incomplete_create_indexes.count(gl_index_id)) {
3770           end_ongoing_index_operation(batch, gl_index_id,
3771                                       Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
3772         }
3773       }
3774     }
3775 
3776     if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
3777       delete_index_info(batch, gl_index_id);
3778     }
3779   }
3780   commit(batch);
3781 }
3782 
3783 /*
3784   This function is supposed to be called when initializing
3785   Rdb_dict_manager (at startup). If there is any index ids that are
3786   drop ongoing, printing out messages for diagnostics purposes.
3787  */
resume_drop_indexes() const3788 void Rdb_dict_manager::resume_drop_indexes() const {
3789   std::unordered_set<GL_INDEX_ID> gl_index_ids;
3790   get_ongoing_drop_indexes(&gl_index_ids);
3791 
3792   uint max_index_id_in_dict = 0;
3793   get_max_index_id(&max_index_id_in_dict);
3794 
3795   for (const auto &gl_index_id : gl_index_ids) {
3796     log_start_drop_index(gl_index_id, "Resume");
3797     if (max_index_id_in_dict < gl_index_id.index_id) {
3798       sql_print_error("RocksDB: Found max index id %u from data dictionary "
3799                       "but also found dropped index id (%u,%u) from drop_index "
3800                       "dictionary. This should never happen and is possibly a "
3801                       "bug.",
3802                       max_index_id_in_dict, gl_index_id.cf_id,
3803                       gl_index_id.index_id);
3804       abort_with_stack_traces();
3805     }
3806   }
3807 }
3808 
rollback_ongoing_index_creation() const3809 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
3810   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
3811   rocksdb::WriteBatch *const batch = wb.get();
3812 
3813   std::unordered_set<GL_INDEX_ID> gl_index_ids;
3814   get_ongoing_create_indexes(&gl_index_ids);
3815 
3816   for (const auto &gl_index_id : gl_index_ids) {
3817     // NO_LINT_DEBUG
3818     sql_print_information("RocksDB: Removing incomplete create index (%u,%u)",
3819                           gl_index_id.cf_id, gl_index_id.index_id);
3820 
3821     start_drop_index(batch, gl_index_id);
3822   }
3823 
3824   commit(batch);
3825 }
3826 
log_start_drop_table(const std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 & n_keys,const char * const log_action) const3827 void Rdb_dict_manager::log_start_drop_table(
3828     const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 &n_keys,
3829     const char *const log_action) const {
3830   for (uint32 i = 0; i < n_keys; i++) {
3831     log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
3832   }
3833 }
3834 
log_start_drop_index(GL_INDEX_ID gl_index_id,const char * log_action) const3835 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
3836                                             const char *log_action) const {
3837   uint16 m_index_dict_version = 0;
3838   uchar m_index_type = 0;
3839   uint16 kv_version = 0;
3840 
3841   if (!get_index_info(gl_index_id, &m_index_dict_version, &m_index_type,
3842                       &kv_version)) {
3843     /*
3844       If we don't find the index info, it could be that it's because it was a
3845       partially created index that isn't in the data dictionary yet that needs
3846       to be rolled back.
3847     */
3848     std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
3849     get_ongoing_create_indexes(&incomplete_create_indexes);
3850 
3851     if (!incomplete_create_indexes.count(gl_index_id)) {
3852       /* If it's not a partially created index, something is very wrong. */
3853       sql_print_error("RocksDB: Failed to get column family info "
3854                       "from index id (%u,%u). MyRocks data dictionary may "
3855                       "get corrupted.",
3856                       gl_index_id.cf_id, gl_index_id.index_id);
3857       abort_with_stack_traces();
3858     }
3859   }
3860   sql_print_information("RocksDB: %s filtering dropped index (%u,%u)",
3861                         log_action, gl_index_id.cf_id, gl_index_id.index_id);
3862 }
3863 
get_max_index_id(uint32_t * const index_id) const3864 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
3865   bool found = false;
3866   std::string value;
3867 
3868   const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
3869   if (status.ok()) {
3870     const uchar *const val = (const uchar *)value.c_str();
3871     const uint16_t &version = rdb_netbuf_to_uint16(val);
3872     if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
3873       *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
3874       found = true;
3875     }
3876   }
3877   return found;
3878 }
3879 
update_max_index_id(rocksdb::WriteBatch * const batch,const uint32_t & index_id) const3880 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
3881                                            const uint32_t &index_id) const {
3882   DBUG_ASSERT(batch != nullptr);
3883 
3884   uint32_t old_index_id = -1;
3885   if (get_max_index_id(&old_index_id)) {
3886     if (old_index_id > index_id) {
3887       sql_print_error("RocksDB: Found max index id %u from data dictionary "
3888                       "but trying to update to older value %u. This should "
3889                       "never happen and possibly a bug.",
3890                       old_index_id, index_id);
3891       return true;
3892     }
3893   }
3894 
3895   uchar value_buf[Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE] =
3896       {0};
3897   rdb_netbuf_store_uint16(value_buf, Rdb_key_def::MAX_INDEX_ID_VERSION);
3898   rdb_netbuf_store_uint32(value_buf + Rdb_key_def::VERSION_SIZE, index_id);
3899   const rocksdb::Slice value =
3900       rocksdb::Slice((char *)value_buf, sizeof(value_buf));
3901   batch->Put(m_system_cfh, m_key_slice_max_index_id, value);
3902   return false;
3903 }
3904 
add_stats(rocksdb::WriteBatch * const batch,const std::vector<Rdb_index_stats> & stats) const3905 void Rdb_dict_manager::add_stats(
3906     rocksdb::WriteBatch *const batch,
3907     const std::vector<Rdb_index_stats> &stats) const {
3908   DBUG_ASSERT(batch != nullptr);
3909 
3910   for (const auto &it : stats) {
3911     uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3912     dump_index_id(key_buf, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
3913 
3914     // IndexStats::materialize takes complete care of serialization including
3915     // storing the version
3916     const auto value =
3917         Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it}, 1.);
3918 
3919     batch->Put(m_system_cfh, rocksdb::Slice((char *)key_buf, sizeof(key_buf)),
3920                value);
3921   }
3922 }
3923 
get_stats(GL_INDEX_ID gl_index_id) const3924 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
3925   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 3] = {0};
3926   dump_index_id(key_buf, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
3927 
3928   std::string value;
3929   const rocksdb::Status status = get_value(
3930       rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf)),
3931       &value);
3932   if (status.ok()) {
3933     std::vector<Rdb_index_stats> v;
3934     // unmaterialize checks if the version matches
3935     if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
3936       return v[0];
3937     }
3938   }
3939 
3940   return Rdb_index_stats();
3941 }
3942 
get_and_update_next_number(Rdb_dict_manager * const dict)3943 uint Rdb_seq_generator::get_and_update_next_number(
3944     Rdb_dict_manager *const dict) {
3945   DBUG_ASSERT(dict != nullptr);
3946 
3947   uint res;
3948   mysql_mutex_lock(&m_mutex);
3949 
3950   res = m_next_number++;
3951 
3952   const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
3953   rocksdb::WriteBatch *const batch = wb.get();
3954 
3955   DBUG_ASSERT(batch != nullptr);
3956   dict->update_max_index_id(batch, res);
3957   dict->commit(batch);
3958 
3959   mysql_mutex_unlock(&m_mutex);
3960 
3961   return res;
3962 }
3963 
3964 } // namespace myrocks
3965