1 /*
2    Copyright (c) 2012,2013 Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16 
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation  // gcc: Class implementation
19 #endif
20 
21 /* For use of 'PRIu64': */
22 #define __STDC_FORMAT_MACROS
23 
24 #include <my_global.h>
25 /* This C++ file's header file */
26 #include "./rdb_datadic.h"
27 
28 #include <inttypes.h>
29 /* C++ standard header files */
30 #include <algorithm>
31 #include <array>
32 #include <limits>
33 #include <map>
34 #include <set>
35 #include <string>
36 #include <utility>
37 #include <vector>
38 
39 /* MySQL header files */
40 #include "./field.h"
41 #include "./key.h"
42 #include "./m_ctype.h"
43 #include "./my_bit.h"
44 #include "./my_bitmap.h"
45 #include "./sql_table.h"
46 
47 /* MyRocks header files */
48 #include "./ha_rocksdb.h"
49 #include "./ha_rocksdb_proto.h"
50 #include "./my_stacktrace.h"
51 #include "./rdb_cf_manager.h"
52 #include "./rdb_psi.h"
53 #include "./rdb_utils.h"
54 
55 namespace myrocks {
56 
57 void get_mem_comparable_space(const CHARSET_INFO *cs,
58                               const std::vector<uchar> **xfrm, size_t *xfrm_len,
59                               size_t *mb_len);
60 
61 /*
62   MariaDB's replacement for FB/MySQL Field::check_field_name_match :
63 */
64 inline bool field_check_field_name_match(Field *field, const char *name)
65 {
66   return (0 == my_strcasecmp(system_charset_info,
67                              field->field_name.str,
68                              name));
69 }
70 
71 
72 /*
73   Decode  current key field
74   @param  fpi               IN      data structure contains field metadata
75   @param  field             IN      current field
76   @param  reader            IN      key slice reader
77   @param  unp_reader        IN      unpack information reader
78   @return
79     HA_EXIT_SUCCESS    OK
80     other              HA_ERR error code
81 */
82 int Rdb_convert_to_record_key_decoder::decode_field(
83     Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader,
84     const uchar *const default_value, Rdb_string_reader *unpack_reader) {
85   if (fpi->m_maybe_null) {
86     const char *nullp;
87     if (!(nullp = reader->read(1))) {
88       return HA_EXIT_FAILURE;
89     }
90 
91     if (*nullp == 0) {
92       /* Set the NULL-bit of this field */
93       field->set_null();
94       /* Also set the field to its default value */
95       memcpy(field->ptr, default_value, field->pack_length());
96       return HA_EXIT_SUCCESS;
97     } else if (*nullp == 1) {
98       field->set_notnull();
99     } else {
100       return HA_EXIT_FAILURE;
101     }
102   }
103 
104   return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader);
105 }
106 
107 /*
108   Decode  current key field
109 
110   @param  buf               OUT     the buf starting address
111   @param  offset            OUT     the bytes offset when data is written
112   @param  fpi               IN      data structure contains field metadata
113   @param  table             IN      current table
114   @param  field             IN      current field
115   @param  has_unpack_inf    IN      whether contains unpack inf
116   @param  reader            IN      key slice reader
117   @param  unp_reader        IN      unpack information reader
118   @return
119     HA_EXIT_SUCCESS    OK
120     other              HA_ERR error code
121 */
122 int Rdb_convert_to_record_key_decoder::decode(
123     uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table,
124     Field *field, bool has_unpack_info, Rdb_string_reader *reader,
125     Rdb_string_reader *unpack_reader) {
126   DBUG_ASSERT(buf != nullptr);
127   DBUG_ASSERT(offset != nullptr);
128 
129   uint field_offset = field->ptr - table->record[0];
130   *offset = field_offset;
131   uint null_offset = field->null_offset();
132   bool maybe_null = field->real_maybe_null();
133 
134   field->move_field(buf + field_offset,
135                     maybe_null ? buf + null_offset : nullptr, field->null_bit);
136 
137   // If we need unpack info, but there is none, tell the unpack function
138   // this by passing unp_reader as nullptr. If we never read unpack_info
139   // during unpacking anyway, then there won't an error.
140   bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
141 
142   int res =
143       decode_field(fpi, field, reader, table->s->default_values + field_offset,
144                    maybe_missing_unpack ? nullptr : unpack_reader);
145 
146   // Restore field->ptr and field->null_ptr
147   field->move_field(table->record[0] + field_offset,
148                     maybe_null ? table->record[0] + null_offset : nullptr,
149                     field->null_bit);
150   if (res != UNPACK_SUCCESS) {
151     return HA_ERR_ROCKSDB_CORRUPT_DATA;
152   }
153   return HA_EXIT_SUCCESS;
154 }
155 
156 /*
157   Skip current key field
158 
159   @param  fpi          IN    data structure contains field metadata
160   @param  field        IN    current field
161   @param  reader       IN    key slice reader
162   @param  unp_reader   IN    unpack information reader
163   @return
164     HA_EXIT_SUCCESS    OK
165     other              HA_ERR error code
166 */
167 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
168                                             const Field *field,
169                                             Rdb_string_reader *reader,
170                                             Rdb_string_reader *unp_reader) {
171   /* It is impossible to unpack the column. Skip it. */
172   if (fpi->m_maybe_null) {
173     const char *nullp;
174     if (!(nullp = reader->read(1))) {
175       return HA_ERR_ROCKSDB_CORRUPT_DATA;
176     }
177     if (*nullp == 0) {
178       /* This is a NULL value */
179       return HA_EXIT_SUCCESS;
180     }
181     /* If NULL marker is not '0', it can be only '1'  */
182     if (*nullp != 1) {
183       return HA_ERR_ROCKSDB_CORRUPT_DATA;
184     }
185   }
186   if ((fpi->m_skip_func)(fpi, field, reader)) {
187     return HA_ERR_ROCKSDB_CORRUPT_DATA;
188   }
189   // If this is a space padded varchar, we need to skip the indicator
190   // bytes for trailing bytes. They're useless since we can't restore the
191   // field anyway.
192   //
193   // There is a special case for prefixed varchars where we do not
194   // generate unpack info, because we know prefixed varchars cannot be
195   // unpacked. In this case, it is not necessary to skip.
196   if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
197       !fpi->m_unpack_info_stores_value) {
198     unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
199   }
200   return HA_EXIT_SUCCESS;
201 }
202 
203 Rdb_key_field_iterator::Rdb_key_field_iterator(
204     const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
205     Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
206     bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
207   m_key_def = key_def;
208   m_pack_info = pack_info;
209   m_iter_index = 0;
210   m_iter_end = key_def->get_key_parts();
211   m_reader = reader;
212   m_unp_reader = unp_reader;
213   m_table = table;
214   m_has_unpack_info = has_unpack_info;
215   m_covered_bitmap = covered_bitmap;
216   m_buf = buf;
217   m_secondary_key =
218       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
219   m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
220   m_is_hidden_pk =
221       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
222   m_curr_bitmap_pos = 0;
223   m_offset = 0;
224 }
225 
226 void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; }
227 
228 int Rdb_key_field_iterator::get_field_index() const {
229   DBUG_ASSERT(m_field != nullptr);
230   return m_field->field_index;
231 }
232 
233 bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; }
234 Field *Rdb_key_field_iterator::get_field() const {
235   DBUG_ASSERT(m_field != nullptr);
236   return m_field;
237 }
238 
239 bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; }
240 
241 /**
242  Iterate each field in the key and decode/skip one by one
243 */
244 int Rdb_key_field_iterator::next() {
245   int status = HA_EXIT_SUCCESS;
246   while (m_iter_index < m_iter_end) {
247     int curr_index = m_iter_index++;
248 
249     m_fpi = &m_pack_info[curr_index];
250     /*
251       Hidden pk field is packed at the end of the secondary keys, but the SQL
252       layer does not know about it. Skip retrieving field if hidden pk.
253     */
254     if ((m_secondary_key && m_hidden_pk_exists &&
255          curr_index + 1 == m_iter_end) ||
256         m_is_hidden_pk) {
257       DBUG_ASSERT(m_fpi->m_unpack_func);
258       if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) {
259         return HA_ERR_ROCKSDB_CORRUPT_DATA;
260       }
261       return HA_EXIT_SUCCESS;
262     }
263 
264     m_field = m_fpi->get_field_in_table(m_table);
265 
266     bool covered_column = true;
267     if (m_covered_bitmap != nullptr &&
268         m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) {
269       uint tmp= m_curr_bitmap_pos++;
270       covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
271                        bitmap_is_set(m_covered_bitmap, tmp);
272     }
273 
274     if (m_fpi->m_unpack_func && covered_column) {
275       /* It is possible to unpack this column. Do it. */
276       status = Rdb_convert_to_record_key_decoder::decode(
277           m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info,
278           m_reader, m_unp_reader);
279       if (status) {
280         return status;
281       }
282       break;
283     } else {
284       status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader,
285                                                        m_unp_reader);
286       if (status) {
287         return status;
288       }
289     }
290   }
291   return HA_EXIT_SUCCESS;
292 }
293 
294 /*
295   Rdb_key_def class implementation
296 */
297 Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
298                          rocksdb::ColumnFamilyHandle *cf_handle_arg,
299                          uint16_t index_dict_version_arg, uchar index_type_arg,
300                          uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
301                          bool is_per_partition_cf_arg, const char *_name,
302                          Rdb_index_stats _stats, uint32 index_flags_bitmap,
303                          uint32 ttl_rec_offset, uint64 ttl_duration)
304     : m_index_number(indexnr_arg),
305       m_cf_handle(cf_handle_arg),
306       m_index_dict_version(index_dict_version_arg),
307       m_index_type(index_type_arg),
308       m_kv_format_version(kv_format_version_arg),
309       m_is_reverse_cf(is_reverse_cf_arg),
310       m_is_per_partition_cf(is_per_partition_cf_arg),
311       m_name(_name),
312       m_stats(_stats),
313       m_index_flags_bitmap(index_flags_bitmap),
314       m_ttl_rec_offset(ttl_rec_offset),
315       m_ttl_duration(ttl_duration),
316       m_ttl_column(""),
317       m_pk_part_no(nullptr),
318       m_pack_info(nullptr),
319       m_keyno(keyno_arg),
320       m_key_parts(0),
321       m_ttl_pk_key_part_offset(UINT_MAX),
322       m_ttl_field_index(UINT_MAX),
323       m_prefix_extractor(nullptr),
324       m_maxlength(0)  // means 'not intialized'
325 {
326   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
327   rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
328   m_total_index_flags_length =
329       calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
330   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
331                       m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
332                   m_total_index_flags_length == 0);
333   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
334                       m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
335                   m_total_index_flags_length == 0);
336   DBUG_ASSERT(m_cf_handle != nullptr);
337 }
338 
339 Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
340     : m_index_number(k.m_index_number),
341       m_cf_handle(k.m_cf_handle),
342       m_is_reverse_cf(k.m_is_reverse_cf),
343       m_is_per_partition_cf(k.m_is_per_partition_cf),
344       m_name(k.m_name),
345       m_stats(k.m_stats),
346       m_index_flags_bitmap(k.m_index_flags_bitmap),
347       m_ttl_rec_offset(k.m_ttl_rec_offset),
348       m_ttl_duration(k.m_ttl_duration),
349       m_ttl_column(k.m_ttl_column),
350       m_pk_part_no(k.m_pk_part_no),
351       m_pack_info(k.m_pack_info),
352       m_keyno(k.m_keyno),
353       m_key_parts(k.m_key_parts),
354       m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
355       m_ttl_field_index(UINT_MAX),
356       m_prefix_extractor(k.m_prefix_extractor),
357       m_maxlength(k.m_maxlength) {
358   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
359   rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
360   m_total_index_flags_length =
361       calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
362   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
363                       m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
364                   m_total_index_flags_length == 0);
365   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
366                       m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
367                   m_total_index_flags_length == 0);
368   if (k.m_pack_info) {
369     const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
370     void *pack_info= my_malloc(size, MYF(0));
371     memcpy(pack_info, k.m_pack_info, size);
372     m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info);
373   }
374 
375   if (k.m_pk_part_no) {
376     const size_t size = sizeof(uint) * m_key_parts;
377     m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0)));
378     memcpy(m_pk_part_no, k.m_pk_part_no, size);
379   }
380 }
381 
382 Rdb_key_def::~Rdb_key_def() {
383   mysql_mutex_destroy(&m_mutex);
384 
385   my_free(m_pk_part_no);
386   m_pk_part_no = nullptr;
387 
388   my_free(m_pack_info);
389   m_pack_info = nullptr;
390 }
391 
392 void Rdb_key_def::setup(const TABLE *const tbl,
393                         const Rdb_tbl_def *const tbl_def) {
394   DBUG_ASSERT(tbl != nullptr);
395   DBUG_ASSERT(tbl_def != nullptr);
396 
397   /*
398     Set max_length based on the table.  This can be called concurrently from
399     multiple threads, so there is a mutex to protect this code.
400   */
401   const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
402   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
403   const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
404   if (!m_maxlength) {
405     RDB_MUTEX_LOCK_CHECK(m_mutex);
406     if (m_maxlength != 0) {
407       RDB_MUTEX_UNLOCK_CHECK(m_mutex);
408       return;
409     }
410 
411     KEY *key_info = nullptr;
412     KEY *pk_info = nullptr;
413     if (!is_hidden_pk) {
414       key_info = &tbl->key_info[m_keyno];
415       if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
416       m_name = std::string(key_info->name.str);
417     } else {
418       m_name = HIDDEN_PK_NAME;
419     }
420 
421     if (secondary_key) {
422       m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
423     } else {
424       pk_info = nullptr;
425       m_pk_key_parts = 0;
426     }
427 
428     // "unique" secondary keys support:
429     m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;
430 
431     if (secondary_key) {
432       /*
433         In most cases, SQL layer puts PK columns as invisible suffix at the
434         end of secondary key. There are cases where this doesn't happen:
435         - unique secondary indexes.
436         - partitioned tables.
437 
438         Internally, we always need PK columns as suffix (and InnoDB does,
439         too, if you were wondering).
440 
441         The loop below will attempt to put all PK columns at the end of key
442         definition.  Columns that are already included in the index (either
443         by the user or by "extended keys" feature) are not included for the
444         second time.
445       */
446       m_key_parts += m_pk_key_parts;
447     }
448 
449     if (secondary_key) {
450       m_pk_part_no = reinterpret_cast<uint *>(
451           my_malloc(sizeof(uint) * m_key_parts, MYF(0)));
452     } else {
453       m_pk_part_no = nullptr;
454     }
455 
456     const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
457     m_pack_info =
458         reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
459 
460     /*
461       Guaranteed not to error here as checks have been made already during
462       table creation.
463     */
464     Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
465                                  &m_ttl_field_index, true);
466 
467     size_t max_len = INDEX_NUMBER_SIZE;
468     int unpack_len = 0;
469     int max_part_len = 0;
470     bool simulating_extkey = false;
471     uint dst_i = 0;
472 
473     uint keyno_to_set = m_keyno;
474     uint keypart_to_set = 0;
475 
476     if (is_hidden_pk) {
477       Field *field = nullptr;
478       m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
479       m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
480       max_len += m_pack_info[dst_i].m_max_image_len;
481       max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
482       dst_i++;
483     } else {
484       KEY_PART_INFO *key_part = key_info->key_part;
485 
486       /* this loop also loops over the 'extended key' tail */
487       for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
488         Field *const field = key_part ? key_part->field : nullptr;
489 
490         if (simulating_extkey && !hidden_pk_exists) {
491           DBUG_ASSERT(secondary_key);
492           /* Check if this field is already present in the key definition */
493           bool found = false;
494           for (uint j= 0; j < key_info->ext_key_parts; j++) {
495             if (field->field_index ==
496                     key_info->key_part[j].field->field_index &&
497                 key_part->length == key_info->key_part[j].length) {
498               found = true;
499               break;
500             }
501           }
502 
503           if (found) {
504             key_part++;
505             continue;
506           }
507         }
508 
509         if (field && field->real_maybe_null()) max_len += 1;  // NULL-byte
510 
511         m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
512                                  key_part ? key_part->length : 0);
513         m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
514 
515         if (pk_info) {
516           m_pk_part_no[dst_i] = -1;
517           for (uint j = 0; j < m_pk_key_parts; j++) {
518             if (field->field_index == pk_info->key_part[j].field->field_index) {
519               m_pk_part_no[dst_i] = j;
520               break;
521             }
522           }
523         } else if (secondary_key && hidden_pk_exists) {
524           /*
525             The hidden pk can never be part of the sk.  So it is always
526             appended to the end of the sk.
527           */
528           m_pk_part_no[dst_i] = -1;
529           if (simulating_extkey) m_pk_part_no[dst_i] = 0;
530         }
531 
532         max_len += m_pack_info[dst_i].m_max_image_len;
533 
534         max_part_len =
535             std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
536 
537         /*
538           Check key part name here, if it matches the TTL column then we store
539           the offset of the TTL key part here.
540         */
541         if (!m_ttl_column.empty() &&
542             field_check_field_name_match(field, m_ttl_column.c_str())) {
543           DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
544           DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
545           DBUG_ASSERT(!field->real_maybe_null());
546           m_ttl_pk_key_part_offset = dst_i;
547         }
548 
549         key_part++;
550         /*
551           For "unique" secondary indexes, pretend they have
552           "index extensions".
553 
554           MariaDB also has this property: if an index has a partially-covered
555           column like KEY(varchar_col(N)), then the SQL layer will think it is
556           not "extended" with PK columns. The code below handles this case,
557           also.
558          */
559         if (secondary_key && src_i+1 == key_info->ext_key_parts) {
560           simulating_extkey = true;
561           if (!hidden_pk_exists) {
562             keyno_to_set = tbl->s->primary_key;
563             key_part = pk_info->key_part;
564             keypart_to_set = (uint)-1;
565           } else {
566             keyno_to_set = tbl_def->m_key_count - 1;
567             key_part = nullptr;
568             keypart_to_set = 0;
569           }
570         }
571 
572         dst_i++;
573       }
574     }
575 
576     m_key_parts = dst_i;
577 
578     /* Initialize the memory needed by the stats structure */
579     m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
580 
581     /* Cache prefix extractor for bloom filter usage later */
582     rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
583     m_prefix_extractor = opt.prefix_extractor;
584 
585     /*
586       This should be the last member variable set before releasing the mutex
587       so that other threads can't see the object partially set up.
588      */
589     m_maxlength = max_len;
590 
591     RDB_MUTEX_UNLOCK_CHECK(m_mutex);
592   }
593 }
594 
595 /*
596   Determine if the table has TTL enabled by parsing the table comment.
597 
598   @param[IN]  table_arg
599   @param[IN]  tbl_def_arg
600   @param[OUT] ttl_duration        Default TTL value parsed from table comment
601 */
602 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
603                                        const Rdb_tbl_def *const tbl_def_arg,
604                                        uint64 *ttl_duration) {
605   DBUG_ASSERT(table_arg != nullptr);
606   DBUG_ASSERT(tbl_def_arg != nullptr);
607   DBUG_ASSERT(ttl_duration != nullptr);
608   std::string table_comment(table_arg->s->comment.str,
609                             table_arg->s->comment.length);
610 
611   bool ttl_duration_per_part_match_found = false;
612   std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
613       table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
614       RDB_TTL_DURATION_QUALIFIER);
615 
616   /* If we don't have a ttl duration, nothing to do here. */
617   if (ttl_duration_str.empty()) {
618     return HA_EXIT_SUCCESS;
619   }
620 
621   /*
622     Catch errors where a non-integral value was used as ttl duration, strtoull
623     will return 0.
624   */
625   *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
626   if (!*ttl_duration) {
627     my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
628     return HA_EXIT_FAILURE;
629   }
630 
631   return HA_EXIT_SUCCESS;
632 }
633 
634 /*
635   Determine if the table has TTL enabled by parsing the table comment.
636 
637   @param[IN]  table_arg
638   @param[IN]  tbl_def_arg
639   @param[OUT] ttl_column          TTL column in the table
640   @param[IN]  skip_checks         Skip validation checks (when called in
641                                   setup())
642 */
643 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
644                                   const Rdb_tbl_def *const tbl_def_arg,
645                                   std::string *ttl_column,
646                                   uint *ttl_field_index, bool skip_checks) {
647   std::string table_comment(table_arg->s->comment.str,
648                             table_arg->s->comment.length);
649   /*
650     Check if there is a TTL column specified. Note that this is not required
651     and if omitted, an 8-byte ttl field will be prepended to each record
652     implicitly.
653   */
654   bool ttl_col_per_part_match_found = false;
655   std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
656       table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
657       RDB_TTL_COL_QUALIFIER);
658 
659   if (skip_checks) {
660     for (uint i = 0; i < table_arg->s->fields; i++) {
661       Field *const field = table_arg->field[i];
662       if (field_check_field_name_match(field, ttl_col_str.c_str())) {
663         *ttl_column = ttl_col_str;
664         *ttl_field_index = i;
665       }
666     }
667     return HA_EXIT_SUCCESS;
668   }
669 
670   /* Check if TTL column exists in table */
671   if (!ttl_col_str.empty()) {
672     bool found = false;
673     for (uint i = 0; i < table_arg->s->fields; i++) {
674       Field *const field = table_arg->field[i];
675       if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
676           field->real_type() == MYSQL_TYPE_LONGLONG &&
677           field->key_type() == HA_KEYTYPE_ULONGLONG &&
678           !field->real_maybe_null()) {
679         *ttl_column = ttl_col_str;
680         *ttl_field_index = i;
681         found = true;
682         break;
683       }
684     }
685 
686     if (!found) {
687       my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
688       return HA_EXIT_FAILURE;
689     }
690   }
691 
692   return HA_EXIT_SUCCESS;
693 }
694 
695 const std::string Rdb_key_def::gen_qualifier_for_table(
696     const char *const qualifier, const std::string &partition_name) {
697   bool has_partition = !partition_name.empty();
698   std::string qualifier_str = "";
699 
700   if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
701     return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
702                          : qualifier_str + RDB_CF_NAME_QUALIFIER +
703                                RDB_QUALIFIER_VALUE_SEP;
704   } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
705     return has_partition
706                ? gen_ttl_duration_qualifier_for_partition(partition_name)
707                : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
708                      RDB_QUALIFIER_VALUE_SEP;
709   } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
710     return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
711                          : qualifier_str + RDB_TTL_COL_QUALIFIER +
712                                RDB_QUALIFIER_VALUE_SEP;
713   } else {
714     DBUG_ASSERT(0);
715   }
716 
717   return qualifier_str;
718 }
719 
720 /*
721   Formats the string and returns the column family name assignment part for a
722   specific partition.
723 */
724 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
725     const std::string &prefix) {
726   DBUG_ASSERT(!prefix.empty());
727 
728   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
729          RDB_QUALIFIER_VALUE_SEP;
730 }
731 
732 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
733     const std::string &prefix) {
734   DBUG_ASSERT(!prefix.empty());
735 
736   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
737          RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
738 }
739 
740 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
741     const std::string &prefix) {
742   DBUG_ASSERT(!prefix.empty());
743 
744   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
745          RDB_QUALIFIER_VALUE_SEP;
746 }
747 
748 const std::string Rdb_key_def::parse_comment_for_qualifier(
749     const std::string &comment, const TABLE *const table_arg,
750     const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
751     const char *const qualifier) {
752   DBUG_ASSERT(table_arg != nullptr);
753   DBUG_ASSERT(tbl_def_arg != nullptr);
754   DBUG_ASSERT(per_part_match_found != nullptr);
755   DBUG_ASSERT(qualifier != nullptr);
756 
757   std::string empty_result;
758 
759   // Flag which marks if partition specific options were found.
760   *per_part_match_found = false;
761 
762   if (comment.empty()) {
763     return empty_result;
764   }
765 
766   // Let's fetch the comment for a index and check if there's a custom key
767   // name specified for a partition we are handling.
768   std::vector<std::string> v =
769       myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
770 
771   std::string search_str = gen_qualifier_for_table(qualifier);
772 
773   // If table has partitions then we need to check if user has requested
774   // qualifiers on a per partition basis.
775   //
776   // NOTE: this means if you specify a qualifier for a specific partition it
777   // will take precedence the 'table level' qualifier if one exists.
778   std::string search_str_part;
779   if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
780     std::string partition_name = tbl_def_arg->base_partition();
781     DBUG_ASSERT(!partition_name.empty());
782     search_str_part = gen_qualifier_for_table(qualifier, partition_name);
783   }
784 
785   DBUG_ASSERT(!search_str.empty());
786 
787   // Basic O(N) search for a matching assignment. At most we expect maybe
788   // ten or so elements here.
789   if (!search_str_part.empty()) {
790     for (const auto &it : v) {
791       if (it.substr(0, search_str_part.length()) == search_str_part) {
792         // We found a prefix match. Try to parse it as an assignment.
793         std::vector<std::string> tokens =
794             myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
795 
796         // We found a custom qualifier, it was in the form we expected it to be.
797         // Return that instead of whatever we initially wanted to return. In
798         // a case below the `foo` part will be returned to the caller.
799         //
800         // p3_cfname=foo
801         //
802         // If no value was specified then we'll return an empty string which
803         // later gets translated into using a default CF.
804         if (tokens.size() == 2) {
805           *per_part_match_found = true;
806           return tokens[1];
807         } else {
808           return empty_result;
809         }
810       }
811     }
812   }
813 
814   // Do this loop again, this time searching for 'table level' qualifiers if we
815   // didn't find any partition level qualifiers above.
816   for (const auto &it : v) {
817     if (it.substr(0, search_str.length()) == search_str) {
818       std::vector<std::string> tokens =
819           myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
820       if (tokens.size() == 2) {
821         return tokens[1];
822       } else {
823         return empty_result;
824       }
825     }
826   }
827 
828   // If we didn't find any partitioned/non-partitioned qualifiers, return an
829   // empty string.
830   return empty_result;
831 }
832 
833 /**
834   Read a memcmp key part from a slice using the passed in reader.
835 
836   Returns -1 if field was null, 1 if error, 0 otherwise.
837 */
838 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
839                                       Rdb_string_reader *reader,
840                                       const uint part_num) const {
841   /* It is impossible to unpack the column. Skip it. */
842   if (m_pack_info[part_num].m_maybe_null) {
843     const char *nullp;
844     if (!(nullp = reader->read(1))) return 1;
845     if (*nullp == 0) {
846       /* This is a NULL value */
847       return -1;
848     } else {
849       /* If NULL marker is not '0', it can be only '1'  */
850       if (*nullp != 1) return 1;
851     }
852   }
853 
854   Rdb_field_packing *fpi = &m_pack_info[part_num];
855   DBUG_ASSERT(table_arg->s != nullptr);
856 
857   bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
858                            (table_arg->s->primary_key == MAX_INDEXES);
859   Field *field = nullptr;
860   if (!is_hidden_pk_part) {
861     field = fpi->get_field_in_table(table_arg);
862   }
863   if ((fpi->m_skip_func)(fpi, field, reader)) {
864     return 1;
865   }
866   return 0;
867 }
868 
869 /**
870   Get a mem-comparable form of Primary Key from mem-comparable form of this key
871 
872   @param
873     pk_descr        Primary Key descriptor
874     key             Index tuple from this key in mem-comparable form
875     pk_buffer  OUT  Put here mem-comparable form of the Primary Key.
876 
877   @note
878     It may or may not be possible to restore primary key columns to their
879     mem-comparable form.  To handle all cases, this function copies mem-
880     comparable forms directly.
881 
882     RocksDB SE supports "Extended keys". This means that PK columns are present
883     at the end of every key.  If the key already includes PK columns, then
884     these columns are not present at the end of the key.
885 
886     Because of the above, we copy each primary key column.
887 
888   @todo
889     If we checked crc32 checksums in this function, we would catch some CRC
890     violations that we currently don't. On the other hand, there is a broader
891     set of queries for which we would check the checksum twice.
892 */
893 
894 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
895                                         const Rdb_key_def &pk_descr,
896                                         const rocksdb::Slice *const key,
897                                         uchar *const pk_buffer) const {
898   DBUG_ASSERT(table != nullptr);
899   DBUG_ASSERT(key != nullptr);
900   DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
901   DBUG_ASSERT(pk_buffer);
902 
903   uint size = 0;
904   uchar *buf = pk_buffer;
905   DBUG_ASSERT(m_pk_key_parts);
906 
907   /* Put the PK number */
908   rdb_netbuf_store_index(buf, pk_descr.m_index_number);
909   buf += INDEX_NUMBER_SIZE;
910   size += INDEX_NUMBER_SIZE;
911 
912   const char *start_offs[MAX_REF_PARTS];
913   const char *end_offs[MAX_REF_PARTS];
914   int pk_key_part;
915   uint i;
916   Rdb_string_reader reader(key);
917 
918   // Skip the index number
919   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
920 
921   for (i = 0; i < m_key_parts; i++) {
922     if ((pk_key_part = m_pk_part_no[i]) != -1) {
923       start_offs[pk_key_part] = reader.get_current_ptr();
924     }
925 
926     if (read_memcmp_key_part(table, &reader, i) > 0) {
927       return RDB_INVALID_KEY_LEN;
928     }
929 
930     if (pk_key_part != -1) {
931       end_offs[pk_key_part] = reader.get_current_ptr();
932     }
933   }
934 
935   for (i = 0; i < m_pk_key_parts; i++) {
936     const uint part_size = end_offs[i] - start_offs[i];
937     memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
938     buf += part_size;
939     size += part_size;
940   }
941 
942   return size;
943 }
944 
945 /**
946   Get a mem-comparable form of Secondary Key from mem-comparable form of this
947   key, without the extended primary key tail.
948 
949   @param
950     key                Index tuple from this key in mem-comparable form
951     sk_buffer     OUT  Put here mem-comparable form of the Secondary Key.
952     n_null_fields OUT  Put number of null fields contained within sk entry
953 */
954 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
955                                       const rocksdb::Slice &key,
956                                       uchar *sk_buffer,
957                                       uint *n_null_fields) const {
958   DBUG_ASSERT(table != nullptr);
959   DBUG_ASSERT(sk_buffer != nullptr);
960   DBUG_ASSERT(n_null_fields != nullptr);
961   DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
962 
963   uchar *buf = sk_buffer;
964 
965   int res;
966   Rdb_string_reader reader(&key);
967   const char *start = reader.get_current_ptr();
968 
969   // Skip the index number
970   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
971 
972   for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
973     if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
974       return RDB_INVALID_KEY_LEN;
975     } else if (res == -1) {
976       (*n_null_fields)++;
977     }
978   }
979 
980   uint sk_memcmp_len = reader.get_current_ptr() - start;
981   memcpy(buf, start, sk_memcmp_len);
982   return sk_memcmp_len;
983 }
984 
985 /**
986   Convert index tuple into storage (i.e. mem-comparable) format
987 
988   @detail
989     Currently this is done by unpacking into table->record[0] and then
990     packing index columns into storage format.
991 
992   @param pack_buffer Temporary area for packing varchar columns. Its
993                      size is at least max_storage_fmt_length() bytes.
994 */
995 
996 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
997                                    uchar *const packed_tuple,
998                                    const uchar *const key_tuple,
999                                    const key_part_map &keypart_map) const {
1000   DBUG_ASSERT(tbl != nullptr);
1001   DBUG_ASSERT(pack_buffer != nullptr);
1002   DBUG_ASSERT(packed_tuple != nullptr);
1003   DBUG_ASSERT(key_tuple != nullptr);
1004 
1005   /* We were given a record in KeyTupleFormat. First, save it to record */
1006   const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
1007   key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
1008 
1009   uint n_used_parts = my_count_bits(keypart_map);
1010   if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0;  // Full key is used
1011 
1012   /* Then, convert the record into a mem-comparable form */
1013   return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
1014                      false, 0, n_used_parts);
1015 }
1016 
1017 /**
1018   @brief
1019     Check if "unpack info" data includes checksum.
1020 
1021   @detail
1022     This is used only by CHECK TABLE to count the number of rows that have
1023     checksums.
1024 */
1025 
1026 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
1027   size_t size = unpack_info.size();
1028   if (size == 0) {
1029     return false;
1030   }
1031   const uchar *ptr = (const uchar *)unpack_info.data();
1032 
1033   // Skip unpack info if present.
1034   if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1035     const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1036     SHIP_ASSERT(size >= skip_len);
1037 
1038     size -= skip_len;
1039     ptr += skip_len;
1040   }
1041 
1042   return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1043 }
1044 
1045 /*
1046   @return Number of bytes that were changed
1047 */
1048 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1049   DBUG_ASSERT(packed_tuple != nullptr);
1050 
1051   int changed = 0;
1052   uchar *p = packed_tuple + len - 1;
1053   for (; p > packed_tuple; p--) {
1054     changed++;
1055     if (*p != uchar(0xFF)) {
1056       *p = *p + 1;
1057       break;
1058     }
1059     *p = '\0';
1060   }
1061   return changed;
1062 }
1063 
1064 /*
1065   @return Number of bytes that were changed
1066 */
1067 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1068   DBUG_ASSERT(packed_tuple != nullptr);
1069 
1070   int changed = 0;
1071   uchar *p = packed_tuple + len - 1;
1072   for (; p > packed_tuple; p--) {
1073     changed++;
1074     if (*p != uchar(0x00)) {
1075       *p = *p - 1;
1076       break;
1077     }
1078     *p = 0xFF;
1079   }
1080   return changed;
1081 }
1082 
1083 static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
1084     {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
1085     {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
1086 
1087 /*
1088   @return The length in bytes of the header specified by the given tag
1089 */
1090 size_t Rdb_key_def::get_unpack_header_size(char tag) {
1091   DBUG_ASSERT(is_unpack_data_tag(tag));
1092   return UNPACK_HEADER_SIZES.at(tag);
1093 }
1094 
1095 /*
1096   Get a bitmap indicating which varchar columns must be covered for this
1097   lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1098   the lookup is covered. If it can already be determined that the lookup is
1099   not covered, map->bitmap will be set to null.
1100  */
1101 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1102   DBUG_ASSERT(map->bitmap == nullptr);
1103   bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1104   uint curr_bitmap_pos = 0;
1105 
1106   // Indicates which columns in the read set might be covered.
1107   MY_BITMAP maybe_covered_bitmap;
1108   bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1109 
1110   for (uint i = 0; i < m_key_parts; i++) {
1111     if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1112       continue;
1113     }
1114 
1115     Field *const field = m_pack_info[i].get_field_in_table(table);
1116 
1117     // Columns which are always covered are not stored in the covered bitmap so
1118     // we can ignore them here too.
1119     if (m_pack_info[i].m_covered &&
1120         bitmap_is_set(table->read_set, field->field_index)) {
1121       bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1122       continue;
1123     }
1124 
1125     switch (field->real_type()) {
1126       // This type may be covered depending on the record. If it was requested,
1127       // we require the covered bitmap to have this bit set.
1128       case MYSQL_TYPE_VARCHAR:
1129         if (curr_bitmap_pos < MAX_REF_PARTS) {
1130           if (bitmap_is_set(table->read_set, field->field_index)) {
1131             bitmap_set_bit(map, curr_bitmap_pos);
1132             bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1133           }
1134           curr_bitmap_pos++;
1135         } else {
1136           bitmap_free(&maybe_covered_bitmap);
1137           bitmap_free(map);
1138           return;
1139         }
1140         break;
1141       // This column is a type which is never covered. If it was requested, we
1142       // know this lookup will never be covered.
1143       default:
1144         if (bitmap_is_set(table->read_set, field->field_index)) {
1145           bitmap_free(&maybe_covered_bitmap);
1146           bitmap_free(map);
1147           return;
1148         }
1149         break;
1150     }
1151   }
1152 
1153   // If there are columns which are not covered in the read set, the lookup
1154   // can't be covered.
1155   if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1156     bitmap_free(map);
1157   }
1158   bitmap_free(&maybe_covered_bitmap);
1159 }
1160 
1161 /*
1162   Return true if for this secondary index
1163   - All of the requested columns are in the index
1164   - All values for columns that are prefix-only indexes are shorter or equal
1165     in length to the prefix
1166  */
1167 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1168                                 const MY_BITMAP *const lookup_bitmap) const {
1169   DBUG_ASSERT(lookup_bitmap != nullptr);
1170   if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1171     return false;
1172   }
1173 
1174   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1175 
1176   // Check if this unpack_info has a covered_bitmap
1177   const char *unpack_header = unp_reader.get_current_ptr();
1178   const bool has_covered_unpack_info =
1179       unp_reader.remaining_bytes() &&
1180       unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1181   if (!has_covered_unpack_info ||
1182       !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1183     return false;
1184   }
1185 
1186   MY_BITMAP covered_bitmap;
1187   my_bitmap_map covered_bits;
1188   bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1189   covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1190                                       sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1191                                       RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1192 
1193   return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1194 }
1195 
1196 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
1197 bool Rdb_key_def::can_cover_lookup() const {
1198   for (uint i = 0; i < m_key_parts; i++) {
1199     if (!m_pack_info[i].m_covered) return false;
1200   }
1201   return true;
1202 }
1203 
1204 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1205                                uchar *tuple, uchar *const packed_tuple,
1206                                uchar *const pack_buffer,
1207                                Rdb_string_writer *const unpack_info,
1208                                uint *const n_null_fields) const {
1209   if (field->real_maybe_null()) {
1210     DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
1211     if (field->is_real_null()) {
1212       /* NULL value. store '\0' so that it sorts before non-NULL values */
1213       *tuple++ = 0;
1214       /* That's it, don't store anything else */
1215       if (n_null_fields) (*n_null_fields)++;
1216       return tuple;
1217     } else {
1218       /* Not a NULL value. Store '1' */
1219       *tuple++ = 1;
1220     }
1221   }
1222 
1223   const bool create_unpack_info =
1224       (unpack_info &&  // we were requested to generate unpack_info
1225        pack_info->uses_unpack_info());  // and this keypart uses it
1226   Rdb_pack_field_context pack_ctx(unpack_info);
1227 
1228   // Set the offset for methods which do not take an offset as an argument
1229   DBUG_ASSERT(
1230       is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1231 
1232   (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1233 
1234   /* Make "unpack info" to be stored in the value */
1235   if (create_unpack_info) {
1236     (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1237                                          &pack_ctx);
1238   }
1239 
1240   return tuple;
1241 }
1242 
1243 /**
1244   Get index columns from the record and pack them into mem-comparable form.
1245 
1246   @param
1247     tbl                   Table we're working on
1248     record           IN   Record buffer with fields in table->record format
1249     pack_buffer      IN   Temporary area for packing varchars. The size is
1250                           at least max_storage_fmt_length() bytes.
1251     packed_tuple     OUT  Key in the mem-comparable form
1252     unpack_info      OUT  Unpack data
1253     unpack_info_len  OUT  Unpack data length
1254     n_key_parts           Number of keyparts to process. 0 means all of them.
1255     n_null_fields    OUT  Number of key fields with NULL value.
1256     ttl_bytes        IN   Previous ttl bytes from old record for update case or
1257                           current ttl bytes from just packed primary key/value
1258   @detail
1259     Some callers do not need the unpack information, they can pass
1260     unpack_info=nullptr, unpack_info_len=nullptr.
1261 
1262   @return
1263     Length of the packed tuple
1264 */
1265 
1266 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
1267                               const uchar *const record,
1268                               uchar *const packed_tuple,
1269                               Rdb_string_writer *const unpack_info,
1270                               const bool should_store_row_debug_checksums,
1271                               const longlong hidden_pk_id, uint n_key_parts,
1272                               uint *const n_null_fields,
1273                               const char *const ttl_bytes) const {
1274   DBUG_ASSERT(tbl != nullptr);
1275   DBUG_ASSERT(pack_buffer != nullptr);
1276   DBUG_ASSERT(record != nullptr);
1277   DBUG_ASSERT(packed_tuple != nullptr);
1278   // Checksums for PKs are made when record is packed.
1279   // We should never attempt to make checksum just from PK values
1280   DBUG_ASSERT_IMP(should_store_row_debug_checksums,
1281                   (m_index_type == INDEX_TYPE_SECONDARY));
1282 
1283   uchar *tuple = packed_tuple;
1284   size_t unpack_start_pos = size_t(-1);
1285   size_t unpack_len_pos = size_t(-1);
1286   size_t covered_bitmap_pos = size_t(-1);
1287   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
1288 
1289   rdb_netbuf_store_index(tuple, m_index_number);
1290   tuple += INDEX_NUMBER_SIZE;
1291 
1292   // If n_key_parts is 0, it means all columns.
1293   // The following includes the 'extended key' tail.
1294   // The 'extended key' includes primary key. This is done to 'uniqify'
1295   // non-unique indexes
1296   const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
1297 
1298   // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
1299   // hidden key part.  So we skip it (its always 1 part).
1300   if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
1301     n_key_parts = m_key_parts - 1;
1302   } else if (use_all_columns) {
1303     n_key_parts = m_key_parts;
1304   }
1305 
1306   if (n_null_fields) *n_null_fields = 0;
1307 
1308   // Check if we need a covered bitmap. If it is certain that all key parts are
1309   // covering, we don't need one.
1310   bool store_covered_bitmap = false;
1311   if (unpack_info && use_covered_bitmap_format()) {
1312     for (uint i = 0; i < n_key_parts; i++) {
1313       if (!m_pack_info[i].m_covered) {
1314         store_covered_bitmap = true;
1315         break;
1316       }
1317     }
1318   }
1319 
1320   const char tag =
1321       store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;
1322 
1323   if (unpack_info) {
1324     unpack_info->clear();
1325 
1326     if (m_index_type == INDEX_TYPE_SECONDARY &&
1327         m_total_index_flags_length > 0) {
1328       // Reserve space for index flag fields
1329       unpack_info->allocate(m_total_index_flags_length);
1330 
1331       // Insert TTL timestamp
1332       if (has_ttl() && ttl_bytes) {
1333         write_index_flag_field(unpack_info,
1334                                reinterpret_cast<const uchar *>(ttl_bytes),
1335                                Rdb_key_def::TTL_FLAG);
1336       }
1337     }
1338 
1339     unpack_start_pos = unpack_info->get_current_pos();
1340     unpack_info->write_uint8(tag);
1341     unpack_len_pos = unpack_info->get_current_pos();
1342     // we don't know the total length yet, so write a zero
1343     unpack_info->write_uint16(0);
1344 
1345     if (store_covered_bitmap) {
1346       // Reserve two bytes for the covered bitmap. This will store, for key
1347       // parts which are not always covering, whether or not it is covering
1348       // for this record.
1349       covered_bitmap_pos = unpack_info->get_current_pos();
1350       unpack_info->write_uint16(0);
1351     }
1352   }
1353 
1354   MY_BITMAP covered_bitmap;
1355   my_bitmap_map covered_bits;
1356   uint curr_bitmap_pos = 0;
1357   bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1358 
1359   for (uint i = 0; i < n_key_parts; i++) {
1360     // Fill hidden pk id into the last key part for secondary keys for tables
1361     // with no pk
1362     if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
1363       m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
1364       break;
1365     }
1366 
1367     Field *const field = m_pack_info[i].get_field_in_table(tbl);
1368     DBUG_ASSERT(field != nullptr);
1369 
1370     uint field_offset = field->ptr - tbl->record[0];
1371     uint null_offset = field->null_offset(tbl->record[0]);
1372     bool maybe_null = field->real_maybe_null();
1373 
1374     field->move_field(
1375         const_cast<uchar *>(record) + field_offset,
1376         maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
1377         field->null_bit);
1378     // WARNING! Don't return without restoring field->ptr and field->null_ptr
1379 
1380     tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
1381                        unpack_info, n_null_fields);
1382 
1383     // If this key part is a prefix of a VARCHAR field, check if it's covered.
1384     if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
1385         !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
1386       size_t data_length = field->data_length();
1387       uint16 key_length;
1388       if (m_pk_part_no[i] == (uint)-1) {
1389         key_length = tbl->key_info[get_keyno()].key_part[i].length;
1390       } else {
1391         key_length =
1392             tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
1393       }
1394 
1395       if (m_pack_info[i].m_unpack_func != nullptr &&
1396           data_length <= key_length) {
1397         bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
1398       }
1399       curr_bitmap_pos++;
1400     }
1401 
1402     // Restore field->ptr and field->null_ptr
1403     field->move_field(tbl->record[0] + field_offset,
1404                       maybe_null ? tbl->record[0] + null_offset : nullptr,
1405                       field->null_bit);
1406   }
1407 
1408   if (unpack_info) {
1409     const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
1410     DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());
1411 
1412     // Don't store the unpack_info if it has only the header (that is, there's
1413     // no meaningful content).
1414     // Primary Keys are special: for them, store the unpack_info even if it's
1415     // empty (provided m_maybe_unpack_info==true, see
1416     // ha_rocksdb::convert_record_to_storage_format)
1417     if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
1418       if (len == get_unpack_header_size(tag) && !covered_bits) {
1419         unpack_info->truncate(unpack_start_pos);
1420       } else if (store_covered_bitmap) {
1421         unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
1422       }
1423     } else {
1424       unpack_info->write_uint16_at(unpack_len_pos, len);
1425     }
1426 
1427     //
1428     // Secondary keys have key and value checksums in the value part
1429     // Primary key is a special case (the value part has non-indexed columns),
1430     // so the checksums are computed and stored by
1431     // ha_rocksdb::convert_record_to_storage_format
1432     //
1433     if (should_store_row_debug_checksums) {
1434       const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple);
1435       const uint32_t val_crc32 =
1436           crc32(0, unpack_info->ptr(), unpack_info->get_current_pos());
1437 
1438       unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
1439       unpack_info->write_uint32(key_crc32);
1440       unpack_info->write_uint32(val_crc32);
1441     }
1442   }
1443 
1444   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1445 
1446   return tuple - packed_tuple;
1447 }
1448 
1449 /**
1450   Pack the hidden primary key into mem-comparable form.
1451 
1452   @param
1453     tbl                   Table we're working on
1454     hidden_pk_id     IN   New value to be packed into key
1455     packed_tuple     OUT  Key in the mem-comparable form
1456 
1457   @return
1458     Length of the packed tuple
1459 */
1460 
1461 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1462                                  uchar *const packed_tuple) const {
1463   DBUG_ASSERT(packed_tuple != nullptr);
1464 
1465   uchar *tuple = packed_tuple;
1466   rdb_netbuf_store_index(tuple, m_index_number);
1467   tuple += INDEX_NUMBER_SIZE;
1468   DBUG_ASSERT(m_key_parts == 1);
1469   DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
1470                                    m_pack_info[0].m_max_image_len));
1471 
1472   m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1473 
1474   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1475   return tuple - packed_tuple;
1476 }
1477 
1478 /*
1479   Function of type rdb_index_field_pack_t
1480 */
1481 
1482 void Rdb_key_def::pack_with_make_sort_key(
1483     Rdb_field_packing *const fpi, Field *const field,
1484     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1485     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1486   DBUG_ASSERT(fpi != nullptr);
1487   DBUG_ASSERT(field != nullptr);
1488   DBUG_ASSERT(dst != nullptr);
1489   DBUG_ASSERT(*dst != nullptr);
1490 
1491   const int max_len = fpi->m_max_image_len;
1492   MY_BITMAP*old_map;
1493 
1494   old_map= dbug_tmp_use_all_columns(field->table,
1495                                     &field->table->read_set);
1496   field->sort_string(*dst, max_len);
1497   dbug_tmp_restore_column_map(&field->table->read_set, old_map);
1498   *dst += max_len;
1499 }
1500 
1501 /*
1502   Compares two keys without unpacking
1503 
1504   @detail
1505   @return
1506     0 - Ok. column_index is the index of the first column which is different.
1507           -1 if two kes are equal
1508     1 - Data format error.
1509 */
1510 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
1511                               const rocksdb::Slice *key2,
1512                               std::size_t *const column_index) const {
1513   DBUG_ASSERT(key1 != nullptr);
1514   DBUG_ASSERT(key2 != nullptr);
1515   DBUG_ASSERT(column_index != nullptr);
1516 
1517   // the caller should check the return value and
1518   // not rely on column_index being valid
1519   *column_index = 0xbadf00d;
1520 
1521   Rdb_string_reader reader1(key1);
1522   Rdb_string_reader reader2(key2);
1523 
1524   // Skip the index number
1525   if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1526 
1527   if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1528 
1529   for (uint i = 0; i < m_key_parts; i++) {
1530     const Rdb_field_packing *const fpi = &m_pack_info[i];
1531     if (fpi->m_maybe_null) {
1532       const auto nullp1 = reader1.read(1);
1533       const auto nullp2 = reader2.read(1);
1534 
1535       if (nullp1 == nullptr || nullp2 == nullptr) {
1536         return HA_EXIT_FAILURE;
1537       }
1538 
1539       if (*nullp1 != *nullp2) {
1540         *column_index = i;
1541         return HA_EXIT_SUCCESS;
1542       }
1543 
1544       if (*nullp1 == 0) {
1545         /* This is a NULL value */
1546         continue;
1547       }
1548     }
1549 
1550     const auto before_skip1 = reader1.get_current_ptr();
1551     const auto before_skip2 = reader2.get_current_ptr();
1552     DBUG_ASSERT(fpi->m_skip_func);
1553     if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) {
1554       return HA_EXIT_FAILURE;
1555     }
1556     if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) {
1557       return HA_EXIT_FAILURE;
1558     }
1559     const auto size1 = reader1.get_current_ptr() - before_skip1;
1560     const auto size2 = reader2.get_current_ptr() - before_skip2;
1561     if (size1 != size2) {
1562       *column_index = i;
1563       return HA_EXIT_SUCCESS;
1564     }
1565 
1566     if (memcmp(before_skip1, before_skip2, size1) != 0) {
1567       *column_index = i;
1568       return HA_EXIT_SUCCESS;
1569     }
1570   }
1571 
1572   *column_index = m_key_parts;
1573   return HA_EXIT_SUCCESS;
1574 }
1575 
1576 /*
1577   @brief
1578     Given a zero-padded key, determine its real key length
1579 
1580   @detail
1581     Fixed-size skip functions just read.
1582 */
1583 
1584 size_t Rdb_key_def::key_length(const TABLE *const table,
1585                                const rocksdb::Slice &key) const {
1586   DBUG_ASSERT(table != nullptr);
1587 
1588   Rdb_string_reader reader(&key);
1589 
1590   if ((!reader.read(INDEX_NUMBER_SIZE))) {
1591     return size_t(-1);
1592   }
1593   for (uint i = 0; i < m_key_parts; i++) {
1594     const Rdb_field_packing *fpi = &m_pack_info[i];
1595     const Field *field = nullptr;
1596     if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) {
1597       field = fpi->get_field_in_table(table);
1598     }
1599     if ((fpi->m_skip_func)(fpi, field, &reader)) {
1600       return size_t(-1);
1601     }
1602   }
1603   return key.size() - reader.remaining_bytes();
1604 }
1605 
1606 /*
1607   Take mem-comparable form and unpack_info and unpack it to Table->record
1608 
1609   @detail
1610     not all indexes support this
1611 
1612   @return
1613     HA_EXIT_SUCCESS    OK
1614     other              HA_ERR error code
1615 */
1616 
1617 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
1618                                const rocksdb::Slice *const packed_key,
1619                                const rocksdb::Slice *const unpack_info,
1620                                const bool verify_row_debug_checksums) const {
1621   Rdb_string_reader reader(packed_key);
1622   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1623 
1624   // There is no checksuming data after unpack_info for primary keys, because
1625   // the layout there is different. The checksum is verified in
1626   // ha_rocksdb::convert_record_from_storage_format instead.
1627   DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
1628                   !verify_row_debug_checksums);
1629 
1630   // Skip the index number
1631   if ((!reader.read(INDEX_NUMBER_SIZE))) {
1632     return HA_ERR_ROCKSDB_CORRUPT_DATA;
1633   }
1634 
1635   // For secondary keys, we expect the value field to contain index flags,
1636   // unpack data, and checksum data in that order. One or all can be missing,
1637   // but they cannot be reordered.
1638   if (unp_reader.remaining_bytes()) {
1639     if (m_index_type == INDEX_TYPE_SECONDARY &&
1640         m_total_index_flags_length > 0 &&
1641         !unp_reader.read(m_total_index_flags_length)) {
1642       return HA_ERR_ROCKSDB_CORRUPT_DATA;
1643     }
1644   }
1645 
1646   const char *unpack_header = unp_reader.get_current_ptr();
1647   bool has_unpack_info =
1648       unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
1649   if (has_unpack_info) {
1650     if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
1651       return HA_ERR_ROCKSDB_CORRUPT_DATA;
1652     }
1653   }
1654 
1655   // Read the covered bitmap
1656   MY_BITMAP covered_bitmap;
1657   my_bitmap_map covered_bits;
1658   bool has_covered_bitmap =
1659       has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
1660   if (has_covered_bitmap) {
1661     bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1662     covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1663                                         sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1664                                         RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1665   }
1666 
1667   int err = HA_EXIT_SUCCESS;
1668 
1669 
1670   Rdb_key_field_iterator iter(
1671       this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
1672       has_covered_bitmap ? &covered_bitmap : nullptr, buf);
1673   while (iter.has_next()) {
1674     err = iter.next();
1675     if (err) {
1676       return err;
1677     }
1678   }
1679 
1680   /*
1681     Check checksum values if present
1682   */
1683   const char *ptr;
1684   if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
1685     if (verify_row_debug_checksums) {
1686       uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
1687           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1688       const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
1689           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1690 
1691       const uint32_t computed_key_chksum =
1692           crc32(0, (const uchar *)packed_key->data(), packed_key->size());
1693       const uint32_t computed_val_chksum =
1694           crc32(0, (const uchar *)unpack_info->data(),
1695                 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1696 
1697       DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
1698                       stored_key_chksum++;);
1699 
1700       if (stored_key_chksum != computed_key_chksum) {
1701         report_checksum_mismatch(true, packed_key->data(), packed_key->size());
1702         return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1703       }
1704 
1705       if (stored_val_chksum != computed_val_chksum) {
1706         report_checksum_mismatch(false, unpack_info->data(),
1707                                  unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1708         return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1709       }
1710     } else {
1711       /* The checksums are present but we are not checking checksums */
1712     }
1713   }
1714 
1715   if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA;
1716 
1717   return HA_EXIT_SUCCESS;
1718 }
1719 
1720 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
1721   return table->s->primary_key == MAX_INDEXES;
1722 }
1723 
1724 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
1725                                            const char *const data,
1726                                            const size_t data_size) const {
1727   // NO_LINT_DEBUG
1728   sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
1729                   is_key ? "key" : "value", get_index_number());
1730 
1731   const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
1732   // NO_LINT_DEBUG
1733   sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
1734                   (uint64_t)data_size, buf.c_str());
1735 
1736   my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1737 }
1738 
1739 bool Rdb_key_def::index_format_min_check(const int pk_min,
1740                                          const int sk_min) const {
1741   switch (m_index_type) {
1742     case INDEX_TYPE_PRIMARY:
1743     case INDEX_TYPE_HIDDEN_PRIMARY:
1744       return (m_kv_format_version >= pk_min);
1745     case INDEX_TYPE_SECONDARY:
1746       return (m_kv_format_version >= sk_min);
1747     default:
1748       DBUG_ASSERT(0);
1749       return false;
1750   }
1751 }
1752 
1753 ///////////////////////////////////////////////////////////////////////////////////////////
1754 // Rdb_field_packing
1755 ///////////////////////////////////////////////////////////////////////////////////////////
1756 
1757 /*
1758   Function of type rdb_index_field_skip_t
1759 */
1760 
1761 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
1762                                  const Field *const field
1763                                      MY_ATTRIBUTE((__unused__)),
1764                                  Rdb_string_reader *const reader) {
1765   if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
1766   return HA_EXIT_SUCCESS;
1767 }
1768 
/*
  (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
  split in the middle of an UTF-8 character. See the implementation of
  unpack_binary_or_utf8_varchar.
*/
#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Encoded size for 'len' bytes of input: 'len' is rounded up to a whole
  number of (RDB_ESCAPE_LENGTH - 1)-byte chunks and each chunk takes
  RDB_ESCAPE_LENGTH bytes. Evaluates to 0 for len == 0.

  Both the argument and the whole expansion are parenthesized so that the
  macro can be used with arbitrary expressions and inside larger expressions.
*/
#define RDB_ENCODED_SIZE(len)                                     \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Same for the legacy encoding, which always accounts for one chunk more than
  the number of completely filled chunks (see pack_legacy_variable_format).
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                         \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /                \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                         \
   RDB_LEGACY_ESCAPE_LENGTH)
1786 
1787 /*
1788   Function of type rdb_index_field_skip_t
1789 */
1790 
1791 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
1792                                       const Field *const field,
1793                                       Rdb_string_reader *const reader) {
1794   const uchar *ptr;
1795   bool finished = false;
1796 
1797   size_t dst_len; /* How much data can be there */
1798   if (field) {
1799     const Field_varstring *const field_var =
1800         static_cast<const Field_varstring *>(field);
1801     dst_len = field_var->pack_length() - field_var->length_bytes;
1802   } else {
1803     dst_len = UINT_MAX;
1804   }
1805 
1806   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
1807 
1808   /* Decode the length-emitted encoding here */
1809   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1810     uint used_bytes;
1811 
1812     /* See pack_with_varchar_encoding. */
1813     if (use_legacy_format) {
1814       used_bytes = calc_unpack_legacy_variable_format(
1815           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1816     } else {
1817       used_bytes =
1818           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1819     }
1820 
1821     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
1822       return HA_EXIT_FAILURE;  // Corruption in the data
1823     }
1824 
1825     if (finished) {
1826       break;
1827     }
1828 
1829     dst_len -= used_bytes;
1830   }
1831 
1832   if (!finished) {
1833     return HA_EXIT_FAILURE;
1834   }
1835 
1836   return HA_EXIT_SUCCESS;
1837 }
1838 
/*
  Values of the flag byte that ends every segment of the Variable-Length
  Space-Padded encoding. pack_with_varchar_space_pad writes them and
  skip_variable_space_pad reads them. The flag tells how the rest of the
  value compares to a run of spaces; VARCHAR_CMP_EQUAL_TO_SPACES also marks
  the last segment.
*/
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1842 
1843 /*
1844   Skip a keypart that uses Variable-Length Space-Padded encoding
1845 */
1846 
1847 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
1848                                          const Field *const field,
1849                                          Rdb_string_reader *const reader) {
1850   const uchar *ptr;
1851   bool finished = false;
1852 
1853   size_t dst_len = UINT_MAX; /* How much data can be there */
1854 
1855   if (field) {
1856     const Field_varstring *const field_var =
1857         static_cast<const Field_varstring *>(field);
1858     dst_len = field_var->pack_length() - field_var->length_bytes;
1859   }
1860 
1861   /* Decode the length-emitted encoding here */
1862   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1863     // See pack_with_varchar_space_pad
1864     const uchar c = ptr[fpi->m_segment_size - 1];
1865     if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1866       // This is the last segment
1867       finished = true;
1868       break;
1869     } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1870                c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1871       // This is not the last segment
1872       if ((fpi->m_segment_size - 1) > dst_len) {
1873         // The segment is full of data but the table field can't hold that
1874         // much! This must be data corruption.
1875         return HA_EXIT_FAILURE;
1876       }
1877       dst_len -= (fpi->m_segment_size - 1);
1878     } else {
1879       // Encountered a value that's none of the VARCHAR_CMP* constants
1880       // It's data corruption.
1881       return HA_EXIT_FAILURE;
1882     }
1883   }
1884   return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1885 }
1886 
1887 /*
1888   Function of type rdb_index_field_unpack_t
1889 */
1890 
1891 int Rdb_key_def::unpack_integer(
1892     Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1893     Rdb_string_reader *const reader,
1894     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
1895   const int length = fpi->m_max_image_len;
1896 
1897   const uchar *from;
1898   if (!(from = (const uchar *)reader->read(length))) {
1899     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1900   }
1901 
1902 #ifdef WORDS_BIGENDIAN
1903   {
1904     if (static_cast<Field_num *>(field)->unsigned_flag) {
1905       to[0] = from[0];
1906     } else {
1907       to[0] = static_cast<char>(from[0] ^ 128);  // Reverse the sign bit.
1908     }
1909     memcpy(to + 1, from + 1, length - 1);
1910   }
1911 #else
1912   {
1913     const int sign_byte = from[0];
1914     if (static_cast<Field_num *>(field)->unsigned_flag) {
1915       to[length - 1] = sign_byte;
1916     } else {
1917       to[length - 1] =
1918           static_cast<char>(sign_byte ^ 128);  // Reverse the sign bit.
1919     }
1920     for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
1921   }
1922 #endif
1923   return UNPACK_SUCCESS;
1924 }
1925 
1926 #if !defined(WORDS_BIGENDIAN)
1927 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
1928 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1929   // A few systems store the most-significant _word_ first on little-endian
1930   dst[0] = src[3];
1931   dst[1] = src[2];
1932   dst[2] = src[1];
1933   dst[3] = src[0];
1934   dst[4] = src[7];
1935   dst[5] = src[6];
1936   dst[6] = src[5];
1937   dst[7] = src[4];
1938 #else
1939   dst[0] = src[7];
1940   dst[1] = src[6];
1941   dst[2] = src[5];
1942   dst[3] = src[4];
1943   dst[4] = src[3];
1944   dst[5] = src[2];
1945   dst[6] = src[1];
1946   dst[7] = src[0];
1947 #endif
1948 }
1949 
1950 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
1951   dst[0] = src[3];
1952   dst[1] = src[2];
1953   dst[2] = src[1];
1954   dst[3] = src[0];
1955 }
1956 #else
1957 #define rdb_swap_double_bytes nullptr
1958 #define rdb_swap_float_bytes nullptr
1959 #endif
1960 
1961 int Rdb_key_def::unpack_floating_point(
1962     uchar *const dst, Rdb_string_reader *const reader, const size_t size,
1963     const int exp_digit, const uchar *const zero_pattern,
1964     const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
1965   const uchar *const from = (const uchar *)reader->read(size);
1966   if (from == nullptr) {
1967     /* Mem-comparable image doesn't have enough bytes */
1968     return UNPACK_FAILURE;
1969   }
1970 
1971   /* Check to see if the value is zero */
1972   if (memcmp(from, zero_pattern, size) == 0) {
1973     memcpy(dst, zero_val, size);
1974     return UNPACK_SUCCESS;
1975   }
1976 
1977 #if defined(WORDS_BIGENDIAN)
1978   // On big-endian, output can go directly into result
1979   uchar *const tmp = dst;
1980 #else
1981   // Otherwise use a temporary buffer to make byte-swapping easier later
1982   uchar tmp[8];
1983 #endif
1984 
1985   memcpy(tmp, from, size);
1986 
1987   if (tmp[0] & 0x80) {
1988     // If the high bit is set the original value was positive so
1989     // remove the high bit and subtract one from the exponent.
1990     ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1991     exp_part &= 0x7FFF;                             // clear high bit;
1992     exp_part -= (ushort)1 << (16 - 1 - exp_digit);  // subtract from exponent
1993     tmp[0] = (uchar)(exp_part >> 8);
1994     tmp[1] = (uchar)exp_part;
1995   } else {
1996     // Otherwise the original value was negative and all bytes have been
1997     // negated.
1998     for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
1999   }
2000 
2001 #if !defined(WORDS_BIGENDIAN)
2002   // On little-endian, swap the bytes around
2003   swap_func(dst, tmp);
2004 #else
2005   DBUG_ASSERT(swap_func == nullptr);
2006 #endif
2007 
2008   return UNPACK_SUCCESS;
2009 }
2010 
2011 #if !defined(DBL_EXP_DIG)
2012 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
2013 #endif
2014 
2015 /*
2016   Function of type rdb_index_field_unpack_t
2017 
2018   Unpack a double by doing the reverse action of change_double_for_sort
2019   (sql/filesort.cc).  Note that this only works on IEEE values.
2020   Note also that this code assumes that NaN and +/-Infinity are never
2021   allowed in the database.
2022 */
2023 int Rdb_key_def::unpack_double(
2024     Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2025     Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
2026     Rdb_string_reader *const reader,
2027     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2028   static double zero_val = 0.0;
2029   static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2030 
2031   return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2032                                zero_pattern, (const uchar *)&zero_val,
2033                                rdb_swap_double_bytes);
2034 }
2035 
2036 #if !defined(FLT_EXP_DIG)
2037 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
2038 #endif
2039 
2040 /*
2041   Function of type rdb_index_field_unpack_t
2042 
2043   Unpack a float by doing the reverse action of Field_float::make_sort_key
2044   (sql/field.cc).  Note that this only works on IEEE values.
2045   Note also that this code assumes that NaN and +/-Infinity are never
2046   allowed in the database.
2047 */
2048 int Rdb_key_def::unpack_float(
2049     Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2050     uchar *const field_ptr, Rdb_string_reader *const reader,
2051     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2052   static float zero_val = 0.0;
2053   static const uchar zero_pattern[4] = {128, 0, 0, 0};
2054 
2055   return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2056                                zero_pattern, (const uchar *)&zero_val,
2057                                rdb_swap_float_bytes);
2058 }
2059 
2060 /*
2061   Function of type rdb_index_field_unpack_t used to
2062   Unpack by doing the reverse action to Field_newdate::make_sort_key.
2063 */
2064 
2065 int Rdb_key_def::unpack_newdate(
2066     Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2067     uchar *const field_ptr, Rdb_string_reader *const reader,
2068     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2069   const char *from;
2070   DBUG_ASSERT(fpi->m_max_image_len == 3);
2071 
2072   if (!(from = reader->read(3))) {
2073     /* Mem-comparable image doesn't have enough bytes */
2074     return UNPACK_FAILURE;
2075   }
2076 
2077   field_ptr[0] = from[2];
2078   field_ptr[1] = from[1];
2079   field_ptr[2] = from[0];
2080   return UNPACK_SUCCESS;
2081 }
2082 
2083 /*
2084   Function of type rdb_index_field_unpack_t, used to
2085   Unpack the string by copying it over.
2086   This is for BINARY(n) where the value occupies the whole length.
2087 */
2088 
2089 int Rdb_key_def::unpack_binary_str(
2090     Rdb_field_packing *const fpi, Field *const field, uchar *const to,
2091     Rdb_string_reader *const reader,
2092     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2093   const char *from;
2094   if (!(from = reader->read(fpi->m_max_image_len))) {
2095     /* Mem-comparable image doesn't have enough bytes */
2096     return UNPACK_FAILURE;
2097   }
2098 
2099   memcpy(to, from, fpi->m_max_image_len);
2100   return UNPACK_SUCCESS;
2101 }
2102 
2103 /*
2104   Function of type rdb_index_field_unpack_t.
2105   For UTF-8, we need to convert 2-byte wide-character entities back into
2106   UTF8 sequences.
2107 */
2108 
2109 int Rdb_key_def::unpack_utf8_str(
2110     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2111     Rdb_string_reader *const reader,
2112     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2113   my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
2114   const uchar *src;
2115   if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2116     /* Mem-comparable image doesn't have enough bytes */
2117     return UNPACK_FAILURE;
2118   }
2119 
2120   const uchar *const src_end = src + fpi->m_max_image_len;
2121   uchar *const dst_end = dst + field->pack_length();
2122 
2123   while (src < src_end) {
2124     my_wc_t wc = (src[0] << 8) | src[1];
2125     src += 2;
2126     int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2127     DBUG_ASSERT(res > 0 && res <= 3);
2128     if (res < 0) return UNPACK_FAILURE;
2129     dst += res;
2130   }
2131 
2132   cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
2133                    cset->pad_char);
2134   return UNPACK_SUCCESS;
2135 }
2136 
2137 /*
2138   This is the original algorithm to encode a variable binary field.  It
2139   sets a flag byte every Nth byte.  The flag value is (255 - #pad) where
2140   #pad is the number of padding bytes that were needed (0 if all N-1
2141   bytes were used).
2142 
2143   If N=8 and the field is:
2144   * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2145   * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2146   And the 4 byte string compares as greater than the 3 byte string
2147 
2148   Unfortunately the algorithm has a flaw.  If the input is exactly a
2149   multiple of N-1, an extra N bytes are written.  Since we usually use
2150   N=9, an 8 byte input will generate 18 bytes of output instead of the
2151   9 bytes of output that is optimal.
2152 
2153   See pack_variable_format for the newer algorithm.
2154 */
2155 void Rdb_key_def::pack_legacy_variable_format(
2156     const uchar *src,  // The data to encode
2157     size_t src_len,    // The length of the data to encode
2158     uchar **dst)       // The location to encode the data
2159 {
2160   size_t copy_len;
2161   size_t padding_bytes;
2162   uchar *ptr = *dst;
2163 
2164   do {
2165     copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2166     padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2167     memcpy(ptr, src, copy_len);
2168     ptr += copy_len;
2169     src += copy_len;
2170     // pad with zeros if necessary
2171     if (padding_bytes > 0) {
2172       memset(ptr, 0, padding_bytes);
2173       ptr += padding_bytes;
2174     }
2175 
2176     *(ptr++) = 255 - padding_bytes;
2177 
2178     src_len -= copy_len;
2179   } while (padding_bytes == 0);
2180 
2181   *dst = ptr;
2182 }
2183 
2184 /*
2185   This is the new algorithm.  Similarly to the legacy format the input
2186   is split up into N-1 bytes and a flag byte is used as the Nth byte
2187   in the output.
2188 
2189   - If the previous segment needed any padding the flag is set to the
2190     number of bytes used (0..N-2).  0 is possible in the first segment
2191     if the input is 0 bytes long.
2192   - If no padding was used and there is no more data left in the input
2193     the flag is set to N-1
2194   - If no padding was used and there is still data left in the input the
2195     flag is set to N.
2196 
2197   For N=9, the following input values encode to the specified
2198   outout (where 'X' indicates a byte of the original input):
2199   - 0 bytes  is encoded as 0 0 0 0 0 0 0 0 0
2200   - 1 byte   is encoded as X 0 0 0 0 0 0 0 1
2201   - 2 bytes  is encoded as X X 0 0 0 0 0 0 2
2202   - 7 bytes  is encoded as X X X X X X X 0 7
2203   - 8 bytes  is encoded as X X X X X X X X 8
2204   - 9 bytes  is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2205   - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2206 */
2207 void Rdb_key_def::pack_variable_format(
2208     const uchar *src,  // The data to encode
2209     size_t src_len,    // The length of the data to encode
2210     uchar **dst)       // The location to encode the data
2211 {
2212   uchar *ptr = *dst;
2213 
2214   for (;;) {
2215     // Figure out how many bytes to copy, copy them and adjust pointers
2216     const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2217     memcpy(ptr, src, copy_len);
2218     ptr += copy_len;
2219     src += copy_len;
2220     src_len -= copy_len;
2221 
2222     // Are we at the end of the input?
2223     if (src_len == 0) {
2224       // pad with zeros if necessary;
2225       const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2226       if (padding_bytes > 0) {
2227         memset(ptr, 0, padding_bytes);
2228         ptr += padding_bytes;
2229       }
2230 
2231       // Put the flag byte (0 - N-1) in the output
2232       *(ptr++) = (uchar)copy_len;
2233       break;
2234     }
2235 
2236     // We have more data - put the flag byte (N) in and continue
2237     *(ptr++) = RDB_ESCAPE_LENGTH;
2238   }
2239 
2240   *dst = ptr;
2241 }
2242 
2243 /*
2244   Function of type rdb_index_field_pack_t
2245 */
2246 
2247 void Rdb_key_def::pack_with_varchar_encoding(
2248     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2249     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2250   const CHARSET_INFO *const charset = field->charset();
2251   Field_varstring *const field_var = (Field_varstring *)field;
2252 
2253   const size_t value_length = (field_var->length_bytes == 1)
2254                                   ? (uint)*field->ptr
2255                                   : uint2korr(field->ptr);
2256   size_t xfrm_len = charset->coll->strnxfrm(
2257       charset, buf, fpi->m_max_image_len, field_var->char_length(),
2258       field_var->ptr + field_var->length_bytes, value_length, 0);
2259 
2260   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2261   if (fpi->m_use_legacy_varbinary_format) {
2262     pack_legacy_variable_format(buf, xfrm_len, dst);
2263   } else {
2264     pack_variable_format(buf, xfrm_len, dst);
2265   }
2266 }
2267 
2268 /*
2269   Compare the string in [buf..buf_end) with a string that is an infinite
2270   sequence of strings in space_xfrm
2271 */
2272 
2273 static int rdb_compare_string_with_spaces(
2274     const uchar *buf, const uchar *const buf_end,
2275     const std::vector<uchar> *const space_xfrm) {
2276   int cmp = 0;
2277   while (buf < buf_end) {
2278     size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2279     if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
2280     buf += bytes;
2281   }
2282   return cmp;
2283 }
2284 
// Bias that pack_with_varchar_space_pad adds to the count of trimmed/padded
// space characters before writing it to unpack_info, because negative values
// are not stored.
static const int RDB_TRIMMED_CHARS_OFFSET = 8;
2286 /*
2287   Pack the data with Variable-Length Space-Padded Encoding.
2288 
2289   The encoding is there to meet two goals:
2290 
  Goal#1. Comparison. The SQL standard says:

    "If the collation for the comparison has the PAD SPACE characteristic,
    for the purposes of the comparison, the shorter value is effectively
    extended to the length of the longer by concatenation of <space>s on the
    right."
2297 
2298   At the moment, all MySQL collations except one have the PAD SPACE
2299   characteristic.  The exception is the "binary" collation that is used by
2300   [VAR]BINARY columns. (Note that binary collations for specific charsets,
2301   like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2302   the PAD SPACE characteristic).
2303 
2304   Goal#2 is to preserve the number of trailing spaces in the original value.
2305 
2306   This is achieved by using the following encoding:
2307   The key part:
2308   - Stores mem-comparable image of the column
2309   - It is stored in chunks of fpi->m_segment_size bytes (*)
2310     = If the remainder of the chunk is not occupied, it is padded with mem-
2311       comparable image of the space character (cs->pad_char to be precise).
2312   - The last byte of the chunk shows how the rest of column's mem-comparable
2313     image would compare to mem-comparable image of the column extended with
2314     spaces. There are three possible values.
2315      - VARCHAR_CMP_LESS_THAN_SPACES,
2316      - VARCHAR_CMP_EQUAL_TO_SPACES
2317      - VARCHAR_CMP_GREATER_THAN_SPACES
2318 
2319   VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2320   is spaces, or something that sorts as spaces, so there is no reason to store
2321   it).
2322 
2323   Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2324 
2325    'abcd\0'   => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0    ' <VARCHAR_CMP_EQUAL> ]
2326    'abcd'     => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2327    'abcd   '  => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2328    'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2329 
2330   As mentioned above, the last chunk is padded with mem-comparable images of
2331   cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2332 
2333   fpi->m_segment_size depends on the used collation. It is chosen to be such
2334   that no mem-comparable image of space will ever stretch across the segments
2335   (see get_segment_size_from_collation).
2336 
2337   == The value part (aka unpack_info) ==
2338   The value part stores the number of space characters that one needs to add
2339   when unpacking the string.
2340   - If the number is positive, it means add this many spaces at the end
2341   - If the number is negative, it means padding has added extra spaces which
2342     must be removed.
2343 
2344   Storage considerations
2345   - depending on column's max size, the number may occupy 1 or 2 bytes
2346   - the number of spaces that need to be removed is not more than
2347     RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2348     then store it as unsigned.
2349 
2350   @seealso
2351     unpack_binary_or_utf8_varchar_space_pad
2352     unpack_simple_varchar_space_pad
2353     dummy_make_unpack_info
2354     skip_variable_space_pad
2355 */
2356 
2357 void Rdb_key_def::pack_with_varchar_space_pad(
2358     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2359     Rdb_pack_field_context *const pack_ctx) {
2360   Rdb_string_writer *const unpack_info = pack_ctx->writer;
2361   const CHARSET_INFO *const charset = field->charset();
2362   const auto field_var = static_cast<Field_varstring *>(field);
2363 
2364   const size_t value_length = (field_var->length_bytes == 1)
2365                                   ? (uint)*field->ptr
2366                                   : uint2korr(field->ptr);
2367 
2368   const size_t trimmed_len = charset->cset->lengthsp(
2369       charset, (const char *)field_var->ptr + field_var->length_bytes,
2370       value_length);
2371   const size_t xfrm_len = charset->coll->strnxfrm(
2372       charset, buf, fpi->m_max_image_len, field_var->char_length(),
2373       field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2374 
2375   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2376   uchar *const buf_end = buf + xfrm_len;
2377 
2378   size_t encoded_size = 0;
2379   uchar *ptr = *dst;
2380   size_t padding_bytes;
2381   while (true) {
2382     const size_t copy_len =
2383         std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2384     padding_bytes = fpi->m_segment_size - 1 - copy_len;
2385     memcpy(ptr, buf, copy_len);
2386     ptr += copy_len;
2387     buf += copy_len;
2388 
2389     if (padding_bytes) {
2390       memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2391       ptr += padding_bytes;
2392       *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;  // last segment
2393     } else {
2394       // Compare the string suffix with a hypothetical infinite string of
2395       // spaces. It could be that the first difference is beyond the end of
2396       // current chunk.
2397       const int cmp =
2398           rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2399 
2400       if (cmp < 0) {
2401         *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2402       } else if (cmp > 0) {
2403         *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2404       } else {
2405         // It turns out all the rest are spaces.
2406         *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2407       }
2408     }
2409     encoded_size += fpi->m_segment_size;
2410 
2411     if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
2412   }
2413 
2414   // m_unpack_info_stores_value means unpack_info stores the whole original
2415   // value. There is no need to store the number of trimmed/padded endspaces
2416   // in that case.
2417   if (unpack_info && !fpi->m_unpack_info_stores_value) {
2418     // (value_length - trimmed_len) is the number of trimmed space *characters*
2419     // then, padding_bytes is the number of *bytes* added as padding
2420     // then, we add 8, because we don't store negative values.
2421     DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
2422     DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
2423     const size_t removed_chars =
2424         RDB_TRIMMED_CHARS_OFFSET +
2425         (value_length - trimmed_len) / fpi->space_mb_len -
2426         padding_bytes / fpi->space_xfrm_len;
2427 
2428     if (fpi->m_unpack_info_uses_two_bytes) {
2429       unpack_info->write_uint16(removed_chars);
2430     } else {
2431       DBUG_ASSERT(removed_chars < 0x100);
2432       unpack_info->write_uint8(removed_chars);
2433     }
2434   }
2435 
2436   *dst += encoded_size;
2437 }
2438 
2439 /*
2440   Calculate the number of used bytes in the chunk and whether this is the
2441   last chunk in the input.  This is based on the old legacy format - see
2442   pack_legacy_variable_format.
2443  */
2444 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2445   uint pad = 255 - flag;
2446   uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2447   if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2448     return (uint)-1;
2449   }
2450 
2451   *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2452   return used_bytes;
2453 }
2454 
2455 /*
2456   Calculate the number of used bytes in the chunk and whether this is the
2457   last chunk in the input.  This is based on the new format - see
2458   pack_variable_format.
2459  */
2460 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2461   // Check for invalid flag values
2462   if (flag > RDB_ESCAPE_LENGTH) {
2463     return (uint)-1;
2464   }
2465 
2466   // Values from 1 to N-1 indicate this is the last chunk and that is how
2467   // many bytes were used
2468   if (flag < RDB_ESCAPE_LENGTH) {
2469     *done = true;
2470     return flag;
2471   }
2472 
2473   // A value of N means we used N-1 bytes and had more to go
2474   *done = false;
2475   return RDB_ESCAPE_LENGTH - 1;
2476 }
2477 
2478 /*
2479   Unpack data that has charset information.  Each two bytes of the input is
2480   treated as a wide-character and converted to its multibyte equivalent in
2481   the output.
2482  */
2483 static int unpack_charset(
2484     const CHARSET_INFO *cset,  // character set information
2485     const uchar *src,          // source data to unpack
2486     uint src_len,              // length of source data
2487     uchar *dst,                // destination of unpacked data
2488     uint dst_len,              // length of destination data
2489     uint *used_bytes)          // output number of bytes used
2490 {
2491   if (src_len & 1) {
2492     /*
2493       UTF-8 characters are encoded into two-byte entities. There is no way
2494       we can have an odd number of bytes after encoding.
2495     */
2496     return UNPACK_FAILURE;
2497   }
2498 
2499   uchar *dst_end = dst + dst_len;
2500   uint used = 0;
2501 
2502   for (uint ii = 0; ii < src_len; ii += 2) {
2503     my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2504     int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end);
2505     DBUG_ASSERT(res > 0 && res <= 3);
2506     if (res < 0) {
2507       return UNPACK_FAILURE;
2508     }
2509 
2510     used += res;
2511   }
2512 
2513   *used_bytes = used;
2514   return UNPACK_SUCCESS;
2515 }
2516 
2517 /*
2518   Function of type rdb_index_field_unpack_t
2519 */
2520 
2521 int Rdb_key_def::unpack_binary_or_utf8_varchar(
2522     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2523     Rdb_string_reader *const reader,
2524     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2525   const uchar *ptr;
2526   size_t len = 0;
2527   bool finished = false;
2528   uchar *d0 = dst;
2529   Field_varstring *const field_var = (Field_varstring *)field;
2530   dst += field_var->length_bytes;
2531   // How much we can unpack
2532   size_t dst_len = field_var->pack_length() - field_var->length_bytes;
2533 
2534   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2535 
2536   /* Decode the length-emitted encoding here */
2537   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2538     uint used_bytes;
2539 
2540     /* See pack_with_varchar_encoding. */
2541     if (use_legacy_format) {
2542       used_bytes = calc_unpack_legacy_variable_format(
2543           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2544     } else {
2545       used_bytes =
2546           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2547     }
2548 
2549     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2550       return UNPACK_FAILURE;  // Corruption in the data
2551     }
2552 
2553     /*
2554       Now, we need to decode used_bytes of data and append them to the value.
2555     */
2556     if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2557       int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
2558                                dst_len, &used_bytes);
2559       if (err != UNPACK_SUCCESS) {
2560         return err;
2561       }
2562     } else {
2563       memcpy(dst, ptr, used_bytes);
2564     }
2565 
2566     dst += used_bytes;
2567     dst_len -= used_bytes;
2568     len += used_bytes;
2569 
2570     if (finished) {
2571       break;
2572     }
2573   }
2574 
2575   if (!finished) {
2576     return UNPACK_FAILURE;
2577   }
2578 
2579   /* Save the length */
2580   if (field_var->length_bytes == 1) {
2581     d0[0] = (uchar)len;
2582   } else {
2583     DBUG_ASSERT(field_var->length_bytes == 2);
2584     int2store(d0, len);
2585   }
2586   return UNPACK_SUCCESS;
2587 }
2588 
2589 /*
2590   @seealso
2591     pack_with_varchar_space_pad - packing function
2592     unpack_simple_varchar_space_pad - unpacking function for 'simple'
2593     charsets.
2594     skip_variable_space_pad - skip function
2595 */
2596 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
2597     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2598     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2599   const uchar *ptr;
2600   size_t len = 0;
2601   bool finished = false;
2602   Field_varstring *const field_var = static_cast<Field_varstring *>(field);
2603   uchar *d0 = dst;
2604   uchar *dst_end = dst + field_var->pack_length();
2605   dst += field_var->length_bytes;
2606 
2607   uint space_padding_bytes = 0;
2608   uint extra_spaces;
2609   if ((fpi->m_unpack_info_uses_two_bytes
2610            ? unp_reader->read_uint16(&extra_spaces)
2611            : unp_reader->read_uint8(&extra_spaces))) {
2612     return UNPACK_FAILURE;
2613   }
2614 
2615   if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
2616     space_padding_bytes =
2617         -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
2618     extra_spaces = 0;
2619   } else {
2620     extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
2621   }
2622 
2623   space_padding_bytes *= fpi->space_xfrm_len;
2624 
2625   /* Decode the length-emitted encoding here */
2626   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2627     const char last_byte = ptr[fpi->m_segment_size - 1];
2628     size_t used_bytes;
2629     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES)  // this is the last segment
2630     {
2631       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2632         return UNPACK_FAILURE;  // Cannot happen, corrupted data
2633       }
2634       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2635       finished = true;
2636     } else {
2637       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2638           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2639         return UNPACK_FAILURE;  // Invalid value
2640       }
2641       used_bytes = fpi->m_segment_size - 1;
2642     }
2643 
2644     // Now, need to decode used_bytes of data and append them to the value.
2645     if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2646       if (used_bytes & 1) {
2647         /*
2648           UTF-8 characters are encoded into two-byte entities. There is no way
2649           we can have an odd number of bytes after encoding.
2650         */
2651         return UNPACK_FAILURE;
2652       }
2653 
2654       const uchar *src = ptr;
2655       const uchar *const src_end = ptr + used_bytes;
2656       while (src < src_end) {
2657         my_wc_t wc = (src[0] << 8) | src[1];
2658         src += 2;
2659         const CHARSET_INFO *cset = fpi->m_varchar_charset;
2660         int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2661         DBUG_ASSERT(res <= 3);
2662         if (res <= 0) return UNPACK_FAILURE;
2663         dst += res;
2664         len += res;
2665       }
2666     } else {
2667       if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
2668       memcpy(dst, ptr, used_bytes);
2669       dst += used_bytes;
2670       len += used_bytes;
2671     }
2672 
2673     if (finished) {
2674       if (extra_spaces) {
2675         // Both binary and UTF-8 charset store space as ' ',
2676         // so the following is ok:
2677         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2678         memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
2679         len += extra_spaces;
2680       }
2681       break;
2682     }
2683   }
2684 
2685   if (!finished) return UNPACK_FAILURE;
2686 
2687   /* Save the length */
2688   if (field_var->length_bytes == 1) {
2689     d0[0] = (uchar)len;
2690   } else {
2691     DBUG_ASSERT(field_var->length_bytes == 2);
2692     int2store(d0, len);
2693   }
2694   return UNPACK_SUCCESS;
2695 }
2696 
2697 /////////////////////////////////////////////////////////////////////////
2698 
2699 /*
2700   Function of type rdb_make_unpack_info_t
2701 */
2702 
2703 void Rdb_key_def::make_unpack_unknown(
2704     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2705     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2706   pack_ctx->writer->write(field->ptr, field->pack_length());
2707 }
2708 
/*
  The point of this function is only to indicate that unpack_info is
  available.

  The actual unpack_info data is produced by the function that packs the key,
  that is, pack_with_varchar_space_pad.

  All three parameters are unused.
*/

void Rdb_key_def::dummy_make_unpack_info(
    const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
    const Field *field MY_ATTRIBUTE((__unused__)),
    Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
  // Do nothing
}
2723 
2724 /*
2725   Function of type rdb_index_field_unpack_t
2726 */
2727 
2728 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
2729                                 Field *const field, uchar *const dst,
2730                                 Rdb_string_reader *const reader,
2731                                 Rdb_string_reader *const unp_reader) {
2732   const uchar *ptr;
2733   const uint len = fpi->m_unpack_data_len;
2734   // We don't use anything from the key, so skip over it.
2735   if (skip_max_length(fpi, field, reader)) {
2736     return UNPACK_FAILURE;
2737   }
2738 
2739   DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
2740 
2741   if ((ptr = (const uchar *)unp_reader->read(len))) {
2742     memcpy(dst, ptr, len);
2743     return UNPACK_SUCCESS;
2744   }
2745   return UNPACK_FAILURE;
2746 }
2747 
2748 /*
2749   Function of type rdb_make_unpack_info_t
2750 */
2751 
2752 void Rdb_key_def::make_unpack_unknown_varchar(
2753     const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
2754     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2755   const auto f = static_cast<const Field_varstring *>(field);
2756   uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2757   len += f->length_bytes;
2758   pack_ctx->writer->write(field->ptr, len);
2759 }
2760 
2761 /*
2762   Function of type rdb_index_field_unpack_t
2763 
2764   @detail
2765   Unpack a key part in an "unknown" collation from its
2766   (mem_comparable_form, unpack_info) form.
2767 
2768   "Unknown" means we have no clue about how mem_comparable_form is made from
2769   the original string, so we keep the whole original string in the unpack_info.
2770 
2771   @seealso
2772     make_unpack_unknown, unpack_unknown
2773 */
2774 
2775 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
2776                                         Field *const field, uchar *dst,
2777                                         Rdb_string_reader *const reader,
2778                                         Rdb_string_reader *const unp_reader) {
2779   const uchar *ptr;
2780   uchar *const d0 = dst;
2781   const auto f = static_cast<Field_varstring *>(field);
2782   dst += f->length_bytes;
2783   const uint len_bytes = f->length_bytes;
2784   // We don't use anything from the key, so skip over it.
2785   if ((fpi->m_skip_func)(fpi, field, reader)) {
2786     return UNPACK_FAILURE;
2787   }
2788 
2789   DBUG_ASSERT(len_bytes > 0);
2790   DBUG_ASSERT(unp_reader != nullptr);
2791 
2792   if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
2793     memcpy(d0, ptr, len_bytes);
2794     const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
2795     if ((ptr = (const uchar *)unp_reader->read(len))) {
2796       memcpy(dst, ptr, len);
2797       return UNPACK_SUCCESS;
2798     }
2799   }
2800   return UNPACK_FAILURE;
2801 }
2802 
2803 /*
2804   Write unpack_data for a "simple" collation
2805 */
2806 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
2807                                     const Rdb_collation_codec *const codec,
2808                                     const uchar *const src,
2809                                     const size_t src_len) {
2810   for (uint i = 0; i < src_len; i++) {
2811     writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
2812   }
2813 }
2814 
2815 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
2816                                    const Rdb_collation_codec *const codec,
2817                                    const uchar *const src, const size_t src_len,
2818                                    uchar *const dst) {
2819   for (uint i = 0; i < src_len; i++) {
2820     if (codec->m_dec_size[src[i]] > 0) {
2821       uint *ret;
2822       DBUG_ASSERT(reader != nullptr);
2823 
2824       if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
2825         return UNPACK_FAILURE;
2826       }
2827       dst[i] = codec->m_dec_idx[*ret][src[i]];
2828     } else {
2829       dst[i] = codec->m_dec_idx[0][src[i]];
2830     }
2831   }
2832 
2833   return UNPACK_SUCCESS;
2834 }
2835 
2836 /*
2837   Function of type rdb_make_unpack_info_t
2838 
2839   @detail
2840     Make unpack_data for VARCHAR(n) in a "simple" charset.
2841 */
2842 
2843 void Rdb_key_def::make_unpack_simple_varchar(
2844     const Rdb_collation_codec *const codec, const Field *const field,
2845     Rdb_pack_field_context *const pack_ctx) {
2846   const auto f = static_cast<const Field_varstring *>(field);
2847   uchar *const src = f->ptr + f->length_bytes;
2848   const size_t src_len =
2849       f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2850   Rdb_bit_writer bit_writer(pack_ctx->writer);
2851   // The std::min compares characters with bytes, but for simple collations,
2852   // mbmaxlen = 1.
2853   rdb_write_unpack_simple(&bit_writer, codec, src,
2854                           std::min((size_t)f->char_length(), src_len));
2855 }
2856 
2857 /*
2858   Function of type rdb_index_field_unpack_t
2859 
2860   @seealso
2861     pack_with_varchar_space_pad - packing function
2862     unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
2863 */
2864 
2865 int Rdb_key_def::unpack_simple_varchar_space_pad(
2866     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2867     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2868   const uchar *ptr;
2869   size_t len = 0;
2870   bool finished = false;
2871   uchar *d0 = dst;
2872   const Field_varstring *const field_var =
2873       static_cast<Field_varstring *>(field);
2874   // For simple collations, char_length is also number of bytes.
2875   DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
2876   uchar *dst_end = dst + field_var->pack_length();
2877   dst += field_var->length_bytes;
2878   Rdb_bit_reader bit_reader(unp_reader);
2879 
2880   uint space_padding_bytes = 0;
2881   uint extra_spaces;
2882   DBUG_ASSERT(unp_reader != nullptr);
2883 
2884   if ((fpi->m_unpack_info_uses_two_bytes
2885            ? unp_reader->read_uint16(&extra_spaces)
2886            : unp_reader->read_uint8(&extra_spaces))) {
2887     return UNPACK_FAILURE;
2888   }
2889 
2890   if (extra_spaces <= 8) {
2891     space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2892     extra_spaces = 0;
2893   } else {
2894     extra_spaces -= 8;
2895   }
2896 
2897   space_padding_bytes *= fpi->space_xfrm_len;
2898 
2899   /* Decode the length-emitted encoding here */
2900   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2901     const char last_byte =
2902         ptr[fpi->m_segment_size - 1];  // number of padding bytes
2903     size_t used_bytes;
2904     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2905       // this is the last one
2906       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2907         return UNPACK_FAILURE;  // Cannot happen, corrupted data
2908       }
2909       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2910       finished = true;
2911     } else {
2912       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2913           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2914         return UNPACK_FAILURE;
2915       }
2916       used_bytes = fpi->m_segment_size - 1;
2917     }
2918 
2919     if (dst + used_bytes > dst_end) {
2920       // The value on disk is longer than the field definition allows?
2921       return UNPACK_FAILURE;
2922     }
2923 
2924     uint ret;
2925     if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2926                                       used_bytes, dst)) != UNPACK_SUCCESS) {
2927       return ret;
2928     }
2929 
2930     dst += used_bytes;
2931     len += used_bytes;
2932 
2933     if (finished) {
2934       if (extra_spaces) {
2935         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2936         // pad_char has a 1-byte form in all charsets that
2937         // are handled by rdb_init_collation_mapping.
2938         memset(dst, field_var->charset()->pad_char, extra_spaces);
2939         len += extra_spaces;
2940       }
2941       break;
2942     }
2943   }
2944 
2945   if (!finished) return UNPACK_FAILURE;
2946 
2947   /* Save the length */
2948   if (field_var->length_bytes == 1) {
2949     d0[0] = (uchar)len;
2950   } else {
2951     DBUG_ASSERT(field_var->length_bytes == 2);
2952     int2store(d0, len);
2953   }
2954   return UNPACK_SUCCESS;
2955 }
2956 
2957 /*
2958   Function of type rdb_make_unpack_info_t
2959 
2960   @detail
2961     Make unpack_data for CHAR(n) value in a "simple" charset.
2962     It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2963 
2964   @seealso
2965     The VARCHAR variant is in make_unpack_simple_varchar
2966 */
2967 
2968 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
2969                                      const Field *const field,
2970                                      Rdb_pack_field_context *const pack_ctx) {
2971   const uchar *const src = field->ptr;
2972   Rdb_bit_writer bit_writer(pack_ctx->writer);
2973   rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2974 }
2975 
2976 /*
2977   Function of type rdb_index_field_unpack_t
2978 */
2979 
2980 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
2981                                Field *const field MY_ATTRIBUTE((__unused__)),
2982                                uchar *const dst,
2983                                Rdb_string_reader *const reader,
2984                                Rdb_string_reader *const unp_reader) {
2985   const uchar *ptr;
2986   const uint len = fpi->m_max_image_len;
2987   Rdb_bit_reader bit_reader(unp_reader);
2988 
2989   if (!(ptr = (const uchar *)reader->read(len))) {
2990     return UNPACK_FAILURE;
2991   }
2992 
2993   return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2994                                 fpi->m_charset_codec, ptr, len, dst);
2995 }
2996 
2997 // See Rdb_charset_space_info::spaces_xfrm
2998 const int RDB_SPACE_XFRM_SIZE = 32;
2999 
3000 // A class holding information about how space character is represented in a
3001 // charset.
3002 class Rdb_charset_space_info {
3003  public:
3004   Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3005   Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3006   Rdb_charset_space_info() = default;
3007 
3008   // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3009   std::vector<uchar> spaces_xfrm;
3010 
3011   // length(strxfrm(' '))
3012   size_t space_xfrm_len;
3013 
3014   // length of the space character itself
3015   // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3016   // (length=2)
3017   size_t space_mb_len;
3018 };
3019 
3020 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
3021     rdb_mem_comparable_space;
3022 
3023 /*
3024   @brief
3025   For a given charset, get
3026    - strxfrm('    '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
3027    - length of strxfrm(charset, ' ')
3028    - length of the space character in the charset
3029 
3030   @param cs  IN    Charset to get the space for
3031   @param ptr OUT   A few space characters
3032   @param len OUT   Return length of the space (in bytes)
3033 
3034   @detail
3035     It is tempting to pre-generate mem-comparable form of space character for
3036     every charset on server startup.
3037     One can't do that: some charsets are not initialized until somebody
3038     attempts to use them (e.g. create or open a table that has a field that
3039     uses the charset).
3040 */
3041 
3042 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3043                                          const std::vector<uchar> **xfrm,
3044                                          size_t *const xfrm_len,
3045                                          size_t *const mb_len) {
3046   DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
3047   if (!rdb_mem_comparable_space[cs->number].get()) {
3048     RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3049     if (!rdb_mem_comparable_space[cs->number].get()) {
3050       // Upper bound of how many bytes can be occupied by multi-byte form of a
3051       // character in any charset.
3052       const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3053       DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3054 
3055       // multi-byte form of the ' ' (space) character
3056       uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3057 
3058       const size_t space_mb_len = cs->cset->wc_mb(
3059           cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3060 
3061       // mem-comparable image of the space character
3062       std::array<uchar, 20> space;
3063 
3064       const size_t space_len = cs->coll->strnxfrm(
3065           cs, space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3066       Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3067       info->space_xfrm_len = space_len;
3068       info->space_mb_len = space_mb_len;
3069       while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3070         info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3071                                  space.data() + space_len);
3072       }
3073       rdb_mem_comparable_space[cs->number].reset(info);
3074     }
3075     RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3076   }
3077 
3078   *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3079   *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3080   *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3081 }
3082 
// Held by rdb_get_mem_comparable_space() while it creates an entry of
// rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Collation codecs indexed by CHARSET_INFO::number. Entries are filled in on
// demand by rdb_init_collation_mapping().
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
// Held by rdb_init_collation_mapping() while it creates an entry of
// rdb_collation_data.
mysql_mutex_t rdb_collation_data_mutex;
3088 
3089 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3090   return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
3091          !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
3092 }
3093 
3094 static const Rdb_collation_codec *rdb_init_collation_mapping(
3095     const my_core::CHARSET_INFO *const cs) {
3096   DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
3097   const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3098 
3099   if (codec == nullptr && rdb_is_collation_supported(cs)) {
3100     RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3101 
3102     codec = rdb_collation_data[cs->number];
3103     if (codec == nullptr) {
3104       Rdb_collation_codec *cur = nullptr;
3105 
3106       // Compute reverse mapping for simple collations.
3107       if (rdb_is_collation_supported(cs)) {
3108         cur = new Rdb_collation_codec;
3109         std::map<uchar, std::vector<uchar>> rev_map;
3110         size_t max_conflict_size = 0;
3111         for (int src = 0; src < 256; src++) {
3112           uchar dst = cs->sort_order[src];
3113           rev_map[dst].push_back(src);
3114           max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3115         }
3116         cur->m_dec_idx.resize(max_conflict_size);
3117 
3118         for (auto const &p : rev_map) {
3119           uchar dst = p.first;
3120           for (uint idx = 0; idx < p.second.size(); idx++) {
3121             uchar src = p.second[idx];
3122             uchar bits =
3123                 my_bit_log2(my_round_up_to_next_power(p.second.size()));
3124             cur->m_enc_idx[src] = idx;
3125             cur->m_enc_size[src] = bits;
3126             cur->m_dec_size[dst] = bits;
3127             cur->m_dec_idx[idx][dst] = src;
3128           }
3129         }
3130 
3131         cur->m_make_unpack_info_func = {Rdb_key_def::make_unpack_simple_varchar,
3132                                         Rdb_key_def::make_unpack_simple};
3133         cur->m_unpack_func = {Rdb_key_def::unpack_simple_varchar_space_pad,
3134                               Rdb_key_def::unpack_simple};
3135       } else {
3136         // Out of luck for now.
3137       }
3138 
3139       if (cur != nullptr) {
3140         codec = cur;
3141         cur->m_cs = cs;
3142         rdb_collation_data[cs->number] = cur;
3143       }
3144     }
3145 
3146     RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3147   }
3148 
3149   return codec;
3150 }
3151 
3152 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3153   int ret;
3154   if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
3155       cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
3156     /*
3157       In these collations, a character produces one weight, which is 3 bytes.
3158       Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3159       get 3*3+1=10
3160     */
3161     ret = 10;
3162   } else {
3163     /*
3164       All other collations. There are two classes:
3165       - Unicode-based, except for collations mentioned in the if-condition.
3166         For these all weights are 2 bytes long, a character may produce 0..8
3167         weights.
3168         in any case, 8 bytes of payload in the segment guarantee that the last
3169         space character won't span across segments.
3170 
3171       - Collations not based on unicode. These have length(strxfrm(' '))=1,
3172         there nothing to worry about.
3173 
3174       In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3175     */
3176     ret = 9;
3177   }
3178   DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
3179   return ret;
3180 }
3181 
3182 /*
3183   @brief
3184     Setup packing of index field into its mem-comparable form
3185 
3186   @detail
3187     - It is possible produce mem-comparable form for any datatype.
3188     - Some datatypes also allow to unpack the original value from its
3189       mem-comparable form.
3190       = Some of these require extra information to be stored in "unpack_info".
3191         unpack_info is not a part of mem-comparable form, it is only used to
3192         restore the original value
3193 
3194   @param
3195     field  IN  field to be packed/un-packed
3196 
3197   @return
3198     TRUE  -  Field can be read with index-only reads
3199     FALSE -  Otherwise
3200 */
3201 
3202 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
3203                               const Field *const field, const uint keynr_arg,
3204                               const uint key_part_arg,
3205                               const uint16 key_length) {
3206   int res = false;
3207   enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
3208 
3209   m_keynr = keynr_arg;
3210   m_key_part = key_part_arg;
3211 
3212   m_maybe_null = field ? field->real_maybe_null() : false;
3213   m_unpack_func = nullptr;
3214   m_make_unpack_info_func = nullptr;
3215   m_unpack_data_len = 0;
3216   space_xfrm = nullptr;  // safety
3217   // whether to use legacy format for varchar
3218   m_use_legacy_varbinary_format = false;
3219   // ha_rocksdb::index_flags() will pass key_descr == null to
3220   // see whether field(column) can be read-only reads through return value,
3221   // but the legacy vs. new varchar format doesn't affect return value.
3222   // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
3223   if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3224     m_use_legacy_varbinary_format = true;
3225   }
3226   /* Calculate image length. By default, is is pack_length() */
3227   m_max_image_len =
3228       field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
3229   m_skip_func = Rdb_key_def::skip_max_length;
3230   m_pack_func = Rdb_key_def::pack_with_make_sort_key;
3231 
3232   m_covered = false;
3233 
3234   switch (type) {
3235     case MYSQL_TYPE_LONGLONG:
3236     case MYSQL_TYPE_LONG:
3237     case MYSQL_TYPE_INT24:
3238     case MYSQL_TYPE_SHORT:
3239     case MYSQL_TYPE_TINY:
3240       m_unpack_func = Rdb_key_def::unpack_integer;
3241       m_covered = true;
3242       return true;
3243 
3244     case MYSQL_TYPE_DOUBLE:
3245       m_unpack_func = Rdb_key_def::unpack_double;
3246       m_covered = true;
3247       return true;
3248 
3249     case MYSQL_TYPE_FLOAT:
3250       m_unpack_func = Rdb_key_def::unpack_float;
3251       m_covered = true;
3252       return true;
3253 
3254     case MYSQL_TYPE_NEWDECIMAL:
3255     /*
3256       Decimal is packed with Field_new_decimal::make_sort_key, which just
3257       does memcpy.
3258       Unpacking decimal values was supported only after fix for issue#253,
3259       because of that ha_rocksdb::get_storage_type() handles decimal values
3260       in a special way.
3261     */
3262     case MYSQL_TYPE_DATETIME2:
3263     case MYSQL_TYPE_TIMESTAMP2:
3264     /* These are packed with Field_temporal_with_date_and_timef::make_sort_key
3265      */
3266     case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
3267     case MYSQL_TYPE_YEAR:  /* YEAR is packed with  Field_tiny::make_sort_key */
3268       /* Everything that comes here is packed with just a memcpy(). */
3269       m_unpack_func = Rdb_key_def::unpack_binary_str;
3270       m_covered = true;
3271       return true;
3272 
3273     case MYSQL_TYPE_NEWDATE:
3274       /*
3275         This is packed by Field_newdate::make_sort_key. It assumes the data is
3276         3 bytes, and packing is done by swapping the byte order (for both big-
3277         and little-endian)
3278       */
3279       m_unpack_func = Rdb_key_def::unpack_newdate;
3280       m_covered = true;
3281       return true;
3282     case MYSQL_TYPE_TINY_BLOB:
3283     case MYSQL_TYPE_MEDIUM_BLOB:
3284     case MYSQL_TYPE_LONG_BLOB:
3285     case MYSQL_TYPE_BLOB: {
3286       if (key_descr) {
3287         // The my_charset_bin collation is special in that it will consider
3288         // shorter strings sorting as less than longer strings.
3289         //
3290         // See Field_blob::make_sort_key for details.
3291         m_max_image_len =
3292           key_length + (field->charset()->number == COLLATION_BINARY
3293                               ? reinterpret_cast<const Field_blob *>(field)
3294                                     ->pack_length_no_ptr()
3295                               : 0);
3296         // Return false because indexes on text/blob will always require
3297         // a prefix. With a prefix, the optimizer will not be able to do an
3298         // index-only scan since there may be content occuring after the prefix
3299         // length.
3300         return false;
3301       }
3302       break;
3303     }
3304     default:
3305       break;
3306   }
3307 
3308   m_unpack_info_stores_value = false;
3309   /* Handle [VAR](CHAR|BINARY) */
3310 
3311   if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3312     /*
3313       For CHAR-based columns, check how strxfrm image will take.
3314       field->field_length = field->char_length() * cs->mbmaxlen.
3315     */
3316     const CHARSET_INFO *cs = field->charset();
3317     m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
3318   }
3319   const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
3320   const CHARSET_INFO *cs = field->charset();
3321   // max_image_len before chunking is taken into account
3322   const int max_image_len_before_chunks = m_max_image_len;
3323 
3324   if (is_varchar) {
3325     // The default for varchar is variable-length, without space-padding for
3326     // comparisons
3327     m_varchar_charset = cs;
3328     m_skip_func = Rdb_key_def::skip_variable_length;
3329     m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3330     if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3331       m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
3332     } else {
3333       // Calculate the maximum size of the short section plus the
3334       // maximum size of the long section
3335       m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
3336     }
3337 
3338     const auto field_var = static_cast<const Field_varstring *>(field);
3339     m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
3340   }
3341 
3342   if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
    // information about how character-based datatypes are compared.
3345     bool use_unknown_collation = false;
3346     DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
3347                     use_unknown_collation = true;);
3348 
3349     if (cs->number == COLLATION_BINARY) {
3350       // - SQL layer pads BINARY(N) so that it always is N bytes long.
3351       // - For VARBINARY(N), values may have different lengths, so we're using
3352       //   variable-length encoding. This is also the only charset where the
3353       //   values are not space-padded for comparison.
3354       m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
3355                                  : Rdb_key_def::unpack_binary_str;
3356       res = true;
3357     } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
3358       // For _bin collations, mem-comparable form of the string is the string
3359       // itself.
3360 
3361       if (is_varchar) {
3362         // VARCHARs - are compared as if they were space-padded - but are
3363         // not actually space-padded (reading the value back produces the
3364         // original value, without the padding)
3365         m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
3366         m_skip_func = Rdb_key_def::skip_variable_space_pad;
3367         m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3368         m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
3369         m_segment_size = get_segment_size_from_collation(cs);
3370         m_max_image_len =
3371             (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3372             m_segment_size;
3373         rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3374                                      &space_mb_len);
3375       } else {
3376         // SQL layer pads CHAR(N) values to their maximum length.
3377         // We just store that and restore it back.
3378         m_unpack_func = (cs->number == COLLATION_LATIN1_BIN)
3379                             ? Rdb_key_def::unpack_binary_str
3380                             : Rdb_key_def::unpack_utf8_str;
3381       }
3382       res = true;
3383     } else {
3384       // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
3385 
3386       res = true;  // index-only scans are possible
3387       m_unpack_data_len = is_varchar ? 0 : field->field_length;
3388       const uint idx = is_varchar ? 0 : 1;
3389       const Rdb_collation_codec *codec = nullptr;
3390 
3391       if (is_varchar) {
3392         // VARCHAR requires space-padding for doing comparisons
3393         //
3394         // The check for cs->levels_for_order is to catch
3395         // latin2_czech_cs and cp1250_czech_cs - multi-level collations
3396         // that Variable-Length Space Padded Encoding can't handle.
3397         // It is not expected to work for any other multi-level collations,
3398         // either.
3399         // Currently we handle these collations as NO_PAD, even if they have
3400         // PAD_SPACE attribute.
3401         if (cs->levels_for_order == 1) {
3402           m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3403           m_skip_func = Rdb_key_def::skip_variable_space_pad;
3404           m_segment_size = get_segment_size_from_collation(cs);
3405           m_max_image_len =
3406               (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3407               m_segment_size;
3408           rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3409                                        &space_mb_len);
3410         } else {
3411           //  NO_LINT_DEBUG
3412           sql_print_warning(
3413               "RocksDB: you're trying to create an index "
3414               "with a multi-level collation %s",
3415               cs->name);
3416           //  NO_LINT_DEBUG
3417           sql_print_warning(
3418               "MyRocks will handle this collation internally "
3419               " as if it had a NO_PAD attribute.");
3420           m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3421           m_skip_func = Rdb_key_def::skip_variable_length;
3422         }
3423       }
3424 
3425       if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
3426         // The collation allows to store extra information in the unpack_info
3427         // which can be used to restore the original value from the
3428         // mem-comparable form.
3429         m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
3430         m_unpack_func = codec->m_unpack_func[idx];
3431         m_charset_codec = codec;
3432       } else if (use_unknown_collation) {
3433         // We have no clue about how this collation produces mem-comparable
3434         // form. Our way of restoring the original value is to keep a copy of
3435         // the original value in unpack_info.
3436         m_unpack_info_stores_value = true;
3437         m_make_unpack_info_func = is_varchar
3438                                       ? Rdb_key_def::make_unpack_unknown_varchar
3439                                       : Rdb_key_def::make_unpack_unknown;
3440         m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
3441                                    : Rdb_key_def::unpack_unknown;
3442       } else {
3443         // Same as above: we don't know how to restore the value from its
3444         // mem-comparable form.
3445         // Here, we just indicate to the SQL layer we can't do it.
3446         DBUG_ASSERT(m_unpack_func == nullptr);
3447         m_unpack_info_stores_value = false;
3448         res = false;  // Indicate that index-only reads are not possible
3449       }
3450     }
3451 
3452     // Make an adjustment: if this column is partially covered, tell the SQL
3453     // layer we can't do index-only scans. Later when we perform an index read,
3454     // we'll check on a record-by-record basis if we can do an index-only scan
3455     // or not.
3456     uint field_length;
3457     if (field->table) {
3458       field_length = field->table->field[field->field_index]->field_length;
3459     } else {
3460       field_length = field->field_length;
3461     }
3462 
3463     if (field_length != key_length) {
3464       res = false;
3465       // If this index doesn't support covered bitmaps, then we won't know
3466       // during a read if the column is actually covered or not. If so, we need
3467       // to assume the column isn't covered and skip it during unpacking.
3468       //
3469       // If key_descr == NULL, then this is a dummy field and we probably don't
3470       // need to perform this step. However, to preserve the behavior before
3471       // this change, we'll only skip this step if we have an index which
3472       // supports covered bitmaps.
3473       if (!key_descr || !key_descr->use_covered_bitmap_format()) {
3474         m_unpack_func = nullptr;
3475         m_make_unpack_info_func = nullptr;
3476         m_unpack_info_stores_value = true;
3477       }
3478     }
3479   }
3480 
3481   m_covered = res;
3482   return res;
3483 }
3484 
3485 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
3486   return tbl->key_info[m_keynr].key_part[m_key_part].field;
3487 }
3488 
3489 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
3490                                            const longlong hidden_pk_id) const {
3491   DBUG_ASSERT(m_max_image_len == 8);
3492 
3493   String to;
3494   rdb_netstr_append_uint64(&to, hidden_pk_id);
3495   memcpy(*dst, to.ptr(), m_max_image_len);
3496 
3497   *dst += m_max_image_len;
3498 }
3499 
3500 ///////////////////////////////////////////////////////////////////////////////////////////
3501 // Rdb_ddl_manager
3502 ///////////////////////////////////////////////////////////////////////////////////////////
3503 
3504 Rdb_tbl_def::~Rdb_tbl_def() {
3505   auto ddl_manager = rdb_get_ddl_manager();
3506   /* Don't free key definitions */
3507   if (m_key_descr_arr) {
3508     for (uint i = 0; i < m_key_count; i++) {
3509       if (ddl_manager && m_key_descr_arr[i]) {
3510         ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
3511       }
3512 
3513       m_key_descr_arr[i] = nullptr;
3514     }
3515 
3516     delete[] m_key_descr_arr;
3517     m_key_descr_arr = nullptr;
3518   }
3519 }
3520 
3521 /*
3522   Put table definition DDL entry. Actual write is done at
3523   Rdb_dict_manager::commit.
3524 
3525   We write
3526     dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
3527 
3528   Where key entries are a tuple of
3529     ( cf_id, index_nr )
3530 */
3531 
3532 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
3533                            rocksdb::WriteBatch *const batch,
3534                            const rocksdb::Slice &key) {
3535   StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
3536   indexes.alloc(Rdb_key_def::VERSION_SIZE +
3537                 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
3538   rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
3539 
3540   for (uint i = 0; i < m_key_count; i++) {
3541     const Rdb_key_def &kd = *m_key_descr_arr[i];
3542 
3543     uchar flags =
3544         (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
3545         (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
3546 
3547     const uint cf_id = kd.get_cf()->GetID();
3548     /*
3549       If cf_id already exists, cf_flags must be the same.
3550       To prevent race condition, reading/modifying/committing CF flags
3551       need to be protected by mutex (dict_manager->lock()).
3552       When RocksDB supports transaction with pessimistic concurrency
3553       control, we can switch to use it and removing mutex.
3554     */
3555     uint existing_cf_flags;
3556     const std::string cf_name = kd.get_cf()->GetName();
3557 
3558     if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
3559       // For the purposes of comparison we'll clear the partitioning bit. The
3560       // intent here is to make sure that both partitioned and non-partitioned
3561       // tables can refer to the same CF.
3562       existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3563       flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3564 
3565       if (existing_cf_flags != flags) {
3566         my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
3567                  existing_cf_flags);
3568         return true;
3569       }
3570     } else {
3571       dict->add_cf_flags(batch, cf_id, flags);
3572     }
3573 
3574     rdb_netstr_append_uint32(&indexes, cf_id);
3575 
3576     uint32 index_number = kd.get_index_number();
3577     rdb_netstr_append_uint32(&indexes, index_number);
3578 
3579     struct Rdb_index_info index_info;
3580     index_info.m_gl_index_id = {cf_id, index_number};
3581     index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
3582     index_info.m_index_type = kd.m_index_type;
3583     index_info.m_kv_version = kd.m_kv_format_version;
3584     index_info.m_index_flags = kd.m_index_flags_bitmap;
3585     index_info.m_ttl_duration = kd.m_ttl_duration;
3586 
3587     dict->add_or_update_index_cf_mapping(batch, &index_info);
3588   }
3589 
3590   const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
3591 
3592   dict->put_key(batch, key, svalue);
3593   return false;
3594 }
3595 
3596 time_t Rdb_tbl_def::get_create_time() {
3597   time_t create_time = m_create_time;
3598 
3599   if (create_time == CREATE_TIME_UNKNOWN) {
3600     // Read it from the .frm file. It's not a problem if several threads do this
3601     // concurrently
3602     char path[FN_REFLEN];
3603     snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
3604              m_dbname.c_str(), m_tablename.c_str(), reg_ext);
3605     unpack_filename(path,path);
3606     MY_STAT f_stat;
3607     if (my_stat(path, &f_stat, MYF(0)))
3608       create_time = f_stat.st_ctime;
3609     else
3610       create_time = 0; // will be shown as SQL NULL
3611     m_create_time = create_time;
3612   }
3613   return create_time;
3614 }
3615 
3616 // Length that each index flag takes inside the record.
3617 // Each index in the array maps to the enum INDEX_FLAG
3618 static const std::array<uint, 1> index_flag_lengths = {
3619     {ROCKSDB_SIZEOF_TTL_RECORD}};
3620 
3621 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
3622   return flag & index_flags;
3623 }
3624 
3625 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
3626                                                 enum INDEX_FLAG flag,
3627                                                 uint *const length) {
3628   DBUG_ASSERT_IMP(flag != MAX_FLAG,
3629                   Rdb_key_def::has_index_flag(index_flags, flag));
3630 
3631   uint offset = 0;
3632   for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
3633     int mask = 1 << bit;
3634 
3635     /* Exit once we've reached the proper flag */
3636     if (flag & mask) {
3637       if (length != nullptr) {
3638         *length = index_flag_lengths[bit];
3639       }
3640       break;
3641     }
3642 
3643     if (index_flags & mask) {
3644       offset += index_flag_lengths[bit];
3645     }
3646   }
3647 
3648   return offset;
3649 }
3650 
3651 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
3652                                          const uchar *const val,
3653                                          enum INDEX_FLAG flag) const {
3654   uint len;
3655   uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
3656   DBUG_ASSERT(offset + len <= buf->get_current_pos());
3657   memcpy(buf->ptr() + offset, val, len);
3658 }
3659 
3660 void Rdb_tbl_def::check_if_is_mysql_system_table() {
3661   static const char *const system_dbs[] = {
3662       "mysql",
3663       "performance_schema",
3664       "information_schema",
3665   };
3666 
3667   m_is_mysql_system_table = false;
3668   for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
3669     if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
3670       m_is_mysql_system_table = true;
3671       break;
3672     }
3673   }
3674 }
3675 
3676 void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
3677   m_is_read_free_rpl_table =
3678 #if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported
3679       rdb_read_free_regex_handler.matches(base_tablename());
3680 #else
3681       false;
3682 #endif
3683 }
3684 
3685 void Rdb_tbl_def::set_name(const std::string &name) {
3686   int err MY_ATTRIBUTE((__unused__));
3687 
3688   m_dbname_tablename = name;
3689   err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
3690                                        &m_partition);
3691   DBUG_ASSERT(err == 0);
3692 
3693   check_if_is_mysql_system_table();
3694 }
3695 
3696 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
3697   for (uint i = 0; i < m_key_count; i++) {
3698     auto &k = m_key_descr_arr[i];
3699     if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
3700         k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
3701       return k->get_gl_index_id();
3702     }
3703   }
3704 
3705   // Every table must have a primary key, even if it's hidden.
3706   abort();
3707   return GL_INDEX_ID();
3708 }
3709 
3710 void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
3711   m_index_num_to_keydef.erase(gl_index_id);
3712 }
3713 
3714 void Rdb_ddl_manager::add_uncommitted_keydefs(
3715     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3716   mysql_rwlock_wrlock(&m_rwlock);
3717   for (const auto &index : indexes) {
3718     m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
3719   }
3720   mysql_rwlock_unlock(&m_rwlock);
3721 }
3722 
3723 void Rdb_ddl_manager::remove_uncommitted_keydefs(
3724     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3725   mysql_rwlock_wrlock(&m_rwlock);
3726   for (const auto &index : indexes) {
3727     m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
3728   }
3729   mysql_rwlock_unlock(&m_rwlock);
3730 }
3731 
3732 namespace  // anonymous namespace = not visible outside this source file
3733 {
3734 struct Rdb_validate_tbls : public Rdb_tables_scanner {
3735   using tbl_info_t = std::pair<std::string, bool>;
3736   using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;
3737 
3738   tbl_list_t m_list;
3739 
3740   int add_table(Rdb_tbl_def *tdef) override;
3741 
3742   bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);
3743 
3744   bool scan_for_frms(const std::string &datadir, const std::string &dbname,
3745                      bool *has_errors);
3746 
3747   bool check_frm_file(const std::string &fullpath, const std::string &dbname,
3748                       const std::string &tablename, bool *has_errors);
3749 };
3750 }  // anonymous namespace
3751 
3752 /*
3753   Get a list of tables that we expect to have .frm files for.  This will use the
3754   information just read from the RocksDB data dictionary.
3755 */
3756 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
3757   DBUG_ASSERT(tdef != nullptr);
3758 
3759   /* Add the database/table into the list that are not temp table */
3760   if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
3761     bool is_partition = tdef->base_partition().size() != 0;
3762     m_list[tdef->base_dbname()].insert(
3763         tbl_info_t(tdef->base_tablename(), is_partition));
3764   }
3765 
3766   return HA_EXIT_SUCCESS;
3767 }
3768 
3769 /*
3770   Access the .frm file for this dbname/tablename and see if it is a RocksDB
3771   table (or partition table).
3772 */
3773 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
3774                                        const std::string &dbname,
3775                                        const std::string &tablename,
3776                                        bool *has_errors) {
3777   /* Check this .frm file to see what engine it uses */
3778   String fullfilename(fullpath.c_str(), &my_charset_bin);
3779   fullfilename.append(FN_DIRSEP);
3780   fullfilename.append(tablename.c_str());
3781   fullfilename.append(".frm");
3782 
3783   /*
3784     This function will return the legacy_db_type of the table.  Currently
3785     it does not reference the first parameter (THD* thd), but if it ever
3786     did in the future we would need to make a version that does it without
3787     the connection handle as we don't have one here.
3788   */
3789   char eng_type_buf[NAME_CHAR_LEN+1];
3790   LEX_CSTRING eng_type_str = {eng_type_buf, 0};
3791   bool is_sequence;
3792   enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str, &is_sequence);
3793   if (type == TABLE_TYPE_UNKNOWN) {
3794     // NO_LINT_DEBUG
3795     sql_print_warning("RocksDB: Failed to open/read .from file: %s",
3796                       fullfilename.ptr());
3797     return false;
3798   }
3799 
3800   if (type == TABLE_TYPE_NORMAL) {
3801     /* For a RocksDB table do we have a reference in the data dictionary? */
3802     if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
3803       /*
3804         Attempt to remove the table entry from the list of tables.  If this
3805         fails then we know we had a .frm file that wasn't registered in RocksDB.
3806       */
3807       tbl_info_t element(tablename, false);
3808       if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
3809         // NO_LINT_DEBUG
3810         sql_print_warning(
3811             "RocksDB: Schema mismatch - "
3812             "A .frm file exists for table %s.%s, "
3813             "but that table is not registered in RocksDB",
3814             dbname.c_str(), tablename.c_str());
3815         *has_errors = true;
3816       }
3817     } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
3818       /*
3819         For partition tables, see if it is in the m_list as a partition,
3820         but don't generate an error if it isn't there - we don't know that the
3821         .frm is for RocksDB.
3822       */
3823       if (m_list.count(dbname) > 0) {
3824         m_list[dbname].erase(tbl_info_t(tablename, true));
3825       }
3826     }
3827   }
3828 
3829   return true;
3830 }
3831 
3832 /* Scan the database subdirectory for .frm files */
3833 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
3834                                       const std::string &dbname,
3835                                       bool *has_errors) {
3836   bool result = true;
3837   std::string fullpath = datadir + dbname;
3838   struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
3839 
3840   /* Access the directory */
3841   if (dir_info == nullptr) {
3842     // NO_LINT_DEBUG
3843     sql_print_warning("RocksDB: Could not open database directory: %s",
3844                       fullpath.c_str());
3845     return false;
3846   }
3847 
3848   /* Scan through the files in the directory */
3849   struct fileinfo *file_info = dir_info->dir_entry;
3850   for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3851     /* Find .frm files that are not temp files (those that contain '#sql') */
3852     const char *ext = strrchr(file_info->name, '.');
3853     if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
3854         strcmp(ext, ".frm") == 0) {
3855       std::string tablename =
3856           std::string(file_info->name, ext - file_info->name);
3857 
3858       /* Check to see if the .frm file is from RocksDB */
3859       if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
3860         result = false;
3861         break;
3862       }
3863     }
3864   }
3865 
3866   /* Remove any databases who have no more tables listed */
3867   if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
3868     m_list.erase(dbname);
3869   }
3870 
3871   /* Release the directory entry */
3872   my_dirend(dir_info);
3873 
3874   return result;
3875 }
3876 
3877 /*
3878   Scan the datadir for all databases (subdirectories) and get a list of .frm
3879   files they contain
3880 */
3881 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
3882                                                  bool *has_errors) {
3883   bool result = true;
3884   struct st_my_dir *dir_info;
3885   struct fileinfo *file_info;
3886 
3887   dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
3888   if (dir_info == nullptr) {
3889     // NO_LINT_DEBUG
3890     sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
3891     return false;
3892   }
3893 
3894   file_info = dir_info->dir_entry;
3895   for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3896     /* Ignore files/dirs starting with '.' */
3897     if (file_info->name[0] == '.') continue;
3898 
3899     /* Ignore all non-directory files */
3900     if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
3901 
3902     /* Scan all the .frm files in the directory */
3903     if (!scan_for_frms(datadir, file_info->name, has_errors)) {
3904       result = false;
3905       break;
3906     }
3907   }
3908 
3909   /* Release the directory info */
3910   my_dirend(dir_info);
3911 
3912   return result;
3913 }
3914 
3915 /*
3916   Validate that all auto increment values in the data dictionary are on a
3917   supported version.
3918 */
3919 bool Rdb_ddl_manager::validate_auto_incr() {
3920   std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
3921 
3922   uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3923   rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
3924   const rocksdb::Slice auto_incr_entry_slice(
3925       reinterpret_cast<char *>(auto_incr_entry),
3926       Rdb_key_def::INDEX_NUMBER_SIZE);
3927   for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
3928     const rocksdb::Slice key = it->key();
3929     const rocksdb::Slice val = it->value();
3930     GL_INDEX_ID gl_index_id;
3931 
3932     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3933         memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
3934       break;
3935     }
3936 
3937     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
3938       return false;
3939     }
3940 
3941     if (val.size() <= Rdb_key_def::VERSION_SIZE) {
3942       return false;
3943     }
3944 
3945     // Check if we have orphaned entries for whatever reason by cross
3946     // referencing ddl entries.
3947     auto ptr = reinterpret_cast<const uchar *>(key.data());
3948     ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
3949     rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3950     if (!m_dict->get_index_info(gl_index_id, nullptr)) {
3951       // NO_LINT_DEBUG
3952       sql_print_warning(
3953           "RocksDB: AUTOINC mismatch - "
3954           "Index number (%u, %u) found in AUTOINC "
3955           "but does not exist as a DDL entry",
3956           gl_index_id.cf_id, gl_index_id.index_id);
3957       return false;
3958     }
3959 
3960     ptr = reinterpret_cast<const uchar *>(val.data());
3961     const int version = rdb_netbuf_read_uint16(&ptr);
3962     if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
3963       // NO_LINT_DEBUG
3964       sql_print_warning(
3965           "RocksDB: AUTOINC mismatch - "
3966           "Index number (%u, %u) found in AUTOINC "
3967           "is on unsupported version %d",
3968           gl_index_id.cf_id, gl_index_id.index_id, version);
3969       return false;
3970     }
3971   }
3972 
3973   if (!it->status().ok()) {
3974     return false;
3975   }
3976 
3977   return true;
3978 }
3979 
3980 /*
3981   Validate that all the tables in the RocksDB database dictionary match the .frm
3982   files in the datadir
3983 */
3984 bool Rdb_ddl_manager::validate_schemas(void) {
3985   bool has_errors = false;
3986   const std::string datadir = std::string(mysql_real_data_home);
3987   Rdb_validate_tbls table_list;
3988 
3989   /* Get the list of tables from the database dictionary */
3990   if (scan_for_tables(&table_list) != 0) {
3991     return false;
3992   }
3993 
3994   /* Compare that to the list of actual .frm files */
3995   if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
3996     return false;
3997   }
3998 
3999   /*
4000     Any tables left in the tables list are ones that are registered in RocksDB
4001     but don't have .frm files.
4002   */
4003   for (const auto &db : table_list.m_list) {
4004     for (const auto &table : db.second) {
4005       // NO_LINT_DEBUG
4006       sql_print_warning(
4007           "RocksDB: Schema mismatch - "
4008           "Table %s.%s is registered in RocksDB "
4009           "but does not have a .frm file",
4010           db.first.c_str(), table.first.c_str());
4011       has_errors = true;
4012     }
4013   }
4014 
4015   return !has_errors;
4016 }
4017 
4018 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4019                            Rdb_cf_manager *const cf_manager,
4020                            const uint32_t validate_tables) {
4021   m_dict = dict_arg;
4022   mysql_rwlock_init(0, &m_rwlock);
4023 
4024   /* Read the data dictionary and populate the hash */
4025   uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4026   rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4027   const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4028                                        Rdb_key_def::INDEX_NUMBER_SIZE);
4029 
4030   /* Reading data dictionary should always skip bloom filter */
4031   rocksdb::Iterator *it = m_dict->new_iterator();
4032   int i = 0;
4033 
4034   uint max_index_id_in_dict = 0;
4035   m_dict->get_max_index_id(&max_index_id_in_dict);
4036 
4037   for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4038     const uchar *ptr;
4039     const uchar *ptr_end;
4040     const rocksdb::Slice key = it->key();
4041     const rocksdb::Slice val = it->value();
4042 
4043     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4044         memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4045       break;
4046     }
4047 
4048     if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4049       // NO_LINT_DEBUG
4050       sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4051                       (int)key.size());
4052       return true;
4053     }
4054 
4055     Rdb_tbl_def *const tdef =
4056         new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4057 
4058     // Now, read the DDLs.
4059     const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4060     if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4061       // NO_LINT_DEBUG
4062       sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4063                       tdef->full_tablename().c_str());
4064       return true;
4065     }
4066     tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4067     tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4068 
4069     ptr = reinterpret_cast<const uchar *>(val.data());
4070     const int version = rdb_netbuf_read_uint16(&ptr);
4071     if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4072       // NO_LINT_DEBUG
4073       sql_print_error(
4074           "RocksDB: DDL ENTRY Version was not expected."
4075           "Expected: %d, Actual: %d",
4076           Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4077       return true;
4078     }
4079     ptr_end = ptr + real_val_size;
4080     for (uint keyno = 0; ptr < ptr_end; keyno++) {
4081       GL_INDEX_ID gl_index_id;
4082       rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4083       uint flags = 0;
4084       struct Rdb_index_info index_info;
4085       if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4086         // NO_LINT_DEBUG
4087         sql_print_error(
4088             "RocksDB: Could not get index information "
4089             "for Index Number (%u,%u), table %s",
4090             gl_index_id.cf_id, gl_index_id.index_id,
4091             tdef->full_tablename().c_str());
4092         return true;
4093       }
4094       if (max_index_id_in_dict < gl_index_id.index_id) {
4095         // NO_LINT_DEBUG
4096         sql_print_error(
4097             "RocksDB: Found max index id %u from data dictionary "
4098             "but also found larger index id %u from dictionary. "
4099             "This should never happen and possibly a bug.",
4100             max_index_id_in_dict, gl_index_id.index_id);
4101         return true;
4102       }
4103       if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4104         // NO_LINT_DEBUG
4105         sql_print_error(
4106             "RocksDB: Could not get Column Family Flags "
4107             "for CF Number %d, table %s",
4108             gl_index_id.cf_id, tdef->full_tablename().c_str());
4109         return true;
4110       }
4111 
4112       if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4113         // The per-index cf option is deprecated.  Make sure we don't have the
4114         // flag set in any existing database.   NO_LINT_DEBUG
4115         // NO_LINT_DEBUG
4116         sql_print_error(
4117             "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4118             "number %d, table %s",
4119             gl_index_id.cf_id, tdef->full_tablename().c_str());
4120       }
4121 
4122       rocksdb::ColumnFamilyHandle *const cfh =
4123           cf_manager->get_cf(gl_index_id.cf_id);
4124       DBUG_ASSERT(cfh != nullptr);
4125 
4126       uint32 ttl_rec_offset =
4127           Rdb_key_def::has_index_flag(index_info.m_index_flags,
4128                                       Rdb_key_def::TTL_FLAG)
4129               ? Rdb_key_def::calculate_index_flag_offset(
4130                     index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4131               : UINT_MAX;
4132 
4133       /*
4134         We can't fully initialize Rdb_key_def object here, because full
4135         initialization requires that there is an open TABLE* where we could
4136         look at Field* objects and set max_length and other attributes
4137       */
4138       tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4139           gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4140           index_info.m_index_type, index_info.m_kv_version,
4141           flags & Rdb_key_def::REVERSE_CF_FLAG,
4142           flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4143           m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4144           ttl_rec_offset, index_info.m_ttl_duration);
4145     }
4146     put(tdef);
4147     i++;
4148   }
4149 
  /*
    If validate_tables is greater than 0 run the validation.  Only fail the
    initialization if the setting is 1.  If the setting is 2 we continue.
  */
4154   if (validate_tables > 0) {
4155     std::string msg;
4156     if (!validate_schemas()) {
4157       msg =
4158           "RocksDB: Problems validating data dictionary "
4159           "against .frm files, exiting";
4160     } else if (!validate_auto_incr()) {
4161       msg =
4162           "RocksDB: Problems validating auto increment values in "
4163           "data dictionary, exiting";
4164     }
4165     if (validate_tables == 1 && !msg.empty()) {
4166       // NO_LINT_DEBUG
4167       sql_print_error("%s", msg.c_str());
4168       return true;
4169     }
4170   }
4171 
4172   // index ids used by applications should not conflict with
4173   // data dictionary index ids
4174   if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4175     max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4176   }
4177 
4178   m_sequence.init(max_index_id_in_dict + 1);
4179 
4180   if (!it->status().ok()) {
4181     rdb_log_status_error(it->status(), "Table_store load error");
4182     return true;
4183   }
4184   delete it;
4185   // NO_LINT_DEBUG
4186   sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4187                         i);
4188   return false;
4189 }
4190 
4191 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4192                                    const bool lock) {
4193   if (lock) {
4194     mysql_rwlock_rdlock(&m_rwlock);
4195   }
4196 
4197   Rdb_tbl_def *rec = nullptr;
4198   const auto it = m_ddl_map.find(table_name);
4199   if (it != m_ddl_map.end()) {
4200     rec = it->second;
4201   }
4202 
4203   if (lock) {
4204     mysql_rwlock_unlock(&m_rwlock);
4205   }
4206 
4207   return rec;
4208 }
4209 
4210 // this is a safe version of the find() function below.  It acquires a read
4211 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4212 // are finding it.  Copying it into 'ret' increments the count making sure
4213 // that the object will not be discarded until we are finished with it.
4214 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4215     GL_INDEX_ID gl_index_id) {
4216   std::shared_ptr<const Rdb_key_def> ret(nullptr);
4217 
4218   mysql_rwlock_rdlock(&m_rwlock);
4219 
4220   auto it = m_index_num_to_keydef.find(gl_index_id);
4221   if (it != m_index_num_to_keydef.end()) {
4222     const auto table_def = find(it->second.first, false);
4223     if (table_def && it->second.second < table_def->m_key_count) {
4224       const auto &kd = table_def->m_key_descr_arr[it->second.second];
4225       if (kd->max_storage_fmt_length() != 0) {
4226         ret = kd;
4227       }
4228     }
4229   } else {
4230     auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4231     if (it != m_index_num_to_uncommitted_keydef.end()) {
4232       const auto &kd = it->second;
4233       if (kd->max_storage_fmt_length() != 0) {
4234         ret = kd;
4235       }
4236     }
4237   }
4238 
4239   mysql_rwlock_unlock(&m_rwlock);
4240 
4241   return ret;
4242 }
4243 
4244 // this method assumes at least read-only lock on m_rwlock
4245 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4246     GL_INDEX_ID gl_index_id) {
4247   auto it = m_index_num_to_keydef.find(gl_index_id);
4248   if (it != m_index_num_to_keydef.end()) {
4249     auto table_def = find(it->second.first, false);
4250     if (table_def) {
4251       if (it->second.second < table_def->m_key_count) {
4252         return table_def->m_key_descr_arr[it->second.second];
4253       }
4254     }
4255   } else {
4256     auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4257     if (it != m_index_num_to_uncommitted_keydef.end()) {
4258       return it->second;
4259     }
4260   }
4261 
4262   static std::shared_ptr<Rdb_key_def> empty = nullptr;
4263 
4264   return empty;
4265 }
4266 
4267 // this method returns the name of the table based on an index id. It acquires
4268 // a read lock on m_rwlock.
4269 const std::string Rdb_ddl_manager::safe_get_table_name(
4270     const GL_INDEX_ID &gl_index_id) {
4271   std::string ret;
4272   mysql_rwlock_rdlock(&m_rwlock);
4273   auto it = m_index_num_to_keydef.find(gl_index_id);
4274   if (it != m_index_num_to_keydef.end()) {
4275     ret = it->second.first;
4276   }
4277   mysql_rwlock_unlock(&m_rwlock);
4278   return ret;
4279 }
4280 
4281 void Rdb_ddl_manager::set_stats(
4282     const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4283   mysql_rwlock_wrlock(&m_rwlock);
4284   for (auto src : stats) {
4285     const auto &keydef = find(src.second.m_gl_index_id);
4286     if (keydef) {
4287       keydef->m_stats = src.second;
4288       m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4289     }
4290   }
4291   mysql_rwlock_unlock(&m_rwlock);
4292 }
4293 
4294 void Rdb_ddl_manager::adjust_stats(
4295     const std::vector<Rdb_index_stats> &new_data,
4296     const std::vector<Rdb_index_stats> &deleted_data) {
4297   mysql_rwlock_wrlock(&m_rwlock);
4298   int i = 0;
4299   for (const auto &data : {new_data, deleted_data}) {
4300     for (const auto &src : data) {
4301       const auto &keydef = find(src.m_gl_index_id);
4302       if (keydef) {
4303         keydef->m_stats.m_distinct_keys_per_prefix.resize(
4304             keydef->get_key_parts());
4305         keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4306         m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4307       }
4308     }
4309     i++;
4310   }
4311   const bool should_save_stats = !m_stats2store.empty();
4312   mysql_rwlock_unlock(&m_rwlock);
4313   if (should_save_stats) {
4314     // Queue an async persist_stats(false) call to the background thread.
4315     rdb_queue_save_stats_request();
4316   }
4317 }
4318 
4319 void Rdb_ddl_manager::persist_stats(const bool sync) {
4320   mysql_rwlock_wrlock(&m_rwlock);
4321   const auto local_stats2store = std::move(m_stats2store);
4322   m_stats2store.clear();
4323   mysql_rwlock_unlock(&m_rwlock);
4324 
4325   // Persist stats
4326   const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4327   std::vector<Rdb_index_stats> stats;
4328   std::transform(local_stats2store.begin(), local_stats2store.end(),
4329                  std::back_inserter(stats),
4330                  [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4331                    return s.second;
4332                  });
4333   m_dict->add_stats(wb.get(), stats);
4334   m_dict->commit(wb.get(), sync);
4335 }
4336 
4337 /*
4338   Put table definition of `tbl` into the mapping, and also write it to the
4339   on-disk data dictionary.
4340 */
4341 
4342 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
4343                                    rocksdb::WriteBatch *const batch) {
4344   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
4345 
4346   buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4347 
4348   const std::string &dbname_tablename = tbl->full_tablename();
4349   buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4350 
4351   int res;
4352   if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) {
4353     return res;
4354   }
4355   if ((res = put(tbl))) {
4356     return res;
4357   }
4358   return HA_EXIT_SUCCESS;
4359 }
4360 
4361 /* Return 0 - ok, other value - error */
4362 /* TODO:
4363   This function modifies m_ddl_map and m_index_num_to_keydef.
4364   However, these changes need to be reversed if dict_manager.commit fails
4365   See the discussion here: https://reviews.facebook.net/D35925#inline-259167
4366   Tracked by https://github.com/facebook/mysql-5.6/issues/33
4367 */
4368 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
4369   Rdb_tbl_def *rec;
4370   const std::string &dbname_tablename = tbl->full_tablename();
4371 
4372   if (lock) mysql_rwlock_wrlock(&m_rwlock);
4373 
4374   // We have to do this find because 'tbl' is not yet in the list.  We need
4375   // to find the one we are replacing ('rec')
4376   rec = find(dbname_tablename, false);
4377   if (rec) {
4378     // Free the old record.
4379     delete rec;
4380     m_ddl_map.erase(dbname_tablename);
4381   }
4382   m_ddl_map.emplace(dbname_tablename, tbl);
4383 
4384   for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
4385     m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
4386         std::make_pair(dbname_tablename, keyno);
4387   }
4388   tbl->check_and_set_read_free_rpl_table();
4389 
4390   if (lock) mysql_rwlock_unlock(&m_rwlock);
4391   return 0;
4392 }
4393 
4394 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
4395                              rocksdb::WriteBatch *const batch,
4396                              const bool lock) {
4397   if (lock) mysql_rwlock_wrlock(&m_rwlock);
4398 
4399   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
4400   key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4401   const std::string &dbname_tablename = tbl->full_tablename();
4402   key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4403 
4404   m_dict->delete_key(batch, key_writer.to_slice());
4405 
4406   const auto it = m_ddl_map.find(dbname_tablename);
4407   if (it != m_ddl_map.end()) {
4408     // Free Rdb_tbl_def
4409     delete it->second;
4410 
4411     m_ddl_map.erase(it);
4412   }
4413 
4414   if (lock) mysql_rwlock_unlock(&m_rwlock);
4415 }
4416 
4417 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
4418                              rocksdb::WriteBatch *const batch) {
4419   Rdb_tbl_def *rec;
4420   Rdb_tbl_def *new_rec;
4421   bool res = true;
4422   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
4423 
4424   mysql_rwlock_wrlock(&m_rwlock);
4425   if (!(rec = find(from, false))) {
4426     mysql_rwlock_unlock(&m_rwlock);
4427     return true;
4428   }
4429 
4430   new_rec = new Rdb_tbl_def(to);
4431 
4432   new_rec->m_key_count = rec->m_key_count;
4433   new_rec->m_auto_incr_val =
4434       rec->m_auto_incr_val.load(std::memory_order_relaxed);
4435   new_rec->m_key_descr_arr = rec->m_key_descr_arr;
4436 
4437   new_rec->m_hidden_pk_val =
4438       rec->m_hidden_pk_val.load(std::memory_order_relaxed);
4439 
4440   // so that it's not free'd when deleting the old rec
4441   rec->m_key_descr_arr = nullptr;
4442 
4443   // Create a new key
4444   new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4445 
4446   const std::string &dbname_tablename = new_rec->full_tablename();
4447   new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4448 
4449   // Create a key to add
4450   if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) {
4451     remove(rec, batch, false);
4452     put(new_rec, false);
4453     res = false;  // ok
4454   }
4455 
4456   mysql_rwlock_unlock(&m_rwlock);
4457   return res;
4458 }
4459 
4460 void Rdb_ddl_manager::cleanup() {
4461   for (const auto &kv : m_ddl_map) {
4462     delete kv.second;
4463   }
4464   m_ddl_map.clear();
4465 
4466   mysql_rwlock_destroy(&m_rwlock);
4467   m_sequence.cleanup();
4468 }
4469 
4470 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
4471   int ret;
4472   Rdb_tbl_def *rec;
4473 
4474   DBUG_ASSERT(tables_scanner != nullptr);
4475 
4476   mysql_rwlock_rdlock(&m_rwlock);
4477 
4478   ret = 0;
4479 
4480   for (const auto &kv : m_ddl_map) {
4481     rec = kv.second;
4482     ret = tables_scanner->add_table(rec);
4483     if (ret) break;
4484   }
4485 
4486   mysql_rwlock_unlock(&m_rwlock);
4487   return ret;
4488 }
4489 
4490 /*
4491   Rdb_binlog_manager class implementation
4492 */
4493 
4494 bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
4495   DBUG_ASSERT(dict_arg != nullptr);
4496   m_dict = dict_arg;
4497 
4498   m_key_writer.reset();
4499   m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
4500   m_key_slice = m_key_writer.to_slice();
4501   return false;
4502 }
4503 
4504 void Rdb_binlog_manager::cleanup() {}
4505 
4506 /**
4507   Set binlog name, pos and optionally gtid into WriteBatch.
4508   This function should be called as part of transaction commit,
4509   since binlog info is set only at transaction commit.
4510   Actual write into RocksDB is not done here, so checking if
4511   write succeeded or not is not possible here.
4512   @param binlog_name   Binlog name
4513   @param binlog_pos    Binlog pos
4514   @param batch         WriteBatch
4515 */
4516 void Rdb_binlog_manager::update(const char *const binlog_name,
4517                                 const my_off_t binlog_pos,
4518                                 rocksdb::WriteBatchBase *const batch) {
4519   if (binlog_name && binlog_pos) {
4520     // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
4521     const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
4522     Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer;
4523 
4524     // store version
4525     value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
4526 
4527     // store binlog file name length
4528     DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
4529     const uint16_t binlog_name_len = strlen(binlog_name);
4530     value_writer.write_uint16(binlog_name_len);
4531 
4532     // store binlog file name
4533     value_writer.write(binlog_name, binlog_name_len);
4534 
4535     // store binlog pos
4536     value_writer.write_uint32(binlog_pos);
4537 
4538 #ifdef MARIADB_MERGE_2019
4539     // store binlog gtid length.
4540     // If gtid was not set, store 0 instead
4541     const uint16_t binlog_max_gtid_len =
4542         binlog_max_gtid ? strlen(binlog_max_gtid) : 0;
4543     value_writer.write_uint16(binlog_max_gtid_len);
4544 
4545     if (binlog_max_gtid_len > 0) {
4546       // store binlog gtid
4547       value_writer.write(binlog_max_gtid, binlog_max_gtid_len);
4548     }
4549 #endif
4550 
4551     m_dict->put_key(batch, m_key_slice, value_writer.to_slice());
4552   }
4553 }
4554 
4555 /**
4556   Read binlog committed entry stored in RocksDB, then unpack
4557   @param[OUT] binlog_name  Binlog name
4558   @param[OUT] binlog_pos   Binlog pos
4559   @param[OUT] binlog_gtid  Binlog GTID
4560   @return
4561     true is binlog info was found (valid behavior)
4562     false otherwise
4563 */
4564 bool Rdb_binlog_manager::read(char *const binlog_name,
4565                               my_off_t *const binlog_pos,
4566                               char *const binlog_gtid) const {
4567   bool ret = false;
4568   if (binlog_name) {
4569     std::string value;
4570     rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
4571     if (status.ok()) {
4572       if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
4573                         binlog_gtid)) {
4574         ret = true;
4575       }
4576     }
4577   }
4578   return ret;
4579 }
4580 
4581 /**
4582   Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
4583   @param[IN]  value        Binlog state info fetched from RocksDB
4584   @param[OUT] binlog_name  Binlog name
4585   @param[OUT] binlog_pos   Binlog pos
4586   @param[OUT] binlog_gtid  Binlog GTID
4587   @return     true on error
4588 */
4589 bool Rdb_binlog_manager::unpack_value(const uchar *const value,
4590                                       size_t value_size_arg,
4591                                       char *const binlog_name,
4592                                       my_off_t *const binlog_pos,
4593                                       char *const binlog_gtid) const {
4594   uint pack_len = 0;
4595   intmax_t value_size= value_size_arg;
4596 
4597   DBUG_ASSERT(binlog_pos != nullptr);
4598 
4599   if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
4600     return true;
4601   // read version
4602   const uint16_t version = rdb_netbuf_to_uint16(value);
4603 
4604   pack_len += Rdb_key_def::VERSION_SIZE;
4605   if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true;
4606 
4607   if ((value_size -= sizeof(uint16)) < 0)
4608     return true;
4609 
4610   // read binlog file name length
4611   const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
4612   pack_len += sizeof(uint16);
4613 
4614   if (binlog_name_len >= (FN_REFLEN+1))
4615     return true;
4616 
4617   if ((value_size -= binlog_name_len) < 0)
4618     return true;
4619 
4620   if (binlog_name_len) {
4621     // read and set binlog name
4622     memcpy(binlog_name, value + pack_len, binlog_name_len);
4623     binlog_name[binlog_name_len] = '\0';
4624     pack_len += binlog_name_len;
4625 
4626     if ((value_size -= sizeof(uint32)) < 0)
4627       return true;
4628     // read and set binlog pos
4629     *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
4630     pack_len += sizeof(uint32);
4631 
4632     if ((value_size -= sizeof(uint16)) < 0)
4633       return true;
4634     // read gtid length
4635     const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
4636     pack_len += sizeof(uint16);
4637 
4638     if (binlog_gtid_len >= GTID_BUF_LEN)
4639       return true;
4640     if ((value_size -= binlog_gtid_len) < 0)
4641       return true;
4642 
4643     if (binlog_gtid && binlog_gtid_len > 0) {
4644       // read and set gtid
4645       memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
4646       binlog_gtid[binlog_gtid_len] = '\0';
4647       pack_len += binlog_gtid_len;
4648     }
4649   }
4650   return false;
4651 }
4652 
4653 /**
4654   Inserts a row into mysql.slave_gtid_info table. Doing this inside
4655   storage engine is more efficient than inserting/updating through MySQL.
4656 
4657   @param[IN] id Primary key of the table.
4658   @param[IN] db Database name. This is column 2 of the table.
4659   @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
4660   @param[IN] write_batch Handle to storage engine writer.
4661 */
4662 void Rdb_binlog_manager::update_slave_gtid_info(
4663     const uint id, const char *const db, const char *const gtid,
4664     rocksdb::WriteBatchBase *const write_batch) {
4665   if (id && db && gtid) {
4666     // Make sure that if the slave_gtid_info table exists we have a
4667     // pointer to it via m_slave_gtid_info_tbl.
4668     if (!m_slave_gtid_info_tbl.load()) {
4669       m_slave_gtid_info_tbl.store(
4670           rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
4671     }
4672     if (!m_slave_gtid_info_tbl.load()) {
4673       // slave_gtid_info table is not present. Simply return.
4674       return;
4675     }
4676     DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
4677 
4678     const std::shared_ptr<const Rdb_key_def> &kd =
4679         m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
4680     String value;
4681 
4682     // Build key
4683     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer;
4684     key_writer.write_index(kd->get_index_number());
4685     key_writer.write_uint32(id);
4686 
4687     // Build value
4688     Rdb_buf_writer<128> value_writer;
4689     DBUG_ASSERT(gtid);
4690     const uint db_len = strlen(db);
4691     const uint gtid_len = strlen(gtid);
4692     // 1 byte used for flags. Empty here.
4693     value_writer.write_byte(0);
4694 
4695     // Write column 1.
4696     DBUG_ASSERT(strlen(db) <= 64);
4697     value_writer.write_byte(db_len);
4698     value_writer.write(db, db_len);
4699 
4700     // Write column 2.
4701     DBUG_ASSERT(gtid_len <= 56);
4702     value_writer.write_byte(gtid_len);
4703     value_writer.write(gtid, gtid_len);
4704 
4705     write_batch->Put(kd->get_cf(), key_writer.to_slice(),
4706                      value_writer.to_slice());
4707   }
4708 }
4709 
4710 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
4711                             Rdb_cf_manager *const cf_manager) {
4712   DBUG_ASSERT(rdb_dict != nullptr);
4713   DBUG_ASSERT(cf_manager != nullptr);
4714 
4715   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
4716 
4717   m_db = rdb_dict;
4718 
4719   m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
4720   rocksdb::ColumnFamilyHandle *default_cfh =
4721       cf_manager->get_cf(DEFAULT_CF_NAME);
4722 
4723   // System CF and default CF should be initialized
4724   if (m_system_cfh == nullptr || default_cfh == nullptr) {
4725     return HA_EXIT_FAILURE;
4726   }
4727 
4728   rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
4729 
4730   m_key_slice_max_index_id =
4731       rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
4732                      Rdb_key_def::INDEX_NUMBER_SIZE);
4733 
4734   resume_drop_indexes();
4735   rollback_ongoing_index_creation();
4736 
4737   // Initialize system CF and default CF flags
4738   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
4739   rocksdb::WriteBatch *const batch = wb.get();
4740 
4741   add_cf_flags(batch, m_system_cfh->GetID(), 0);
4742   add_cf_flags(batch, default_cfh->GetID(), 0);
4743   commit(batch);
4744 
4745   return HA_EXIT_SUCCESS;
4746 }
4747 
4748 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
4749   return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
4750 }
4751 
4752 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
4753                                const rocksdb::Slice &key,
4754                                const rocksdb::Slice &value) const {
4755   batch->Put(m_system_cfh, key, value);
4756 }
4757 
4758 rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
4759                                             std::string *const value) const {
4760   rocksdb::ReadOptions options;
4761   options.total_order_seek = true;
4762   return m_db->Get(options, m_system_cfh, key, value);
4763 }
4764 
4765 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
4766                                   const rocksdb::Slice &key) const {
4767   batch->Delete(m_system_cfh, key);
4768 }
4769 
4770 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
4771   /* Reading data dictionary should always skip bloom filter */
4772   rocksdb::ReadOptions read_options;
4773   read_options.total_order_seek = true;
4774   return m_db->NewIterator(read_options, m_system_cfh);
4775 }
4776 
4777 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
4778                              const bool sync) const {
4779   if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
4780   int res = HA_EXIT_SUCCESS;
4781   rocksdb::WriteOptions options;
4782   options.sync = sync;
4783   rocksdb::TransactionDBWriteOptimizations optimize;
4784   optimize.skip_concurrency_control = true;
4785   rocksdb::Status s = m_db->Write(options, optimize, batch);
4786   res = !s.ok();  // we return true when something failed
4787   if (res) {
4788     rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
4789   }
4790   batch->Clear();
4791   return res;
4792 }
4793 
4794 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
4795                                      Rdb_key_def::DATA_DICT_TYPE dict_type,
4796                                      const GL_INDEX_ID &gl_index_id) {
4797   rdb_netbuf_store_uint32(netbuf, dict_type);
4798   rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
4799                           gl_index_id.cf_id);
4800   rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
4801                           gl_index_id.index_id);
4802 }
4803 
4804 void Rdb_dict_manager::delete_with_prefix(
4805     rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
4806     const GL_INDEX_ID &gl_index_id) const {
4807   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4808   dump_index_id(&key_writer, dict_type, gl_index_id);
4809 
4810   delete_key(batch, key_writer.to_slice());
4811 }
4812 
4813 void Rdb_dict_manager::add_or_update_index_cf_mapping(
4814     rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
4815   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4816   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
4817                 index_info->m_gl_index_id);
4818 
4819   Rdb_buf_writer<256> value_writer;
4820 
4821   value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
4822   value_writer.write_byte(index_info->m_index_type);
4823   value_writer.write_uint16(index_info->m_kv_version);
4824   value_writer.write_uint32(index_info->m_index_flags);
4825   value_writer.write_uint64(index_info->m_ttl_duration);
4826 
4827   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4828 }
4829 
4830 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
4831                                     const uint32_t cf_id,
4832                                     const uint32_t cf_flags) const {
4833   DBUG_ASSERT(batch != nullptr);
4834 
4835   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4836   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4837   key_writer.write_uint32(cf_id);
4838 
4839   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
4840       value_writer;
4841   value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
4842   value_writer.write_uint32(cf_flags);
4843 
4844   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4845 }
4846 
4847 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
4848                                          const GL_INDEX_ID &gl_index_id) const {
4849   delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
4850   delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
4851   delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
4852 }
4853 
4854 bool Rdb_dict_manager::get_index_info(
4855     const GL_INDEX_ID &gl_index_id,
4856     struct Rdb_index_info *const index_info) const {
4857   if (index_info) {
4858     index_info->m_gl_index_id = gl_index_id;
4859   }
4860 
4861   bool found = false;
4862   bool error = false;
4863   std::string value;
4864   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4865   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
4866 
4867   const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
4868   if (status.ok()) {
4869     if (!index_info) {
4870       return true;
4871     }
4872 
4873     const uchar *const val = (const uchar *)value.c_str();
4874     const uchar *ptr = val;
4875     index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
4876     ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4877 
4878     switch (index_info->m_index_dict_version) {
4879       case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
4880         /* Sanity check to prevent reading bogus TTL record. */
4881         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4882                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4883                                 RDB_SIZEOF_INDEX_FLAGS +
4884                                 ROCKSDB_SIZEOF_TTL_RECORD) {
4885           error = true;
4886           break;
4887         }
4888         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4889         ptr += RDB_SIZEOF_INDEX_TYPE;
4890         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4891         ptr += RDB_SIZEOF_KV_VERSION;
4892         index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
4893         ptr += RDB_SIZEOF_INDEX_FLAGS;
4894         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4895         found = true;
4896         break;
4897 
4898       case Rdb_key_def::INDEX_INFO_VERSION_TTL:
4899         /* Sanity check to prevent reading bogus into TTL record. */
4900         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4901                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4902                                 ROCKSDB_SIZEOF_TTL_RECORD) {
4903           error = true;
4904           break;
4905         }
4906         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4907         ptr += RDB_SIZEOF_INDEX_TYPE;
4908         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4909         ptr += RDB_SIZEOF_KV_VERSION;
4910         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4911         if ((index_info->m_kv_version ==
4912              Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
4913             index_info->m_ttl_duration > 0) {
4914           index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
4915         }
4916         found = true;
4917         break;
4918 
4919       case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
4920       case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
4921         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4922         ptr += RDB_SIZEOF_INDEX_TYPE;
4923         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4924         found = true;
4925         break;
4926 
4927       default:
4928         error = true;
4929         break;
4930     }
4931 
4932     switch (index_info->m_index_type) {
4933       case Rdb_key_def::INDEX_TYPE_PRIMARY:
4934       case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
4935         error = index_info->m_kv_version >
4936                 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
4937         break;
4938       }
4939       case Rdb_key_def::INDEX_TYPE_SECONDARY:
4940         error = index_info->m_kv_version >
4941                 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
4942         break;
4943       default:
4944         error = true;
4945         break;
4946     }
4947   }
4948 
4949   if (error) {
4950     // NO_LINT_DEBUG
4951     sql_print_error(
4952         "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
4953         "from data dictionary. This should never happen "
4954         "and it may be a bug.",
4955         index_info->m_index_dict_version, index_info->m_index_type,
4956         index_info->m_kv_version, index_info->m_ttl_duration);
4957     abort();
4958   }
4959 
4960   return found;
4961 }
4962 
4963 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
4964                                     uint32_t *const cf_flags) const {
4965   DBUG_ASSERT(cf_flags != nullptr);
4966 
4967   bool found = false;
4968   std::string value;
4969   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4970 
4971   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4972   key_writer.write_uint32(cf_id);
4973 
4974   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
4975 
4976   if (status.ok()) {
4977     const uchar *val = (const uchar *)value.c_str();
4978     DBUG_ASSERT(val);
4979 
4980     const uint16_t version = rdb_netbuf_to_uint16(val);
4981 
4982     if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
4983       *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
4984       found = true;
4985     }
4986   }
4987 
4988   return found;
4989 }
4990 
4991 /*
4992   Returning index ids that were marked as deleted (via DROP TABLE) but
4993   still not removed by drop_index_thread yet, or indexes that are marked as
4994   ongoing creation.
4995  */
4996 void Rdb_dict_manager::get_ongoing_index_operation(
4997     std::unordered_set<GL_INDEX_ID> *gl_index_ids,
4998     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4999   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5000               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5001 
5002   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5003   index_writer.write_uint32(dd_type);
5004   const rocksdb::Slice index_slice = index_writer.to_slice();
5005 
5006   rocksdb::Iterator *it = new_iterator();
5007   for (it->Seek(index_slice); it->Valid(); it->Next()) {
5008     rocksdb::Slice key = it->key();
5009     const uchar *const ptr = (const uchar *)key.data();
5010 
5011     /*
5012       Ongoing drop/create index operations require key to be of the form:
5013       dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5014 
5015       This may need to be changed in the future if we want to process a new
5016       ddl_type with different format.
5017     */
5018     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5019         rdb_netbuf_to_uint32(ptr) != dd_type) {
5020       break;
5021     }
5022 
5023     // We don't check version right now since currently we always store only
5024     // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5025     // If increasing version number, we need to add version check logic here.
5026     GL_INDEX_ID gl_index_id;
5027     gl_index_id.cf_id =
5028         rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5029     gl_index_id.index_id =
5030         rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5031     gl_index_ids->insert(gl_index_id);
5032   }
5033   delete it;
5034 }
5035 
5036 /*
5037   Returning true if index_id is create/delete ongoing (undergoing creation or
5038   marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
5039   or not.
5040  */
5041 bool Rdb_dict_manager::is_index_operation_ongoing(
5042     const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5043   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5044               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5045 
5046   bool found = false;
5047   std::string value;
5048   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5049   dump_index_id(&key_writer, dd_type, gl_index_id);
5050 
5051   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5052   if (status.ok()) {
5053     found = true;
5054   }
5055   return found;
5056 }
5057 
5058 /*
5059   Adding index_id to data dictionary so that the index id is removed
5060   by drop_index_thread, or to track online index creation.
5061  */
5062 void Rdb_dict_manager::start_ongoing_index_operation(
5063     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5064     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5065   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5066               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5067 
5068   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5069   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5070 
5071   dump_index_id(&key_writer, dd_type, gl_index_id);
5072 
5073   // version as needed
5074   if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5075     value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5076   } else {
5077     value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5078   }
5079 
5080   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5081 }
5082 
5083 /*
5084   Removing index_id from data dictionary to confirm drop_index_thread
5085   completed dropping entire key/values of the index_id
5086  */
5087 void Rdb_dict_manager::end_ongoing_index_operation(
5088     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5089     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5090   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5091               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5092 
5093   delete_with_prefix(batch, dd_type, gl_index_id);
5094 }
5095 
5096 /*
5097   Returning true if there is no target index ids to be removed
5098   by drop_index_thread
5099  */
5100 bool Rdb_dict_manager::is_drop_index_empty() const {
5101   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5102   get_ongoing_drop_indexes(&gl_index_ids);
5103   return gl_index_ids.empty();
5104 }
5105 
5106 /*
5107   This function is supposed to be called by DROP TABLE. Logging messages
5108   that dropping indexes started, and adding data dictionary so that
5109   all associated indexes to be removed
5110  */
5111 void Rdb_dict_manager::add_drop_table(
5112     std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5113     rocksdb::WriteBatch *const batch) const {
5114   std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5115   for (uint32 i = 0; i < n_keys; i++) {
5116     dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5117   }
5118 
5119   add_drop_index(dropped_index_ids, batch);
5120 }
5121 
5122 /*
5123   Called during inplace index drop operations. Logging messages
5124   that dropping indexes started, and adding data dictionary so that
5125   all associated indexes to be removed
5126  */
5127 void Rdb_dict_manager::add_drop_index(
5128     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5129     rocksdb::WriteBatch *const batch) const {
5130   for (const auto &gl_index_id : gl_index_ids) {
5131     log_start_drop_index(gl_index_id, "Begin");
5132     start_drop_index(batch, gl_index_id);
5133   }
5134 }
5135 
5136 /*
5137   Called during inplace index creation operations. Logging messages
5138   that adding indexes started, and updates data dictionary with all associated
5139   indexes to be added.
5140  */
5141 void Rdb_dict_manager::add_create_index(
5142     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5143     rocksdb::WriteBatch *const batch) const {
5144   for (const auto &gl_index_id : gl_index_ids) {
5145     // NO_LINT_DEBUG
5146     sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
5147                            gl_index_id.cf_id, gl_index_id.index_id);
5148     start_create_index(batch, gl_index_id);
5149   }
5150 }
5151 
5152 /*
5153   This function is supposed to be called by drop_index_thread, when it
5154   finished dropping any index, or at the completion of online index creation.
5155  */
5156 void Rdb_dict_manager::finish_indexes_operation(
5157     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5158     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5159   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5160               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5161 
5162   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5163   rocksdb::WriteBatch *const batch = wb.get();
5164 
5165   std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5166   get_ongoing_create_indexes(&incomplete_create_indexes);
5167 
5168   for (const auto &gl_index_id : gl_index_ids) {
5169     if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5170       end_ongoing_index_operation(batch, gl_index_id, dd_type);
5171 
5172       /*
5173         Remove the corresponding incomplete create indexes from data
5174         dictionary as well
5175       */
5176       if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5177         if (incomplete_create_indexes.count(gl_index_id)) {
5178           end_ongoing_index_operation(batch, gl_index_id,
5179                                       Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5180         }
5181       }
5182     }
5183 
5184     if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5185       delete_index_info(batch, gl_index_id);
5186     }
5187   }
5188   commit(batch);
5189 }
5190 
5191 /*
5192   This function is supposed to be called when initializing
5193   Rdb_dict_manager (at startup). If there is any index ids that are
5194   drop ongoing, printing out messages for diagnostics purposes.
5195  */
5196 void Rdb_dict_manager::resume_drop_indexes() const {
5197   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5198   get_ongoing_drop_indexes(&gl_index_ids);
5199 
5200   uint max_index_id_in_dict = 0;
5201   get_max_index_id(&max_index_id_in_dict);
5202 
5203   for (const auto &gl_index_id : gl_index_ids) {
5204     log_start_drop_index(gl_index_id, "Resume");
5205     if (max_index_id_in_dict < gl_index_id.index_id) {
5206       // NO_LINT_DEBUG
5207       sql_print_error(
5208           "RocksDB: Found max index id %u from data dictionary "
5209           "but also found dropped index id (%u,%u) from drop_index "
5210           "dictionary. This should never happen and is possibly a "
5211           "bug.",
5212           max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5213       abort();
5214     }
5215   }
5216 }
5217 
5218 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5219   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5220   rocksdb::WriteBatch *const batch = wb.get();
5221 
5222   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5223   get_ongoing_create_indexes(&gl_index_ids);
5224 
5225   for (const auto &gl_index_id : gl_index_ids) {
5226     // NO_LINT_DEBUG
5227     sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
5228                            gl_index_id.cf_id, gl_index_id.index_id);
5229 
5230     start_drop_index(batch, gl_index_id);
5231   }
5232 
5233   commit(batch);
5234 }
5235 
5236 void Rdb_dict_manager::log_start_drop_table(
5237     const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5238     const char *const log_action) const {
5239   for (uint32 i = 0; i < n_keys; i++) {
5240     log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5241   }
5242 }
5243 
5244 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5245                                             const char *log_action) const {
5246   struct Rdb_index_info index_info;
5247   if (!get_index_info(gl_index_id, &index_info)) {
5248     /*
5249       If we don't find the index info, it could be that it's because it was a
5250       partially created index that isn't in the data dictionary yet that needs
5251       to be rolled back.
5252     */
5253     std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5254     get_ongoing_create_indexes(&incomplete_create_indexes);
5255 
5256     if (!incomplete_create_indexes.count(gl_index_id)) {
5257       /* If it's not a partially created index, something is very wrong. */
5258       // NO_LINT_DEBUG
5259       sql_print_error(
5260           "RocksDB: Failed to get column family info "
5261           "from index id (%u,%u). MyRocks data dictionary may "
5262           "get corrupted.",
5263           gl_index_id.cf_id, gl_index_id.index_id);
5264       if (rocksdb_ignore_datadic_errors)
5265       {
5266         sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, "
5267                         "trying to continue");
5268         return;
5269       }
5270       abort();
5271     }
5272   }
5273 }
5274 
5275 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5276   bool found = false;
5277   std::string value;
5278 
5279   const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5280   if (status.ok()) {
5281     const uchar *const val = (const uchar *)value.c_str();
5282     const uint16_t version = rdb_netbuf_to_uint16(val);
5283     if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5284       *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5285       found = true;
5286     }
5287   }
5288   return found;
5289 }
5290 
5291 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5292                                            const uint32_t index_id) const {
5293   DBUG_ASSERT(batch != nullptr);
5294 
5295   uint32_t old_index_id = -1;
5296   if (get_max_index_id(&old_index_id)) {
5297     if (old_index_id > index_id) {
5298       // NO_LINT_DEBUG
5299       sql_print_error(
5300           "RocksDB: Found max index id %u from data dictionary "
5301           "but trying to update to older value %u. This should "
5302           "never happen and possibly a bug.",
5303           old_index_id, index_id);
5304       return true;
5305     }
5306   }
5307 
5308   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5309       value_writer;
5310   value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5311   value_writer.write_uint32(index_id);
5312 
5313   batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5314   return false;
5315 }
5316 
5317 void Rdb_dict_manager::add_stats(
5318     rocksdb::WriteBatch *const batch,
5319     const std::vector<Rdb_index_stats> &stats) const {
5320   DBUG_ASSERT(batch != nullptr);
5321 
5322   for (const auto &it : stats) {
5323     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5324     dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5325 
5326     // IndexStats::materialize takes complete care of serialization including
5327     // storing the version
5328     const auto value =
5329         Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5330 
5331     batch->Put(m_system_cfh, key_writer.to_slice(), value);
5332   }
5333 }
5334 
5335 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5336   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5337   dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5338 
5339   std::string value;
5340   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5341   if (status.ok()) {
5342     std::vector<Rdb_index_stats> v;
5343     // unmaterialize checks if the version matches
5344     if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5345       return v[0];
5346     }
5347   }
5348 
5349   return Rdb_index_stats();
5350 }
5351 
5352 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5353     rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id,
5354     ulonglong val, bool overwrite) const {
5355   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5356   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5357 
5358   // Value is constructed by storing the version and the value.
5359   Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5360                  ROCKSDB_SIZEOF_AUTOINC_VALUE>
5361       value_writer;
5362   value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5363   value_writer.write_uint64(val);
5364 
5365   if (overwrite) {
5366     return batch->Put(m_system_cfh, key_writer.to_slice(),
5367                       value_writer.to_slice());
5368   }
5369   return batch->Merge(m_system_cfh, key_writer.to_slice(),
5370                       value_writer.to_slice());
5371 }
5372 
5373 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5374                                          ulonglong *new_val) const {
5375   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5376   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5377 
5378   std::string value;
5379   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5380 
5381   if (status.ok()) {
5382     const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5383 
5384     if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5385       *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5386       return true;
5387     }
5388   }
5389   return false;
5390 }
5391 
5392 uint Rdb_seq_generator::get_and_update_next_number(
5393     Rdb_dict_manager *const dict) {
5394   DBUG_ASSERT(dict != nullptr);
5395 
5396   uint res;
5397   RDB_MUTEX_LOCK_CHECK(m_mutex);
5398 
5399   res = m_next_number++;
5400 
5401   const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
5402   rocksdb::WriteBatch *const batch = wb.get();
5403 
5404   DBUG_ASSERT(batch != nullptr);
5405   dict->update_max_index_id(batch, res);
5406   dict->commit(batch);
5407 
5408   RDB_MUTEX_UNLOCK_CHECK(m_mutex);
5409 
5410   return res;
5411 }
5412 
5413 }  // namespace myrocks
5414