1 /*
2    Copyright (c) 2012,2013 Monty Program Ab
3    Copyright (c) 2020, MariaDB Corporation.
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
17 
18 #ifdef USE_PRAGMA_IMPLEMENTATION
19 #pragma implementation  // gcc: Class implementation
20 #endif
21 
22 /* For use of 'PRIu64': */
23 #define __STDC_FORMAT_MACROS
24 
25 #include <my_global.h>
26 /* This C++ file's header file */
27 #include "./rdb_datadic.h"
28 
29 #include <inttypes.h>
30 /* C++ standard header files */
31 #include <algorithm>
32 #include <array>
33 #include <limits>
34 #include <map>
35 #include <set>
36 #include <string>
37 #include <utility>
38 #include <vector>
39 
40 /* MySQL header files */
41 #include "./field.h"
42 #include "./key.h"
43 #include "./m_ctype.h"
44 #include "./my_bit.h"
45 #include "./my_bitmap.h"
46 #include "./sql_table.h"
47 
48 /* MyRocks header files */
49 #include "./ha_rocksdb.h"
50 #include "./ha_rocksdb_proto.h"
51 #include "./my_stacktrace.h"
52 #include "./rdb_cf_manager.h"
53 #include "./rdb_psi.h"
54 #include "./rdb_utils.h"
55 
56 namespace myrocks {
57 
58 void get_mem_comparable_space(const CHARSET_INFO *cs,
59                               const std::vector<uchar> **xfrm, size_t *xfrm_len,
60                               size_t *mb_len);
61 
62 /*
63   MariaDB's replacement for FB/MySQL Field::check_field_name_match :
64 */
field_check_field_name_match(Field * field,const char * name)65 inline bool field_check_field_name_match(Field *field, const char *name)
66 {
67   return (0 == my_strcasecmp(system_charset_info,
68                              field->field_name.str,
69                              name));
70 }
71 
72 
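/*
  Decode the current key field, consuming its NULL-marker byte first when the
  key part is nullable
  @param  fpi               IN      data structure containing field metadata
  @param  field             IN      current field
  @param  reader            IN      key slice reader
  @param  default_value     IN      default image copied into the field when
                                    the key part is NULL
  @param  unpack_reader     IN      unpack information reader
  @return
    HA_EXIT_SUCCESS    OK (NULL value stored)
    HA_EXIT_FAILURE    NULL marker missing or invalid
    other              value returned by the field's unpack function
*/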
decode_field(Rdb_field_packing * fpi,Field * field,Rdb_string_reader * reader,const uchar * const default_value,Rdb_string_reader * unpack_reader)83 int Rdb_convert_to_record_key_decoder::decode_field(
84     Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader,
85     const uchar *const default_value, Rdb_string_reader *unpack_reader) {
86   if (fpi->m_maybe_null) {
87     const char *nullp;
88     if (!(nullp = reader->read(1))) {
89       return HA_EXIT_FAILURE;
90     }
91 
92     if (*nullp == 0) {
93       /* Set the NULL-bit of this field */
94       field->set_null();
95       /* Also set the field to its default value */
96       memcpy(field->ptr, default_value, field->pack_length());
97       return HA_EXIT_SUCCESS;
98     } else if (*nullp == 1) {
99       field->set_notnull();
100     } else {
101       return HA_EXIT_FAILURE;
102     }
103   }
104 
105   return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader);
106 }
107 
/*
  Decode the current key field into the given record buffer

  @param  buf               OUT     start address of the destination record
                                    buffer
  @param  offset            OUT     byte offset of the field within the
                                    record
  @param  fpi               IN      data structure containing field metadata
  @param  table             IN      current table
  @param  field             IN      current field
  @param  has_unpack_info   IN      whether unpack info is present
  @param  reader            IN      key slice reader
  @param  unpack_reader     IN      unpack information reader
  @return
    HA_EXIT_SUCCESS              OK
    HA_ERR_ROCKSDB_CORRUPT_DATA  the field could not be decoded
*/
decode(uchar * const buf,uint * offset,Rdb_field_packing * fpi,TABLE * table,Field * field,bool has_unpack_info,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)123 int Rdb_convert_to_record_key_decoder::decode(
124     uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table,
125     Field *field, bool has_unpack_info, Rdb_string_reader *reader,
126     Rdb_string_reader *unpack_reader) {
127   DBUG_ASSERT(buf != nullptr);
128   DBUG_ASSERT(offset != nullptr);
129 
130   uint field_offset = field->ptr - table->record[0];
131   *offset = field_offset;
132   uint null_offset = field->null_offset();
133   bool maybe_null = field->real_maybe_null();
134 
135   field->move_field(buf + field_offset,
136                     maybe_null ? buf + null_offset : nullptr, field->null_bit);
137 
138   // If we need unpack info, but there is none, tell the unpack function
139   // this by passing unp_reader as nullptr. If we never read unpack_info
140   // during unpacking anyway, then there won't an error.
141   bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
142 
143   int res =
144       decode_field(fpi, field, reader, table->s->default_values + field_offset,
145                    maybe_missing_unpack ? nullptr : unpack_reader);
146 
147   // Restore field->ptr and field->null_ptr
148   field->move_field(table->record[0] + field_offset,
149                     maybe_null ? table->record[0] + null_offset : nullptr,
150                     field->null_bit);
151   if (res != UNPACK_SUCCESS) {
152     return HA_ERR_ROCKSDB_CORRUPT_DATA;
153   }
154   return HA_EXIT_SUCCESS;
155 }
156 
157 /*
158   Skip current key field
159 
160   @param  fpi          IN    data structure contains field metadata
161   @param  field        IN    current field
162   @param  reader       IN    key slice reader
163   @param  unp_reader   IN    unpack information reader
164   @return
165     HA_EXIT_SUCCESS    OK
166     other              HA_ERR error code
167 */
skip(const Rdb_field_packing * fpi,const Field * field,Rdb_string_reader * reader,Rdb_string_reader * unp_reader)168 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
169                                             const Field *field,
170                                             Rdb_string_reader *reader,
171                                             Rdb_string_reader *unp_reader) {
172   /* It is impossible to unpack the column. Skip it. */
173   if (fpi->m_maybe_null) {
174     const char *nullp;
175     if (!(nullp = reader->read(1))) {
176       return HA_ERR_ROCKSDB_CORRUPT_DATA;
177     }
178     if (*nullp == 0) {
179       /* This is a NULL value */
180       return HA_EXIT_SUCCESS;
181     }
182     /* If NULL marker is not '0', it can be only '1'  */
183     if (*nullp != 1) {
184       return HA_ERR_ROCKSDB_CORRUPT_DATA;
185     }
186   }
187   if ((fpi->m_skip_func)(fpi, field, reader)) {
188     return HA_ERR_ROCKSDB_CORRUPT_DATA;
189   }
190   // If this is a space padded varchar, we need to skip the indicator
191   // bytes for trailing bytes. They're useless since we can't restore the
192   // field anyway.
193   //
194   // There is a special case for prefixed varchars where we do not
195   // generate unpack info, because we know prefixed varchars cannot be
196   // unpacked. In this case, it is not necessary to skip.
197   if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
198       !fpi->m_unpack_info_stores_value) {
199     unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
200   }
201   return HA_EXIT_SUCCESS;
202 }
203 
Rdb_key_field_iterator(const Rdb_key_def * key_def,Rdb_field_packing * pack_info,Rdb_string_reader * reader,Rdb_string_reader * unp_reader,TABLE * table,bool has_unpack_info,const MY_BITMAP * covered_bitmap,uchar * const buf)204 Rdb_key_field_iterator::Rdb_key_field_iterator(
205     const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
206     Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
207     bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
208   m_key_def = key_def;
209   m_pack_info = pack_info;
210   m_iter_index = 0;
211   m_iter_end = key_def->get_key_parts();
212   m_reader = reader;
213   m_unp_reader = unp_reader;
214   m_table = table;
215   m_has_unpack_info = has_unpack_info;
216   m_covered_bitmap = covered_bitmap;
217   m_buf = buf;
218   m_secondary_key =
219       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
220   m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
221   m_is_hidden_pk =
222       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
223   m_curr_bitmap_pos = 0;
224   m_offset = 0;
225 }
226 
get_dst() const227 void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; }
228 
get_field_index() const229 int Rdb_key_field_iterator::get_field_index() const {
230   DBUG_ASSERT(m_field != nullptr);
231   return m_field->field_index;
232 }
233 
get_is_null() const234 bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; }
get_field() const235 Field *Rdb_key_field_iterator::get_field() const {
236   DBUG_ASSERT(m_field != nullptr);
237   return m_field;
238 }
239 
has_next()240 bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; }
241 
242 /**
243  Iterate each field in the key and decode/skip one by one
244 */
next()245 int Rdb_key_field_iterator::next() {
246   int status = HA_EXIT_SUCCESS;
247   while (m_iter_index < m_iter_end) {
248     int curr_index = m_iter_index++;
249 
250     m_fpi = &m_pack_info[curr_index];
251     /*
252       Hidden pk field is packed at the end of the secondary keys, but the SQL
253       layer does not know about it. Skip retrieving field if hidden pk.
254     */
255     if ((m_secondary_key && m_hidden_pk_exists &&
256          curr_index + 1 == m_iter_end) ||
257         m_is_hidden_pk) {
258       DBUG_ASSERT(m_fpi->m_unpack_func);
259       if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) {
260         return HA_ERR_ROCKSDB_CORRUPT_DATA;
261       }
262       return HA_EXIT_SUCCESS;
263     }
264 
265     m_field = m_fpi->get_field_in_table(m_table);
266 
267     bool covered_column = true;
268     if (m_covered_bitmap != nullptr &&
269         m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) {
270       uint tmp= m_curr_bitmap_pos++;
271       covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
272                        bitmap_is_set(m_covered_bitmap, tmp);
273     }
274 
275     if (m_fpi->m_unpack_func && covered_column) {
276       /* It is possible to unpack this column. Do it. */
277       status = Rdb_convert_to_record_key_decoder::decode(
278           m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info,
279           m_reader, m_unp_reader);
280       if (status) {
281         return status;
282       }
283       break;
284     } else {
285       status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader,
286                                                        m_unp_reader);
287       if (status) {
288         return status;
289       }
290     }
291   }
292   return HA_EXIT_SUCCESS;
293 }
294 
295 /*
296   Rdb_key_def class implementation
297 */
Rdb_key_def(uint indexnr_arg,uint keyno_arg,rocksdb::ColumnFamilyHandle * cf_handle_arg,uint16_t index_dict_version_arg,uchar index_type_arg,uint16_t kv_format_version_arg,bool is_reverse_cf_arg,bool is_per_partition_cf_arg,const char * _name,Rdb_index_stats _stats,uint32 index_flags_bitmap,uint32 ttl_rec_offset,uint64 ttl_duration)298 Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
299                          rocksdb::ColumnFamilyHandle *cf_handle_arg,
300                          uint16_t index_dict_version_arg, uchar index_type_arg,
301                          uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
302                          bool is_per_partition_cf_arg, const char *_name,
303                          Rdb_index_stats _stats, uint32 index_flags_bitmap,
304                          uint32 ttl_rec_offset, uint64 ttl_duration)
305     : m_index_number(indexnr_arg),
306       m_cf_handle(cf_handle_arg),
307       m_index_dict_version(index_dict_version_arg),
308       m_index_type(index_type_arg),
309       m_kv_format_version(kv_format_version_arg),
310       m_is_reverse_cf(is_reverse_cf_arg),
311       m_is_per_partition_cf(is_per_partition_cf_arg),
312       m_name(_name),
313       m_stats(_stats),
314       m_index_flags_bitmap(index_flags_bitmap),
315       m_ttl_rec_offset(ttl_rec_offset),
316       m_ttl_duration(ttl_duration),
317       m_ttl_column(""),
318       m_pk_part_no(nullptr),
319       m_pack_info(nullptr),
320       m_keyno(keyno_arg),
321       m_key_parts(0),
322       m_ttl_pk_key_part_offset(UINT_MAX),
323       m_ttl_field_index(UINT_MAX),
324       m_prefix_extractor(nullptr),
325       m_maxlength(0)  // means 'not intialized'
326 {
327   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
328   rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
329   m_total_index_flags_length =
330       calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
331   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
332                       m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
333                   m_total_index_flags_length == 0);
334   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
335                       m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
336                   m_total_index_flags_length == 0);
337   DBUG_ASSERT(m_cf_handle != nullptr);
338 }
339 
/*
  Copy constructor. Copies the scalar attributes of k, creates a fresh mutex
  and duplicates the m_pack_info / m_pk_part_no arrays so that this object
  owns its own allocations (the destructor frees both).
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number),
      m_cf_handle(k.m_cf_handle),
      /*
        The dictionary version, index type and KV format version must be
        copied as well: m_index_type and m_kv_format_version are read by the
        assertions below, and setup() reads m_index_type. They are listed in
        the same position as in the main constructor's initializer list.
      */
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf),
      m_is_per_partition_cf(k.m_is_per_partition_cf),
      m_name(k.m_name),
      m_stats(k.m_stats),
      m_index_flags_bitmap(k.m_index_flags_bitmap),
      m_ttl_rec_offset(k.m_ttl_rec_offset),
      m_ttl_duration(k.m_ttl_duration),
      m_ttl_column(k.m_ttl_column),
      m_pk_part_no(k.m_pk_part_no),
      m_pack_info(k.m_pack_info),
      m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts),
      m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
      // NOTE(review): reset rather than copied from k; kept as in the
      // original code - confirm this is intended.
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);

  /*
    The initializer list above copied k's array pointers. Replace each
    non-null pointer with a private copy so the two objects never free the
    same memory.
  */
  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
    void *pack_info = my_malloc(PSI_INSTRUMENT_ME, size, MYF(0));
    memcpy(pack_info, k.m_pack_info, size);
    m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info);
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
    m_pk_part_no =
        reinterpret_cast<uint *>(my_malloc(PSI_INSTRUMENT_ME, size, MYF(0)));
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
382 
~Rdb_key_def()383 Rdb_key_def::~Rdb_key_def() {
384   mysql_mutex_destroy(&m_mutex);
385 
386   my_free(m_pk_part_no);
387   m_pk_part_no = nullptr;
388 
389   my_free(m_pack_info);
390   m_pack_info = nullptr;
391 }
392 
setup(const TABLE * const tbl,const Rdb_tbl_def * const tbl_def)393 void Rdb_key_def::setup(const TABLE *const tbl,
394                         const Rdb_tbl_def *const tbl_def) {
395   DBUG_ASSERT(tbl != nullptr);
396   DBUG_ASSERT(tbl_def != nullptr);
397 
398   /*
399     Set max_length based on the table.  This can be called concurrently from
400     multiple threads, so there is a mutex to protect this code.
401   */
402   const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
403   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
404   const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
405   if (!m_maxlength) {
406     RDB_MUTEX_LOCK_CHECK(m_mutex);
407     if (m_maxlength != 0) {
408       RDB_MUTEX_UNLOCK_CHECK(m_mutex);
409       return;
410     }
411 
412     KEY *key_info = nullptr;
413     KEY *pk_info = nullptr;
414     if (!is_hidden_pk) {
415       key_info = &tbl->key_info[m_keyno];
416       if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
417       m_name = std::string(key_info->name.str);
418     } else {
419       m_name = HIDDEN_PK_NAME;
420     }
421 
422     if (secondary_key) {
423       m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
424     } else {
425       pk_info = nullptr;
426       m_pk_key_parts = 0;
427     }
428 
429     // "unique" secondary keys support:
430     m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;
431 
432     if (secondary_key) {
433       /*
434         In most cases, SQL layer puts PK columns as invisible suffix at the
435         end of secondary key. There are cases where this doesn't happen:
436         - unique secondary indexes.
437         - partitioned tables.
438 
439         Internally, we always need PK columns as suffix (and InnoDB does,
440         too, if you were wondering).
441 
442         The loop below will attempt to put all PK columns at the end of key
443         definition.  Columns that are already included in the index (either
444         by the user or by "extended keys" feature) are not included for the
445         second time.
446       */
447       m_key_parts += m_pk_key_parts;
448     }
449 
450     if (secondary_key) {
451       m_pk_part_no = reinterpret_cast<uint *>(
452           my_malloc(PSI_INSTRUMENT_ME, sizeof(uint) * m_key_parts, MYF(0)));
453     } else {
454       m_pk_part_no = nullptr;
455     }
456 
457     const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
458     m_pack_info =
459         reinterpret_cast<Rdb_field_packing *>(my_malloc(PSI_INSTRUMENT_ME, size, MYF(0)));
460 
461     /*
462       Guaranteed not to error here as checks have been made already during
463       table creation.
464     */
465     Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
466                                  &m_ttl_field_index, true);
467 
468     size_t max_len = INDEX_NUMBER_SIZE;
469     int unpack_len = 0;
470     int max_part_len = 0;
471     bool simulating_extkey = false;
472     uint dst_i = 0;
473 
474     uint keyno_to_set = m_keyno;
475     uint keypart_to_set = 0;
476 
477     if (is_hidden_pk) {
478       Field *field = nullptr;
479       m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
480       m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
481       max_len += m_pack_info[dst_i].m_max_image_len;
482       max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
483       dst_i++;
484     } else {
485       KEY_PART_INFO *key_part = key_info->key_part;
486 
487       /* this loop also loops over the 'extended key' tail */
488       for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
489         Field *const field = key_part ? key_part->field : nullptr;
490 
491         if (simulating_extkey && !hidden_pk_exists) {
492           DBUG_ASSERT(secondary_key);
493           /* Check if this field is already present in the key definition */
494           bool found = false;
495           for (uint j= 0; j < key_info->ext_key_parts; j++) {
496             if (field->field_index ==
497                     key_info->key_part[j].field->field_index &&
498                 key_part->length == key_info->key_part[j].length) {
499               found = true;
500               break;
501             }
502           }
503 
504           if (found) {
505             key_part++;
506             continue;
507           }
508         }
509 
510         if (field && field->real_maybe_null()) max_len += 1;  // NULL-byte
511 
512         m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
513                                  key_part ? key_part->length : 0);
514         m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
515 
516         if (pk_info) {
517           m_pk_part_no[dst_i] = -1;
518           for (uint j = 0; j < m_pk_key_parts; j++) {
519             if (field->field_index == pk_info->key_part[j].field->field_index) {
520               m_pk_part_no[dst_i] = j;
521               break;
522             }
523           }
524         } else if (secondary_key && hidden_pk_exists) {
525           /*
526             The hidden pk can never be part of the sk.  So it is always
527             appended to the end of the sk.
528           */
529           m_pk_part_no[dst_i] = -1;
530           if (simulating_extkey) m_pk_part_no[dst_i] = 0;
531         }
532 
533         max_len += m_pack_info[dst_i].m_max_image_len;
534 
535         max_part_len =
536             std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
537 
538         /*
539           Check key part name here, if it matches the TTL column then we store
540           the offset of the TTL key part here.
541         */
542         if (!m_ttl_column.empty() &&
543             field_check_field_name_match(field, m_ttl_column.c_str())) {
544           DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
545           DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
546           DBUG_ASSERT(!field->real_maybe_null());
547           m_ttl_pk_key_part_offset = dst_i;
548         }
549 
550         key_part++;
551         /*
552           For "unique" secondary indexes, pretend they have
553           "index extensions".
554 
555           MariaDB also has this property: if an index has a partially-covered
556           column like KEY(varchar_col(N)), then the SQL layer will think it is
557           not "extended" with PK columns. The code below handles this case,
558           also.
559          */
560         if (secondary_key && src_i+1 == key_info->ext_key_parts) {
561           simulating_extkey = true;
562           if (!hidden_pk_exists) {
563             keyno_to_set = tbl->s->primary_key;
564             key_part = pk_info->key_part;
565             keypart_to_set = (uint)-1;
566           } else {
567             keyno_to_set = tbl_def->m_key_count - 1;
568             key_part = nullptr;
569             keypart_to_set = 0;
570           }
571         }
572 
573         dst_i++;
574       }
575     }
576 
577     m_key_parts = dst_i;
578 
579     /* Initialize the memory needed by the stats structure */
580     m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
581 
582     /* Cache prefix extractor for bloom filter usage later */
583     rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
584     m_prefix_extractor = opt.prefix_extractor;
585 
586     /*
587       This should be the last member variable set before releasing the mutex
588       so that other threads can't see the object partially set up.
589      */
590     m_maxlength = max_len;
591 
592     RDB_MUTEX_UNLOCK_CHECK(m_mutex);
593   }
594 }
595 
596 /*
597   Determine if the table has TTL enabled by parsing the table comment.
598 
599   @param[IN]  table_arg
600   @param[IN]  tbl_def_arg
601   @param[OUT] ttl_duration        Default TTL value parsed from table comment
602 */
extract_ttl_duration(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,uint64 * ttl_duration)603 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
604                                        const Rdb_tbl_def *const tbl_def_arg,
605                                        uint64 *ttl_duration) {
606   DBUG_ASSERT(table_arg != nullptr);
607   DBUG_ASSERT(tbl_def_arg != nullptr);
608   DBUG_ASSERT(ttl_duration != nullptr);
609   std::string table_comment(table_arg->s->comment.str,
610                             table_arg->s->comment.length);
611 
612   bool ttl_duration_per_part_match_found = false;
613   std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
614       table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
615       RDB_TTL_DURATION_QUALIFIER);
616 
617   /* If we don't have a ttl duration, nothing to do here. */
618   if (ttl_duration_str.empty()) {
619     return HA_EXIT_SUCCESS;
620   }
621 
622   /*
623     Catch errors where a non-integral value was used as ttl duration, strtoull
624     will return 0.
625   */
626   *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
627   if (!*ttl_duration) {
628     my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
629     return HA_EXIT_FAILURE;
630   }
631 
632   return HA_EXIT_SUCCESS;
633 }
634 
/*
  Determine the TTL column of the table, if one is specified, by parsing the
  table comment.

  @param[IN]  table_arg
  @param[IN]  tbl_def_arg
  @param[OUT] ttl_column          TTL column in the table
  @param[OUT] ttl_field_index     Index of the TTL column in the table's
                                  field array
  @param[IN]  skip_checks         Skip validation checks (when called in
                                  setup())
*/
extract_ttl_col(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,std::string * ttl_column,uint * ttl_field_index,bool skip_checks)644 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
645                                   const Rdb_tbl_def *const tbl_def_arg,
646                                   std::string *ttl_column,
647                                   uint *ttl_field_index, bool skip_checks) {
648   std::string table_comment(table_arg->s->comment.str,
649                             table_arg->s->comment.length);
650   /*
651     Check if there is a TTL column specified. Note that this is not required
652     and if omitted, an 8-byte ttl field will be prepended to each record
653     implicitly.
654   */
655   bool ttl_col_per_part_match_found = false;
656   std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
657       table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
658       RDB_TTL_COL_QUALIFIER);
659 
660   if (skip_checks) {
661     for (uint i = 0; i < table_arg->s->fields; i++) {
662       Field *const field = table_arg->field[i];
663       if (field_check_field_name_match(field, ttl_col_str.c_str())) {
664         *ttl_column = ttl_col_str;
665         *ttl_field_index = i;
666       }
667     }
668     return HA_EXIT_SUCCESS;
669   }
670 
671   /* Check if TTL column exists in table */
672   if (!ttl_col_str.empty()) {
673     bool found = false;
674     for (uint i = 0; i < table_arg->s->fields; i++) {
675       Field *const field = table_arg->field[i];
676       if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
677           field->real_type() == MYSQL_TYPE_LONGLONG &&
678           field->key_type() == HA_KEYTYPE_ULONGLONG &&
679           !field->real_maybe_null()) {
680         *ttl_column = ttl_col_str;
681         *ttl_field_index = i;
682         found = true;
683         break;
684       }
685     }
686 
687     if (!found) {
688       my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
689       return HA_EXIT_FAILURE;
690     }
691   }
692 
693   return HA_EXIT_SUCCESS;
694 }
695 
gen_qualifier_for_table(const char * const qualifier,const std::string & partition_name)696 const std::string Rdb_key_def::gen_qualifier_for_table(
697     const char *const qualifier, const std::string &partition_name) {
698   bool has_partition = !partition_name.empty();
699   std::string qualifier_str = "";
700 
701   if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
702     return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
703                          : qualifier_str + RDB_CF_NAME_QUALIFIER +
704                                RDB_QUALIFIER_VALUE_SEP;
705   } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
706     return has_partition
707                ? gen_ttl_duration_qualifier_for_partition(partition_name)
708                : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
709                      RDB_QUALIFIER_VALUE_SEP;
710   } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
711     return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
712                          : qualifier_str + RDB_TTL_COL_QUALIFIER +
713                                RDB_QUALIFIER_VALUE_SEP;
714   } else {
715     DBUG_ASSERT(0);
716   }
717 
718   return qualifier_str;
719 }
720 
721 /*
722   Formats the string and returns the column family name assignment part for a
723   specific partition.
724 */
gen_cf_name_qualifier_for_partition(const std::string & prefix)725 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
726     const std::string &prefix) {
727   DBUG_ASSERT(!prefix.empty());
728 
729   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
730          RDB_QUALIFIER_VALUE_SEP;
731 }
732 
gen_ttl_duration_qualifier_for_partition(const std::string & prefix)733 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
734     const std::string &prefix) {
735   DBUG_ASSERT(!prefix.empty());
736 
737   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
738          RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
739 }
740 
gen_ttl_col_qualifier_for_partition(const std::string & prefix)741 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
742     const std::string &prefix) {
743   DBUG_ASSERT(!prefix.empty());
744 
745   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
746          RDB_QUALIFIER_VALUE_SEP;
747 }
748 
parse_comment_for_qualifier(const std::string & comment,const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,bool * per_part_match_found,const char * const qualifier)749 const std::string Rdb_key_def::parse_comment_for_qualifier(
750     const std::string &comment, const TABLE *const table_arg,
751     const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
752     const char *const qualifier) {
753   DBUG_ASSERT(table_arg != nullptr);
754   DBUG_ASSERT(tbl_def_arg != nullptr);
755   DBUG_ASSERT(per_part_match_found != nullptr);
756   DBUG_ASSERT(qualifier != nullptr);
757 
758   std::string empty_result;
759 
760   // Flag which marks if partition specific options were found.
761   *per_part_match_found = false;
762 
763   if (comment.empty()) {
764     return empty_result;
765   }
766 
767   // Let's fetch the comment for a index and check if there's a custom key
768   // name specified for a partition we are handling.
769   std::vector<std::string> v =
770       myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
771 
772   std::string search_str = gen_qualifier_for_table(qualifier);
773 
774   // If table has partitions then we need to check if user has requested
775   // qualifiers on a per partition basis.
776   //
777   // NOTE: this means if you specify a qualifier for a specific partition it
778   // will take precedence the 'table level' qualifier if one exists.
779   std::string search_str_part;
780   if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
781     std::string partition_name = tbl_def_arg->base_partition();
782     DBUG_ASSERT(!partition_name.empty());
783     search_str_part = gen_qualifier_for_table(qualifier, partition_name);
784   }
785 
786   DBUG_ASSERT(!search_str.empty());
787 
788   // Basic O(N) search for a matching assignment. At most we expect maybe
789   // ten or so elements here.
790   if (!search_str_part.empty()) {
791     for (const auto &it : v) {
792       if (it.substr(0, search_str_part.length()) == search_str_part) {
793         // We found a prefix match. Try to parse it as an assignment.
794         std::vector<std::string> tokens =
795             myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
796 
797         // We found a custom qualifier, it was in the form we expected it to be.
798         // Return that instead of whatever we initially wanted to return. In
799         // a case below the `foo` part will be returned to the caller.
800         //
801         // p3_cfname=foo
802         //
803         // If no value was specified then we'll return an empty string which
804         // later gets translated into using a default CF.
805         if (tokens.size() == 2) {
806           *per_part_match_found = true;
807           return tokens[1];
808         } else {
809           return empty_result;
810         }
811       }
812     }
813   }
814 
815   // Do this loop again, this time searching for 'table level' qualifiers if we
816   // didn't find any partition level qualifiers above.
817   for (const auto &it : v) {
818     if (it.substr(0, search_str.length()) == search_str) {
819       std::vector<std::string> tokens =
820           myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
821       if (tokens.size() == 2) {
822         return tokens[1];
823       } else {
824         return empty_result;
825       }
826     }
827   }
828 
829   // If we didn't find any partitioned/non-partitioned qualifiers, return an
830   // empty string.
831   return empty_result;
832 }
833 
834 /**
835   Read a memcmp key part from a slice using the passed in reader.
836 
837   Returns -1 if field was null, 1 if error, 0 otherwise.
838 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const839 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
840                                       Rdb_string_reader *reader,
841                                       const uint part_num) const {
842   /* It is impossible to unpack the column. Skip it. */
843   if (m_pack_info[part_num].m_maybe_null) {
844     const char *nullp;
845     if (!(nullp = reader->read(1))) return 1;
846     if (*nullp == 0) {
847       /* This is a NULL value */
848       return -1;
849     } else {
850       /* If NULL marker is not '0', it can be only '1'  */
851       if (*nullp != 1) return 1;
852     }
853   }
854 
855   Rdb_field_packing *fpi = &m_pack_info[part_num];
856   DBUG_ASSERT(table_arg->s != nullptr);
857 
858   bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
859                            (table_arg->s->primary_key == MAX_INDEXES);
860   Field *field = nullptr;
861   if (!is_hidden_pk_part) {
862     field = fpi->get_field_in_table(table_arg);
863   }
864   if ((fpi->m_skip_func)(fpi, field, reader)) {
865     return 1;
866   }
867   return 0;
868 }
869 
870 /**
871   Get a mem-comparable form of Primary Key from mem-comparable form of this key
872 
873   @param
874     pk_descr        Primary Key descriptor
875     key             Index tuple from this key in mem-comparable form
876     pk_buffer  OUT  Put here mem-comparable form of the Primary Key.
877 
878   @note
879     It may or may not be possible to restore primary key columns to their
880     mem-comparable form.  To handle all cases, this function copies mem-
881     comparable forms directly.
882 
883     RocksDB SE supports "Extended keys". This means that PK columns are present
884     at the end of every key.  If the key already includes PK columns, then
885     these columns are not present at the end of the key.
886 
887     Because of the above, we copy each primary key column.
888 
889   @todo
890     If we checked crc32 checksums in this function, we would catch some CRC
891     violations that we currently don't. On the other hand, there is a broader
892     set of queries for which we would check the checksum twice.
893 */
894 
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const895 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
896                                         const Rdb_key_def &pk_descr,
897                                         const rocksdb::Slice *const key,
898                                         uchar *const pk_buffer) const {
899   DBUG_ASSERT(table != nullptr);
900   DBUG_ASSERT(key != nullptr);
901   DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
902   DBUG_ASSERT(pk_buffer);
903 
904   uint size = 0;
905   uchar *buf = pk_buffer;
906   DBUG_ASSERT(m_pk_key_parts);
907 
908   /* Put the PK number */
909   rdb_netbuf_store_index(buf, pk_descr.m_index_number);
910   buf += INDEX_NUMBER_SIZE;
911   size += INDEX_NUMBER_SIZE;
912 
913   const char *start_offs[MAX_REF_PARTS];
914   const char *end_offs[MAX_REF_PARTS];
915   int pk_key_part;
916   uint i;
917   Rdb_string_reader reader(key);
918 
919   // Skip the index number
920   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
921 
922   for (i = 0; i < m_key_parts; i++) {
923     if ((pk_key_part = m_pk_part_no[i]) != -1) {
924       start_offs[pk_key_part] = reader.get_current_ptr();
925     }
926 
927     if (read_memcmp_key_part(table, &reader, i) > 0) {
928       return RDB_INVALID_KEY_LEN;
929     }
930 
931     if (pk_key_part != -1) {
932       end_offs[pk_key_part] = reader.get_current_ptr();
933     }
934   }
935 
936   for (i = 0; i < m_pk_key_parts; i++) {
937     const uint part_size = end_offs[i] - start_offs[i];
938     memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
939     buf += part_size;
940     size += part_size;
941   }
942 
943   return size;
944 }
945 
946 /**
947   Get a mem-comparable form of Secondary Key from mem-comparable form of this
948   key, without the extended primary key tail.
949 
950   @param
951     key                Index tuple from this key in mem-comparable form
952     sk_buffer     OUT  Put here mem-comparable form of the Secondary Key.
953     n_null_fields OUT  Put number of null fields contained within sk entry
954 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const955 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
956                                       const rocksdb::Slice &key,
957                                       uchar *sk_buffer,
958                                       uint *n_null_fields) const {
959   DBUG_ASSERT(table != nullptr);
960   DBUG_ASSERT(sk_buffer != nullptr);
961   DBUG_ASSERT(n_null_fields != nullptr);
962   DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
963 
964   uchar *buf = sk_buffer;
965 
966   int res;
967   Rdb_string_reader reader(&key);
968   const char *start = reader.get_current_ptr();
969 
970   // Skip the index number
971   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
972 
973   for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
974     if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
975       return RDB_INVALID_KEY_LEN;
976     } else if (res == -1) {
977       (*n_null_fields)++;
978     }
979   }
980 
981   uint sk_memcmp_len = reader.get_current_ptr() - start;
982   memcpy(buf, start, sk_memcmp_len);
983   return sk_memcmp_len;
984 }
985 
986 /**
987   Convert index tuple into storage (i.e. mem-comparable) format
988 
989   @detail
990     Currently this is done by unpacking into record_buffer and then
991     packing index columns into storage format.
992 
993   @param pack_buffer Temporary area for packing varchar columns. Its
994                      size is at least max_storage_fmt_length() bytes.
995 */
996 
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,uchar * const record_buffer,const uchar * const key_tuple,const key_part_map & keypart_map) const997 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
998                                    uchar *const packed_tuple,
999                                    uchar *const record_buffer,
1000                                    const uchar *const key_tuple,
1001                                    const key_part_map &keypart_map) const {
1002   DBUG_ASSERT(tbl != nullptr);
1003   DBUG_ASSERT(pack_buffer != nullptr);
1004   DBUG_ASSERT(packed_tuple != nullptr);
1005   DBUG_ASSERT(key_tuple != nullptr);
1006 
1007   /* We were given a record in KeyTupleFormat. First, save it to record */
1008   const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
1009   key_restore(record_buffer, key_tuple, &tbl->key_info[m_keyno], key_len);
1010 
1011   uint n_used_parts = my_count_bits(keypart_map);
1012   if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0;  // Full key is used
1013 
1014   /* Then, convert the record into a mem-comparable form */
1015   return pack_record(tbl, pack_buffer, record_buffer, packed_tuple, nullptr,
1016                      false, 0, n_used_parts);
1017 }
1018 
1019 /**
1020   @brief
1021     Check if "unpack info" data includes checksum.
1022 
1023   @detail
1024     This is used only by CHECK TABLE to count the number of rows that have
1025     checksums.
1026 */
1027 
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)1028 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
1029   size_t size = unpack_info.size();
1030   if (size == 0) {
1031     return false;
1032   }
1033   const uchar *ptr = (const uchar *)unpack_info.data();
1034 
1035   // Skip unpack info if present.
1036   if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1037     const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1038     SHIP_ASSERT(size >= skip_len);
1039 
1040     size -= skip_len;
1041     ptr += skip_len;
1042   }
1043 
1044   return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1045 }
1046 
1047 /*
1048   @return Number of bytes that were changed
1049 */
successor(uchar * const packed_tuple,const uint len)1050 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1051   DBUG_ASSERT(packed_tuple != nullptr);
1052 
1053   int changed = 0;
1054   uchar *p = packed_tuple + len - 1;
1055   for (; p > packed_tuple; p--) {
1056     changed++;
1057     if (*p != uchar(0xFF)) {
1058       *p = *p + 1;
1059       break;
1060     }
1061     *p = '\0';
1062   }
1063   return changed;
1064 }
1065 
1066 /*
1067   @return Number of bytes that were changed
1068 */
predecessor(uchar * const packed_tuple,const uint len)1069 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1070   DBUG_ASSERT(packed_tuple != nullptr);
1071 
1072   int changed = 0;
1073   uchar *p = packed_tuple + len - 1;
1074   for (; p > packed_tuple; p--) {
1075     changed++;
1076     if (*p != uchar(0x00)) {
1077       *p = *p - 1;
1078       break;
1079     }
1080     *p = 0xFF;
1081   }
1082   return changed;
1083 }
1084 
// Lookup table from an unpack-info tag byte to the size of the header which
// that tag introduces. Consulted by Rdb_key_def::get_unpack_header_size().
static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
    {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
    {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
1088 
1089 /*
1090   @return The length in bytes of the header specified by the given tag
1091 */
get_unpack_header_size(char tag)1092 size_t Rdb_key_def::get_unpack_header_size(char tag) {
1093   DBUG_ASSERT(is_unpack_data_tag(tag));
1094   return UNPACK_HEADER_SIZES.at(tag);
1095 }
1096 
1097 /*
1098   Get a bitmap indicating which varchar columns must be covered for this
1099   lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1100   the lookup is covered. If it can already be determined that the lookup is
1101   not covered, map->bitmap will be set to null.
1102  */
get_lookup_bitmap(const TABLE * table,MY_BITMAP * map) const1103 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1104   DBUG_ASSERT(map->bitmap == nullptr);
1105   bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1106   uint curr_bitmap_pos = 0;
1107 
1108   // Indicates which columns in the read set might be covered.
1109   MY_BITMAP maybe_covered_bitmap;
1110   bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1111 
1112   for (uint i = 0; i < m_key_parts; i++) {
1113     if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1114       continue;
1115     }
1116 
1117     Field *const field = m_pack_info[i].get_field_in_table(table);
1118 
1119     // Columns which are always covered are not stored in the covered bitmap so
1120     // we can ignore them here too.
1121     if (m_pack_info[i].m_covered &&
1122         bitmap_is_set(table->read_set, field->field_index)) {
1123       bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1124       continue;
1125     }
1126 
1127     switch (field->real_type()) {
1128       // This type may be covered depending on the record. If it was requested,
1129       // we require the covered bitmap to have this bit set.
1130       case MYSQL_TYPE_VARCHAR:
1131         if (curr_bitmap_pos < MAX_REF_PARTS) {
1132           if (bitmap_is_set(table->read_set, field->field_index)) {
1133             bitmap_set_bit(map, curr_bitmap_pos);
1134             bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1135           }
1136           curr_bitmap_pos++;
1137         } else {
1138           bitmap_free(&maybe_covered_bitmap);
1139           bitmap_free(map);
1140           return;
1141         }
1142         break;
1143       // This column is a type which is never covered. If it was requested, we
1144       // know this lookup will never be covered.
1145       default:
1146         if (bitmap_is_set(table->read_set, field->field_index)) {
1147           bitmap_free(&maybe_covered_bitmap);
1148           bitmap_free(map);
1149           return;
1150         }
1151         break;
1152     }
1153   }
1154 
1155   // If there are columns which are not covered in the read set, the lookup
1156   // can't be covered.
1157   if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1158     bitmap_free(map);
1159   }
1160   bitmap_free(&maybe_covered_bitmap);
1161 }
1162 
1163 /*
1164   Return true if for this secondary index
1165   - All of the requested columns are in the index
1166   - All values for columns that are prefix-only indexes are shorter or equal
1167     in length to the prefix
1168  */
covers_lookup(const rocksdb::Slice * const unpack_info,const MY_BITMAP * const lookup_bitmap) const1169 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1170                                 const MY_BITMAP *const lookup_bitmap) const {
1171   DBUG_ASSERT(lookup_bitmap != nullptr);
1172   if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1173     return false;
1174   }
1175 
1176   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1177 
1178   // Check if this unpack_info has a covered_bitmap
1179   const char *unpack_header = unp_reader.get_current_ptr();
1180   const bool has_covered_unpack_info =
1181       unp_reader.remaining_bytes() &&
1182       unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1183   if (!has_covered_unpack_info ||
1184       !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1185     return false;
1186   }
1187 
1188   MY_BITMAP covered_bitmap;
1189   my_bitmap_map covered_bits;
1190   bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1191   covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1192                                       sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1193                                       RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1194 
1195   return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1196 }
1197 
1198 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
can_cover_lookup() const1199 bool Rdb_key_def::can_cover_lookup() const {
1200   for (uint i = 0; i < m_key_parts; i++) {
1201     if (!m_pack_info[i].m_covered) return false;
1202   }
1203   return true;
1204 }
1205 
pack_field(Field * const field,Rdb_field_packing * pack_info,uchar * tuple,uchar * const packed_tuple,uchar * const pack_buffer,Rdb_string_writer * const unpack_info,uint * const n_null_fields) const1206 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1207                                uchar *tuple, uchar *const packed_tuple,
1208                                uchar *const pack_buffer,
1209                                Rdb_string_writer *const unpack_info,
1210                                uint *const n_null_fields) const {
1211   if (field->real_maybe_null()) {
1212     DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
1213     if (field->is_real_null()) {
1214       /* NULL value. store '\0' so that it sorts before non-NULL values */
1215       *tuple++ = 0;
1216       /* That's it, don't store anything else */
1217       if (n_null_fields) (*n_null_fields)++;
1218       return tuple;
1219     } else {
1220       /* Not a NULL value. Store '1' */
1221       *tuple++ = 1;
1222     }
1223   }
1224 
1225   const bool create_unpack_info =
1226       (unpack_info &&  // we were requested to generate unpack_info
1227        pack_info->uses_unpack_info());  // and this keypart uses it
1228   Rdb_pack_field_context pack_ctx(unpack_info);
1229 
1230   // Set the offset for methods which do not take an offset as an argument
1231   DBUG_ASSERT(
1232       is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1233 
1234   (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1235 
1236   /* Make "unpack info" to be stored in the value */
1237   if (create_unpack_info) {
1238     (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1239                                          &pack_ctx);
1240   }
1241 
1242   return tuple;
1243 }
1244 
/**
  Get index columns from the record and pack them into mem-comparable form.

  @param
    tbl                   Table we're working on
    pack_buffer      IN   Temporary area for packing varchars. The size is
                          at least max_storage_fmt_length() bytes.
    record           IN   Record buffer with fields in table->record format
    packed_tuple     OUT  Key in the mem-comparable form
    unpack_info      OUT  Unpack data (cleared first); may be nullptr
    should_store_row_debug_checksums
                          If true, append key and value checksums to
                          unpack_info. Asserted to imply a secondary index.
    hidden_pk_id          Hidden PK value to pack into the last key part of a
                          table with a hidden PK; if 0 that part is skipped
                          when all key parts are requested.
    n_key_parts           Number of keyparts to process. 0 (or MAX_REF_PARTS)
                          means all of them.
    n_null_fields    OUT  Number of key fields with NULL value.
    ttl_bytes        IN   Previous ttl bytes from old record for update case or
                          current ttl bytes from just packed primary key/value
  @detail
    Some callers do not need the unpack information, they can pass
    unpack_info=nullptr.

    While a field is being packed its Field object is temporarily re-pointed
    at `record`; it is pointed back at tbl->record[0] before the next field
    is processed.

  @return
    Length of the packed tuple
*/

uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
                              const uchar *const record,
                              uchar *const packed_tuple,
                              Rdb_string_writer *const unpack_info,
                              const bool should_store_row_debug_checksums,
                              const longlong hidden_pk_id, uint n_key_parts,
                              uint *const n_null_fields,
                              const char *const ttl_bytes) const {
  DBUG_ASSERT(tbl != nullptr);
  DBUG_ASSERT(pack_buffer != nullptr);
  DBUG_ASSERT(record != nullptr);
  DBUG_ASSERT(packed_tuple != nullptr);
  // Checksums for PKs are made when record is packed.
  // We should never attempt to make checksum just from PK values
  DBUG_ASSERT_IMP(should_store_row_debug_checksums,
                  (m_index_type == INDEX_TYPE_SECONDARY));

  uchar *tuple = packed_tuple;
  // Positions inside unpack_info; size_t(-1) until unpack_info is set up.
  size_t unpack_start_pos = size_t(-1);
  size_t unpack_len_pos = size_t(-1);
  size_t covered_bitmap_pos = size_t(-1);
  const bool hidden_pk_exists = table_has_hidden_pk(tbl);

  // Every packed key starts with the index number.
  rdb_netbuf_store_index(tuple, m_index_number);
  tuple += INDEX_NUMBER_SIZE;

  // If n_key_parts is 0, it means all columns.
  // The following includes the 'extended key' tail.
  // The 'extended key' includes primary key. This is done to 'uniqify'
  // non-unique indexes
  const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;

  // If hidden pk exists, but hidden pk wasn't passed in, we can't pack the
  // hidden key part.  So we skip it (it's always 1 part).
  if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
    n_key_parts = m_key_parts - 1;
  } else if (use_all_columns) {
    n_key_parts = m_key_parts;
  }

  if (n_null_fields) *n_null_fields = 0;

  // Check if we need a covered bitmap. If it is certain that all key parts are
  // covering, we don't need one.
  bool store_covered_bitmap = false;
  if (unpack_info && use_covered_bitmap_format()) {
    for (uint i = 0; i < n_key_parts; i++) {
      if (!m_pack_info[i].m_covered) {
        store_covered_bitmap = true;
        break;
      }
    }
  }

  // The tag decides which header layout is written below.
  const char tag =
      store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;

  if (unpack_info) {
    unpack_info->clear();

    if (m_index_type == INDEX_TYPE_SECONDARY &&
        m_total_index_flags_length > 0) {
      // Reserve space for index flag fields
      unpack_info->allocate(m_total_index_flags_length);

      // Insert TTL timestamp
      if (has_ttl() && ttl_bytes) {
        write_index_flag_field(unpack_info,
                               reinterpret_cast<const uchar *>(ttl_bytes),
                               Rdb_key_def::TTL_FLAG);
      }
    }

    // Header: tag byte, then a uint16 length that is patched in later.
    unpack_start_pos = unpack_info->get_current_pos();
    unpack_info->write_uint8(tag);
    unpack_len_pos = unpack_info->get_current_pos();
    // we don't know the total length yet, so write a zero
    unpack_info->write_uint16(0);

    if (store_covered_bitmap) {
      // Reserve two bytes for the covered bitmap. This will store, for key
      // parts which are not always covering, whether or not it is covering
      // for this record.
      covered_bitmap_pos = unpack_info->get_current_pos();
      unpack_info->write_uint16(0);
    }
  }

  // covered_bitmap uses covered_bits as its storage, so bits set through the
  // bitmap can be read back from covered_bits below.
  MY_BITMAP covered_bitmap;
  my_bitmap_map covered_bits;
  uint curr_bitmap_pos = 0;
  bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);

  for (uint i = 0; i < n_key_parts; i++) {
    // Fill hidden pk id into the last key part for secondary keys for tables
    // with no pk
    if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
      m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
      break;
    }

    Field *const field = m_pack_info[i].get_field_in_table(tbl);
    DBUG_ASSERT(field != nullptr);

    // Remember where the field lives relative to tbl->record[0] so that it
    // can be pointed at `record` and restored afterwards.
    uint field_offset = field->ptr - tbl->record[0];
    uint null_offset = field->null_offset(tbl->record[0]);
    bool maybe_null = field->real_maybe_null();

    field->move_field(
        const_cast<uchar *>(record) + field_offset,
        maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
        field->null_bit);
    // WARNING! Don't return without restoring field->ptr and field->null_ptr

    tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
                       unpack_info, n_null_fields);

    // If this key part is a prefix of a VARCHAR field, check if it's covered.
    if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
        !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
      size_t data_length = field->data_length();
      uint16 key_length;
      // Take the key part length from this index, or from the primary key
      // when the part maps to a PK part.
      if (m_pk_part_no[i] == (uint)-1) {
        key_length = tbl->key_info[get_keyno()].key_part[i].length;
      } else {
        key_length =
            tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
      }

      // Covered for this record: the part can be unpacked and the value is no
      // longer than the indexed length.
      if (m_pack_info[i].m_unpack_func != nullptr &&
          data_length <= key_length) {
        bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
      }
      curr_bitmap_pos++;
    }

    // Restore field->ptr and field->null_ptr
    field->move_field(tbl->record[0] + field_offset,
                      maybe_null ? tbl->record[0] + null_offset : nullptr,
                      field->null_bit);
  }

  if (unpack_info) {
    const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
    DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());

    // Don't store the unpack_info if it has only the header (that is, there's
    // no meaningful content).
    // Primary Keys are special: for them, store the unpack_info even if it's
    // empty (provided m_maybe_unpack_info==true, see
    // ha_rocksdb::convert_record_to_storage_format)
    //
    // NOTE(review): on the secondary-key branch below the zero placeholder at
    // unpack_len_pos is never overwritten with `len` when the unpack_info is
    // kept -- confirm this is intended.
    if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
      if (len == get_unpack_header_size(tag) && !covered_bits) {
        unpack_info->truncate(unpack_start_pos);
      } else if (store_covered_bitmap) {
        unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
      }
    } else {
      unpack_info->write_uint16_at(unpack_len_pos, len);
    }

    //
    // Secondary keys have key and value checksums in the value part
    // Primary key is a special case (the value part has non-indexed columns),
    // so the checksums are computed and stored by
    // ha_rocksdb::convert_record_to_storage_format
    //
    if (should_store_row_debug_checksums) {
      const uint32_t key_crc32 =
          my_checksum(0, packed_tuple, tuple - packed_tuple);
      const uint32_t val_crc32 =
          my_checksum(0, unpack_info->ptr(), unpack_info->get_current_pos());

      // Checksum chunk: tag byte followed by the key and value checksums.
      unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
      unpack_info->write_uint32(key_crc32);
      unpack_info->write_uint32(val_crc32);
    }
  }

  DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));

  return tuple - packed_tuple;
}
1451 
1452 /**
1453   Pack the hidden primary key into mem-comparable form.
1454 
1455   @param
1456     tbl                   Table we're working on
1457     hidden_pk_id     IN   New value to be packed into key
1458     packed_tuple     OUT  Key in the mem-comparable form
1459 
1460   @return
1461     Length of the packed tuple
1462 */
1463 
pack_hidden_pk(const longlong hidden_pk_id,uchar * const packed_tuple) const1464 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1465                                  uchar *const packed_tuple) const {
1466   DBUG_ASSERT(packed_tuple != nullptr);
1467 
1468   uchar *tuple = packed_tuple;
1469   rdb_netbuf_store_index(tuple, m_index_number);
1470   tuple += INDEX_NUMBER_SIZE;
1471   DBUG_ASSERT(m_key_parts == 1);
1472   DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
1473                                    m_pack_info[0].m_max_image_len));
1474 
1475   m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1476 
1477   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1478   return tuple - packed_tuple;
1479 }
1480 
1481 /*
1482   Function of type rdb_index_field_pack_t
1483 */
1484 
pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1485 void Rdb_key_def::pack_with_make_sort_key(
1486     Rdb_field_packing *const fpi, Field *const field,
1487     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1488     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1489   DBUG_ASSERT(fpi != nullptr);
1490   DBUG_ASSERT(field != nullptr);
1491   DBUG_ASSERT(dst != nullptr);
1492   DBUG_ASSERT(*dst != nullptr);
1493 
1494   const int max_len = fpi->m_max_image_len;
1495   MY_BITMAP*old_map;
1496 
1497   old_map= dbug_tmp_use_all_columns(field->table,
1498                                     &field->table->read_set);
1499   field->sort_string(*dst, max_len);
1500   dbug_tmp_restore_column_map(&field->table->read_set, old_map);
1501   *dst += max_len;
1502 }
1503 
1504 /*
1505   Compares two keys without unpacking
1506 
1507   @detail
1508   @return
1509     0 - Ok. column_index is the index of the first column which is different.
1510           -1 if two kes are equal
1511     1 - Data format error.
1512 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const1513 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
1514                               const rocksdb::Slice *key2,
1515                               std::size_t *const column_index) const {
1516   DBUG_ASSERT(key1 != nullptr);
1517   DBUG_ASSERT(key2 != nullptr);
1518   DBUG_ASSERT(column_index != nullptr);
1519 
1520   // the caller should check the return value and
1521   // not rely on column_index being valid
1522   *column_index = 0xbadf00d;
1523 
1524   Rdb_string_reader reader1(key1);
1525   Rdb_string_reader reader2(key2);
1526 
1527   // Skip the index number
1528   if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1529 
1530   if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1531 
1532   for (uint i = 0; i < m_key_parts; i++) {
1533     const Rdb_field_packing *const fpi = &m_pack_info[i];
1534     if (fpi->m_maybe_null) {
1535       const auto nullp1 = reader1.read(1);
1536       const auto nullp2 = reader2.read(1);
1537 
1538       if (nullp1 == nullptr || nullp2 == nullptr) {
1539         return HA_EXIT_FAILURE;
1540       }
1541 
1542       if (*nullp1 != *nullp2) {
1543         *column_index = i;
1544         return HA_EXIT_SUCCESS;
1545       }
1546 
1547       if (*nullp1 == 0) {
1548         /* This is a NULL value */
1549         continue;
1550       }
1551     }
1552 
1553     const auto before_skip1 = reader1.get_current_ptr();
1554     const auto before_skip2 = reader2.get_current_ptr();
1555     DBUG_ASSERT(fpi->m_skip_func);
1556     if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) {
1557       return HA_EXIT_FAILURE;
1558     }
1559     if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) {
1560       return HA_EXIT_FAILURE;
1561     }
1562     const auto size1 = reader1.get_current_ptr() - before_skip1;
1563     const auto size2 = reader2.get_current_ptr() - before_skip2;
1564     if (size1 != size2) {
1565       *column_index = i;
1566       return HA_EXIT_SUCCESS;
1567     }
1568 
1569     if (memcmp(before_skip1, before_skip2, size1) != 0) {
1570       *column_index = i;
1571       return HA_EXIT_SUCCESS;
1572     }
1573   }
1574 
1575   *column_index = m_key_parts;
1576   return HA_EXIT_SUCCESS;
1577 }
1578 
1579 /*
1580   @brief
1581     Given a zero-padded key, determine its real key length
1582 
1583   @detail
1584     Fixed-size skip functions just read.
1585 */
1586 
key_length(const TABLE * const table,const rocksdb::Slice & key) const1587 size_t Rdb_key_def::key_length(const TABLE *const table,
1588                                const rocksdb::Slice &key) const {
1589   DBUG_ASSERT(table != nullptr);
1590 
1591   Rdb_string_reader reader(&key);
1592 
1593   if ((!reader.read(INDEX_NUMBER_SIZE))) {
1594     return size_t(-1);
1595   }
1596   for (uint i = 0; i < m_key_parts; i++) {
1597     const Rdb_field_packing *fpi = &m_pack_info[i];
1598     const Field *field = nullptr;
1599     if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) {
1600       field = fpi->get_field_in_table(table);
1601     }
1602     if ((fpi->m_skip_func)(fpi, field, &reader)) {
1603       return size_t(-1);
1604     }
1605   }
1606   return key.size() - reader.remaining_bytes();
1607 }
1608 
1609 /*
1610   Take mem-comparable form and unpack_info and unpack it to Table->record
1611 
1612   @detail
1613     not all indexes support this
1614 
1615   @return
1616     HA_EXIT_SUCCESS    OK
1617     other              HA_ERR error code
1618 */
1619 
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool verify_row_debug_checksums) const1620 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
1621                                const rocksdb::Slice *const packed_key,
1622                                const rocksdb::Slice *const unpack_info,
1623                                const bool verify_row_debug_checksums) const {
1624   Rdb_string_reader reader(packed_key);
1625   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1626 
1627   // There is no checksuming data after unpack_info for primary keys, because
1628   // the layout there is different. The checksum is verified in
1629   // ha_rocksdb::convert_record_from_storage_format instead.
1630   DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
1631                   !verify_row_debug_checksums);
1632 
1633   // Skip the index number
1634   if ((!reader.read(INDEX_NUMBER_SIZE))) {
1635     return HA_ERR_ROCKSDB_CORRUPT_DATA;
1636   }
1637 
1638   // For secondary keys, we expect the value field to contain index flags,
1639   // unpack data, and checksum data in that order. One or all can be missing,
1640   // but they cannot be reordered.
1641   if (unp_reader.remaining_bytes()) {
1642     if (m_index_type == INDEX_TYPE_SECONDARY &&
1643         m_total_index_flags_length > 0 &&
1644         !unp_reader.read(m_total_index_flags_length)) {
1645       return HA_ERR_ROCKSDB_CORRUPT_DATA;
1646     }
1647   }
1648 
1649   const char *unpack_header = unp_reader.get_current_ptr();
1650   bool has_unpack_info =
1651       unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
1652   if (has_unpack_info) {
1653     if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
1654       return HA_ERR_ROCKSDB_CORRUPT_DATA;
1655     }
1656   }
1657 
1658   // Read the covered bitmap
1659   MY_BITMAP covered_bitmap;
1660   my_bitmap_map covered_bits;
1661   bool has_covered_bitmap =
1662       has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
1663   if (has_covered_bitmap) {
1664     bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1665     covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1666                                         sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1667                                         RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1668   }
1669 
1670   int err = HA_EXIT_SUCCESS;
1671 
1672 
1673   Rdb_key_field_iterator iter(
1674       this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
1675       has_covered_bitmap ? &covered_bitmap : nullptr, buf);
1676   while (iter.has_next()) {
1677     err = iter.next();
1678     if (err) {
1679       return err;
1680     }
1681   }
1682 
1683   /*
1684     Check checksum values if present
1685   */
1686   const char *ptr;
1687   if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
1688     if (verify_row_debug_checksums) {
1689       uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
1690           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1691       const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
1692           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1693 
1694       const uint32_t computed_key_chksum =
1695           my_checksum(0, packed_key->data(), packed_key->size());
1696       const uint32_t computed_val_chksum =
1697           my_checksum(0, unpack_info->data(),
1698                 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1699 
1700       DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
1701                       stored_key_chksum++;);
1702 
1703       if (stored_key_chksum != computed_key_chksum) {
1704         report_checksum_mismatch(true, packed_key->data(), packed_key->size());
1705         return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1706       }
1707 
1708       if (stored_val_chksum != computed_val_chksum) {
1709         report_checksum_mismatch(false, unpack_info->data(),
1710                                  unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1711         return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1712       }
1713     } else {
1714       /* The checksums are present but we are not checking checksums */
1715     }
1716   }
1717 
1718   if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA;
1719 
1720   return HA_EXIT_SUCCESS;
1721 }
1722 
table_has_hidden_pk(const TABLE * const table)1723 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
1724   return table->s->primary_key == MAX_INDEXES;
1725 }
1726 
report_checksum_mismatch(const bool is_key,const char * const data,const size_t data_size) const1727 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
1728                                            const char *const data,
1729                                            const size_t data_size) const {
1730   // NO_LINT_DEBUG
1731   sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
1732                   is_key ? "key" : "value", get_index_number());
1733 
1734   const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
1735   // NO_LINT_DEBUG
1736   sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
1737                   (uint64_t)data_size, buf.c_str());
1738 
1739   my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1740 }
1741 
index_format_min_check(const int pk_min,const int sk_min) const1742 bool Rdb_key_def::index_format_min_check(const int pk_min,
1743                                          const int sk_min) const {
1744   switch (m_index_type) {
1745     case INDEX_TYPE_PRIMARY:
1746     case INDEX_TYPE_HIDDEN_PRIMARY:
1747       return (m_kv_format_version >= pk_min);
1748     case INDEX_TYPE_SECONDARY:
1749       return (m_kv_format_version >= sk_min);
1750     default:
1751       DBUG_ASSERT(0);
1752       return false;
1753   }
1754 }
1755 
1756 ///////////////////////////////////////////////////////////////////////////////////////////
1757 // Rdb_field_packing
1758 ///////////////////////////////////////////////////////////////////////////////////////////
1759 
1760 /*
1761   Function of type rdb_index_field_skip_t
1762 */
1763 
skip_max_length(const Rdb_field_packing * const fpi,const Field * const field MY_ATTRIBUTE ((__unused__)),Rdb_string_reader * const reader)1764 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
1765                                  const Field *const field
1766                                      MY_ATTRIBUTE((__unused__)),
1767                                  Rdb_string_reader *const reader) {
1768   if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
1769   return HA_EXIT_SUCCESS;
1770 }
1771 
/*
  (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
  split in the middle of an UTF-8 character. See the implementation of
  unpack_binary_or_utf8_varchar.
*/
#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Encoded size of a `len`-byte value: one RDB_ESCAPE_LENGTH-byte segment for
  every started group of (RDB_ESCAPE_LENGTH - 1) input bytes.

  The argument and the whole expansion are parenthesized so that the macro
  expands safely whatever operators appear in the argument or around the
  macro invocation.
*/
#define RDB_ENCODED_SIZE(len)                                      \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Same for the legacy format, which always rounds up to one more segment
  than strictly needed when `len` is a multiple of
  (RDB_LEGACY_ESCAPE_LENGTH - 1).
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                        \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /              \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                       \
   RDB_LEGACY_ESCAPE_LENGTH)
1789 
1790 /*
1791   Function of type rdb_index_field_skip_t
1792 */
1793 
skip_variable_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1794 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
1795                                       const Field *const field,
1796                                       Rdb_string_reader *const reader) {
1797   const uchar *ptr;
1798   bool finished = false;
1799 
1800   size_t dst_len; /* How much data can be there */
1801   if (field) {
1802     const Field_varstring *const field_var =
1803         static_cast<const Field_varstring *>(field);
1804     dst_len = field_var->pack_length() - field_var->length_bytes;
1805   } else {
1806     dst_len = UINT_MAX;
1807   }
1808 
1809   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
1810 
1811   /* Decode the length-emitted encoding here */
1812   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1813     uint used_bytes;
1814 
1815     /* See pack_with_varchar_encoding. */
1816     if (use_legacy_format) {
1817       used_bytes = calc_unpack_legacy_variable_format(
1818           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1819     } else {
1820       used_bytes =
1821           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1822     }
1823 
1824     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
1825       return HA_EXIT_FAILURE;  // Corruption in the data
1826     }
1827 
1828     if (finished) {
1829       break;
1830     }
1831 
1832     dst_len -= used_bytes;
1833   }
1834 
1835   if (!finished) {
1836     return HA_EXIT_FAILURE;
1837   }
1838 
1839   return HA_EXIT_SUCCESS;
1840 }
1841 
1842 const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
1843 const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
1844 const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1845 
1846 /*
1847   Skip a keypart that uses Variable-Length Space-Padded encoding
1848 */
1849 
skip_variable_space_pad(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1850 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
1851                                          const Field *const field,
1852                                          Rdb_string_reader *const reader) {
1853   const uchar *ptr;
1854   bool finished = false;
1855 
1856   size_t dst_len = UINT_MAX; /* How much data can be there */
1857 
1858   if (field) {
1859     const Field_varstring *const field_var =
1860         static_cast<const Field_varstring *>(field);
1861     dst_len = field_var->pack_length() - field_var->length_bytes;
1862   }
1863 
1864   /* Decode the length-emitted encoding here */
1865   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1866     // See pack_with_varchar_space_pad
1867     const uchar c = ptr[fpi->m_segment_size - 1];
1868     if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1869       // This is the last segment
1870       finished = true;
1871       break;
1872     } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1873                c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1874       // This is not the last segment
1875       if ((fpi->m_segment_size - 1) > dst_len) {
1876         // The segment is full of data but the table field can't hold that
1877         // much! This must be data corruption.
1878         return HA_EXIT_FAILURE;
1879       }
1880       dst_len -= (fpi->m_segment_size - 1);
1881     } else {
1882       // Encountered a value that's none of the VARCHAR_CMP* constants
1883       // It's data corruption.
1884       return HA_EXIT_FAILURE;
1885     }
1886   }
1887   return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1888 }
1889 
1890 /*
1891   Function of type rdb_index_field_unpack_t
1892 */
1893 
unpack_integer(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))1894 int Rdb_key_def::unpack_integer(
1895     Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1896     Rdb_string_reader *const reader,
1897     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
1898   const int length = fpi->m_max_image_len;
1899 
1900   const uchar *from;
1901   if (!(from = (const uchar *)reader->read(length))) {
1902     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1903   }
1904 
1905 #ifdef WORDS_BIGENDIAN
1906   {
1907     if (static_cast<Field_num *>(field)->unsigned_flag) {
1908       to[0] = from[0];
1909     } else {
1910       to[0] = static_cast<char>(from[0] ^ 128);  // Reverse the sign bit.
1911     }
1912     memcpy(to + 1, from + 1, length - 1);
1913   }
1914 #else
1915   {
1916     const int sign_byte = from[0];
1917     if (static_cast<Field_num *>(field)->unsigned_flag) {
1918       to[length - 1] = sign_byte;
1919     } else {
1920       to[length - 1] =
1921           static_cast<char>(sign_byte ^ 128);  // Reverse the sign bit.
1922     }
1923     for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
1924   }
1925 #endif
1926   return UNPACK_SUCCESS;
1927 }
1928 
1929 #if !defined(WORDS_BIGENDIAN)
rdb_swap_double_bytes(uchar * const dst,const uchar * const src)1930 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
1931 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1932   // A few systems store the most-significant _word_ first on little-endian
1933   dst[0] = src[3];
1934   dst[1] = src[2];
1935   dst[2] = src[1];
1936   dst[3] = src[0];
1937   dst[4] = src[7];
1938   dst[5] = src[6];
1939   dst[6] = src[5];
1940   dst[7] = src[4];
1941 #else
1942   dst[0] = src[7];
1943   dst[1] = src[6];
1944   dst[2] = src[5];
1945   dst[3] = src[4];
1946   dst[4] = src[3];
1947   dst[5] = src[2];
1948   dst[6] = src[1];
1949   dst[7] = src[0];
1950 #endif
1951 }
1952 
rdb_swap_float_bytes(uchar * const dst,const uchar * const src)1953 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
1954   dst[0] = src[3];
1955   dst[1] = src[2];
1956   dst[2] = src[1];
1957   dst[3] = src[0];
1958 }
1959 #else
1960 #define rdb_swap_double_bytes nullptr
1961 #define rdb_swap_float_bytes nullptr
1962 #endif
1963 
unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t size,const int exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))1964 int Rdb_key_def::unpack_floating_point(
1965     uchar *const dst, Rdb_string_reader *const reader, const size_t size,
1966     const int exp_digit, const uchar *const zero_pattern,
1967     const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
1968   const uchar *const from = (const uchar *)reader->read(size);
1969   if (from == nullptr) {
1970     /* Mem-comparable image doesn't have enough bytes */
1971     return UNPACK_FAILURE;
1972   }
1973 
1974   /* Check to see if the value is zero */
1975   if (memcmp(from, zero_pattern, size) == 0) {
1976     memcpy(dst, zero_val, size);
1977     return UNPACK_SUCCESS;
1978   }
1979 
1980 #if defined(WORDS_BIGENDIAN)
1981   // On big-endian, output can go directly into result
1982   uchar *const tmp = dst;
1983 #else
1984   // Otherwise use a temporary buffer to make byte-swapping easier later
1985   uchar tmp[8];
1986 #endif
1987 
1988   memcpy(tmp, from, size);
1989 
1990   if (tmp[0] & 0x80) {
1991     // If the high bit is set the original value was positive so
1992     // remove the high bit and subtract one from the exponent.
1993     ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1994     exp_part &= 0x7FFF;                             // clear high bit;
1995     exp_part -= (ushort)1 << (16 - 1 - exp_digit);  // subtract from exponent
1996     tmp[0] = (uchar)(exp_part >> 8);
1997     tmp[1] = (uchar)exp_part;
1998   } else {
1999     // Otherwise the original value was negative and all bytes have been
2000     // negated.
2001     for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
2002   }
2003 
2004 #if !defined(WORDS_BIGENDIAN)
2005   // On little-endian, swap the bytes around
2006   swap_func(dst, tmp);
2007 #else
2008   DBUG_ASSERT(swap_func == nullptr);
2009 #endif
2010 
2011   return UNPACK_SUCCESS;
2012 }
2013 
2014 #if !defined(DBL_EXP_DIG)
2015 #define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
2016 #endif
2017 
2018 /*
2019   Function of type rdb_index_field_unpack_t
2020 
2021   Unpack a double by doing the reverse action of change_double_for_sort
2022   (sql/filesort.cc).  Note that this only works on IEEE values.
2023   Note also that this code assumes that NaN and +/-Infinity are never
2024   allowed in the database.
2025 */
unpack_double(Rdb_field_packing * const fpi MY_ATTRIBUTE ((__unused__)),Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2026 int Rdb_key_def::unpack_double(
2027     Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2028     Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
2029     Rdb_string_reader *const reader,
2030     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2031   static double zero_val = 0.0;
2032   static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2033 
2034   return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2035                                zero_pattern, (const uchar *)&zero_val,
2036                                rdb_swap_double_bytes);
2037 }
2038 
2039 #if !defined(FLT_EXP_DIG)
2040 #define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
2041 #endif
2042 
2043 /*
2044   Function of type rdb_index_field_unpack_t
2045 
2046   Unpack a float by doing the reverse action of Field_float::make_sort_key
2047   (sql/field.cc).  Note that this only works on IEEE values.
2048   Note also that this code assumes that NaN and +/-Infinity are never
2049   allowed in the database.
2050 */
unpack_float(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2051 int Rdb_key_def::unpack_float(
2052     Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2053     uchar *const field_ptr, Rdb_string_reader *const reader,
2054     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2055   static float zero_val = 0.0;
2056   static const uchar zero_pattern[4] = {128, 0, 0, 0};
2057 
2058   return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2059                                zero_pattern, (const uchar *)&zero_val,
2060                                rdb_swap_float_bytes);
2061 }
2062 
2063 /*
2064   Function of type rdb_index_field_unpack_t used to
2065   Unpack by doing the reverse action to Field_newdate::make_sort_key.
2066 */
2067 
unpack_newdate(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2068 int Rdb_key_def::unpack_newdate(
2069     Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2070     uchar *const field_ptr, Rdb_string_reader *const reader,
2071     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2072   const char *from;
2073   DBUG_ASSERT(fpi->m_max_image_len == 3);
2074 
2075   if (!(from = reader->read(3))) {
2076     /* Mem-comparable image doesn't have enough bytes */
2077     return UNPACK_FAILURE;
2078   }
2079 
2080   field_ptr[0] = from[2];
2081   field_ptr[1] = from[1];
2082   field_ptr[2] = from[0];
2083   return UNPACK_SUCCESS;
2084 }
2085 
2086 /*
2087   Function of type rdb_index_field_unpack_t, used to
2088   Unpack the string by copying it over.
2089   This is for BINARY(n) where the value occupies the whole length.
2090 */
2091 
unpack_binary_str(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2092 int Rdb_key_def::unpack_binary_str(
2093     Rdb_field_packing *const fpi, Field *const field, uchar *const to,
2094     Rdb_string_reader *const reader,
2095     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2096   const char *from;
2097   if (!(from = reader->read(fpi->m_max_image_len))) {
2098     /* Mem-comparable image doesn't have enough bytes */
2099     return UNPACK_FAILURE;
2100   }
2101 
2102   memcpy(to, from, fpi->m_max_image_len);
2103   return UNPACK_SUCCESS;
2104 }
2105 
2106 /*
2107   Function of type rdb_index_field_unpack_t.
2108   For UTF-8, we need to convert 2-byte wide-character entities back into
2109   UTF8 sequences.
2110 */
2111 
unpack_utf8_str(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2112 int Rdb_key_def::unpack_utf8_str(
2113     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2114     Rdb_string_reader *const reader,
2115     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2116   my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
2117   const uchar *src;
2118   if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2119     /* Mem-comparable image doesn't have enough bytes */
2120     return UNPACK_FAILURE;
2121   }
2122 
2123   const uchar *const src_end = src + fpi->m_max_image_len;
2124   uchar *const dst_end = dst + field->pack_length();
2125 
2126   while (src < src_end) {
2127     my_wc_t wc = (src[0] << 8) | src[1];
2128     src += 2;
2129     int res = cset->wc_mb(wc, dst, dst_end);
2130     DBUG_ASSERT(res > 0 && res <= 3);
2131     if (res < 0) return UNPACK_FAILURE;
2132     dst += res;
2133   }
2134 
2135   cset->fill(reinterpret_cast<char *>(dst), dst_end - dst,
2136              cset->pad_char);
2137   return UNPACK_SUCCESS;
2138 }
2139 
2140 /*
2141   This is the original algorithm to encode a variable binary field.  It
2142   sets a flag byte every Nth byte.  The flag value is (255 - #pad) where
2143   #pad is the number of padding bytes that were needed (0 if all N-1
2144   bytes were used).
2145 
2146   If N=8 and the field is:
2147   * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2148   * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2149   And the 4 byte string compares as greater than the 3 byte string
2150 
2151   Unfortunately the algorithm has a flaw.  If the input is exactly a
2152   multiple of N-1, an extra N bytes are written.  Since we usually use
2153   N=9, an 8 byte input will generate 18 bytes of output instead of the
2154   9 bytes of output that is optimal.
2155 
2156   See pack_variable_format for the newer algorithm.
2157 */
pack_legacy_variable_format(const uchar * src,size_t src_len,uchar ** dst)2158 void Rdb_key_def::pack_legacy_variable_format(
2159     const uchar *src,  // The data to encode
2160     size_t src_len,    // The length of the data to encode
2161     uchar **dst)       // The location to encode the data
2162 {
2163   size_t copy_len;
2164   size_t padding_bytes;
2165   uchar *ptr = *dst;
2166 
2167   do {
2168     copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2169     padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2170     memcpy(ptr, src, copy_len);
2171     ptr += copy_len;
2172     src += copy_len;
2173     // pad with zeros if necessary
2174     if (padding_bytes > 0) {
2175       memset(ptr, 0, padding_bytes);
2176       ptr += padding_bytes;
2177     }
2178 
2179     *(ptr++) = 255 - padding_bytes;
2180 
2181     src_len -= copy_len;
2182   } while (padding_bytes == 0);
2183 
2184   *dst = ptr;
2185 }
2186 
2187 /*
2188   This is the new algorithm.  Similarly to the legacy format the input
2189   is split up into N-1 bytes and a flag byte is used as the Nth byte
2190   in the output.
2191 
2192   - If the previous segment needed any padding the flag is set to the
2193     number of bytes used (0..N-2).  0 is possible in the first segment
2194     if the input is 0 bytes long.
2195   - If no padding was used and there is no more data left in the input
2196     the flag is set to N-1
2197   - If no padding was used and there is still data left in the input the
2198     flag is set to N.
2199 
2200   For N=9, the following input values encode to the specified
2201   outout (where 'X' indicates a byte of the original input):
2202   - 0 bytes  is encoded as 0 0 0 0 0 0 0 0 0
2203   - 1 byte   is encoded as X 0 0 0 0 0 0 0 1
2204   - 2 bytes  is encoded as X X 0 0 0 0 0 0 2
2205   - 7 bytes  is encoded as X X X X X X X 0 7
2206   - 8 bytes  is encoded as X X X X X X X X 8
2207   - 9 bytes  is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2208   - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2209 */
pack_variable_format(const uchar * src,size_t src_len,uchar ** dst)2210 void Rdb_key_def::pack_variable_format(
2211     const uchar *src,  // The data to encode
2212     size_t src_len,    // The length of the data to encode
2213     uchar **dst)       // The location to encode the data
2214 {
2215   uchar *ptr = *dst;
2216 
2217   for (;;) {
2218     // Figure out how many bytes to copy, copy them and adjust pointers
2219     const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2220     memcpy(ptr, src, copy_len);
2221     ptr += copy_len;
2222     src += copy_len;
2223     src_len -= copy_len;
2224 
2225     // Are we at the end of the input?
2226     if (src_len == 0) {
2227       // pad with zeros if necessary;
2228       const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2229       if (padding_bytes > 0) {
2230         memset(ptr, 0, padding_bytes);
2231         ptr += padding_bytes;
2232       }
2233 
2234       // Put the flag byte (0 - N-1) in the output
2235       *(ptr++) = (uchar)copy_len;
2236       break;
2237     }
2238 
2239     // We have more data - put the flag byte (N) in and continue
2240     *(ptr++) = RDB_ESCAPE_LENGTH;
2241   }
2242 
2243   *dst = ptr;
2244 }
2245 
2246 /*
2247   Function of type rdb_index_field_pack_t
2248 */
2249 
pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))2250 void Rdb_key_def::pack_with_varchar_encoding(
2251     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2252     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2253   const CHARSET_INFO *const charset = field->charset();
2254   Field_varstring *const field_var = (Field_varstring *)field;
2255 
2256   const size_t value_length = (field_var->length_bytes == 1)
2257                                   ? (uint)*field->ptr
2258                                   : uint2korr(field->ptr);
2259   size_t xfrm_len = charset->strnxfrm(
2260       buf, fpi->m_max_image_len, field_var->char_length(),
2261       field_var->ptr + field_var->length_bytes, value_length, 0);
2262 
2263   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2264   if (fpi->m_use_legacy_varbinary_format) {
2265     pack_legacy_variable_format(buf, xfrm_len, dst);
2266   } else {
2267     pack_variable_format(buf, xfrm_len, dst);
2268   }
2269 }
2270 
2271 /*
2272   Compare the string in [buf..buf_end) with a string that is an infinite
2273   sequence of strings in space_xfrm
2274 */
2275 
rdb_compare_string_with_spaces(const uchar * buf,const uchar * const buf_end,const std::vector<uchar> * const space_xfrm)2276 static int rdb_compare_string_with_spaces(
2277     const uchar *buf, const uchar *const buf_end,
2278     const std::vector<uchar> *const space_xfrm) {
2279   int cmp = 0;
2280   while (buf < buf_end) {
2281     size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2282     if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
2283     buf += bytes;
2284   }
2285   return cmp;
2286 }
2287 
/*
  Offset added to the space count kept in unpack_info for the
  Variable-Length Space-Padded encoding, so that it can be stored as an
  unsigned number.
*/
static constexpr int RDB_TRIMMED_CHARS_OFFSET = 8;
2289 /*
2290   Pack the data with Variable-Length Space-Padded Encoding.
2291 
2292   The encoding is there to meet two goals:
2293 
2294   Goal#1. Comparison. The SQL standard says
2295 
2296     " If the collation for the comparison has the PAD SPACE characteristic,
2297     for the purposes of the comparison, the shorter value is effectively
2298     extended to the length of the longer by concatenation of <space>s on the
2299     right."
2300 
2301   At the moment, all MySQL collations except one have the PAD SPACE
2302   characteristic.  The exception is the "binary" collation that is used by
2303   [VAR]BINARY columns. (Note that binary collations for specific charsets,
2304   like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2305   the PAD SPACE characteristic).
2306 
2307   Goal#2 is to preserve the number of trailing spaces in the original value.
2308 
2309   This is achieved by using the following encoding:
2310   The key part:
2311   - Stores mem-comparable image of the column
2312   - It is stored in chunks of fpi->m_segment_size bytes (*)
2313     = If the remainder of the chunk is not occupied, it is padded with mem-
2314       comparable image of the space character (cs->pad_char to be precise).
2315   - The last byte of the chunk shows how the rest of column's mem-comparable
2316     image would compare to mem-comparable image of the column extended with
2317     spaces. There are three possible values.
2318      - VARCHAR_CMP_LESS_THAN_SPACES,
2319      - VARCHAR_CMP_EQUAL_TO_SPACES
2320      - VARCHAR_CMP_GREATER_THAN_SPACES
2321 
2322   VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2323   is spaces, or something that sorts as spaces, so there is no reason to store
2324   it).
2325 
2326   Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2327 
2328    'abcd\0'   => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0    ' <VARCHAR_CMP_EQUAL> ]
2329    'abcd'     => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2330    'abcd   '  => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2331    'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2332 
2333   As mentioned above, the last chunk is padded with mem-comparable images of
2334   cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2335 
2336   fpi->m_segment_size depends on the used collation. It is chosen to be such
2337   that no mem-comparable image of space will ever stretch across the segments
2338   (see get_segment_size_from_collation).
2339 
2340   == The value part (aka unpack_info) ==
2341   The value part stores the number of space characters that one needs to add
2342   when unpacking the string.
2343   - If the number is positive, it means add this many spaces at the end
2344   - If the number is negative, it means padding has added extra spaces which
2345     must be removed.
2346 
2347   Storage considerations
2348   - depending on column's max size, the number may occupy 1 or 2 bytes
2349   - the number of spaces that need to be removed is not more than
2350     RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2351     then store it as unsigned.
2352 
2353   @seealso
2354     unpack_binary_or_utf8_varchar_space_pad
2355     unpack_simple_varchar_space_pad
2356     dummy_make_unpack_info
2357     skip_variable_space_pad
2358 */
2359 
pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)2360 void Rdb_key_def::pack_with_varchar_space_pad(
2361     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2362     Rdb_pack_field_context *const pack_ctx) {
2363   Rdb_string_writer *const unpack_info = pack_ctx->writer;
2364   const CHARSET_INFO *const charset = field->charset();
2365   const auto field_var = static_cast<Field_varstring *>(field);
2366 
2367   const size_t value_length = (field_var->length_bytes == 1)
2368                                   ? (uint)*field->ptr
2369                                   : uint2korr(field->ptr);
2370 
2371   const size_t trimmed_len = charset->lengthsp(
2372       (const char *)field_var->ptr + field_var->length_bytes,
2373       value_length);
2374   const size_t xfrm_len = charset->strnxfrm(
2375       buf, fpi->m_max_image_len, field_var->char_length(),
2376       field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2377 
2378   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2379   uchar *const buf_end = buf + xfrm_len;
2380 
2381   size_t encoded_size = 0;
2382   uchar *ptr = *dst;
2383   size_t padding_bytes;
2384   while (true) {
2385     const size_t copy_len =
2386         std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2387     padding_bytes = fpi->m_segment_size - 1 - copy_len;
2388     memcpy(ptr, buf, copy_len);
2389     ptr += copy_len;
2390     buf += copy_len;
2391 
2392     if (padding_bytes) {
2393       memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2394       ptr += padding_bytes;
2395       *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;  // last segment
2396     } else {
2397       // Compare the string suffix with a hypothetical infinite string of
2398       // spaces. It could be that the first difference is beyond the end of
2399       // current chunk.
2400       const int cmp =
2401           rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2402 
2403       if (cmp < 0) {
2404         *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2405       } else if (cmp > 0) {
2406         *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2407       } else {
2408         // It turns out all the rest are spaces.
2409         *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2410       }
2411     }
2412     encoded_size += fpi->m_segment_size;
2413 
2414     if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
2415   }
2416 
2417   // m_unpack_info_stores_value means unpack_info stores the whole original
2418   // value. There is no need to store the number of trimmed/padded endspaces
2419   // in that case.
2420   if (unpack_info && !fpi->m_unpack_info_stores_value) {
2421     // (value_length - trimmed_len) is the number of trimmed space *characters*
2422     // then, padding_bytes is the number of *bytes* added as padding
2423     // then, we add 8, because we don't store negative values.
2424     DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
2425     DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
2426     const size_t removed_chars =
2427         RDB_TRIMMED_CHARS_OFFSET +
2428         (value_length - trimmed_len) / fpi->space_mb_len -
2429         padding_bytes / fpi->space_xfrm_len;
2430 
2431     if (fpi->m_unpack_info_uses_two_bytes) {
2432       unpack_info->write_uint16(removed_chars);
2433     } else {
2434       DBUG_ASSERT(removed_chars < 0x100);
2435       unpack_info->write_uint8(removed_chars);
2436     }
2437   }
2438 
2439   *dst += encoded_size;
2440 }
2441 
2442 /*
2443   Calculate the number of used bytes in the chunk and whether this is the
2444   last chunk in the input.  This is based on the old legacy format - see
2445   pack_legacy_variable_format.
2446  */
calc_unpack_legacy_variable_format(uchar flag,bool * done)2447 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2448   uint pad = 255 - flag;
2449   uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2450   if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2451     return (uint)-1;
2452   }
2453 
2454   *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2455   return used_bytes;
2456 }
2457 
2458 /*
2459   Calculate the number of used bytes in the chunk and whether this is the
2460   last chunk in the input.  This is based on the new format - see
2461   pack_variable_format.
2462  */
calc_unpack_variable_format(uchar flag,bool * done)2463 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2464   // Check for invalid flag values
2465   if (flag > RDB_ESCAPE_LENGTH) {
2466     return (uint)-1;
2467   }
2468 
2469   // Values from 1 to N-1 indicate this is the last chunk and that is how
2470   // many bytes were used
2471   if (flag < RDB_ESCAPE_LENGTH) {
2472     *done = true;
2473     return flag;
2474   }
2475 
2476   // A value of N means we used N-1 bytes and had more to go
2477   *done = false;
2478   return RDB_ESCAPE_LENGTH - 1;
2479 }
2480 
2481 /*
2482   Unpack data that has charset information.  Each two bytes of the input is
2483   treated as a wide-character and converted to its multibyte equivalent in
2484   the output.
2485  */
unpack_charset(const CHARSET_INFO * cset,const uchar * src,uint src_len,uchar * dst,uint dst_len,uint * used_bytes)2486 static int unpack_charset(
2487     const CHARSET_INFO *cset,  // character set information
2488     const uchar *src,          // source data to unpack
2489     uint src_len,              // length of source data
2490     uchar *dst,                // destination of unpacked data
2491     uint dst_len,              // length of destination data
2492     uint *used_bytes)          // output number of bytes used
2493 {
2494   if (src_len & 1) {
2495     /*
2496       UTF-8 characters are encoded into two-byte entities. There is no way
2497       we can have an odd number of bytes after encoding.
2498     */
2499     return UNPACK_FAILURE;
2500   }
2501 
2502   uchar *dst_end = dst + dst_len;
2503   uint used = 0;
2504 
2505   for (uint ii = 0; ii < src_len; ii += 2) {
2506     my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2507     int res = cset->wc_mb(wc, dst + used, dst_end);
2508     DBUG_ASSERT(res > 0 && res <= 3);
2509     if (res < 0) {
2510       return UNPACK_FAILURE;
2511     }
2512 
2513     used += res;
2514   }
2515 
2516   *used_bytes = used;
2517   return UNPACK_SUCCESS;
2518 }
2519 
2520 /*
2521   Function of type rdb_index_field_unpack_t
2522 */
2523 
unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2524 int Rdb_key_def::unpack_binary_or_utf8_varchar(
2525     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2526     Rdb_string_reader *const reader,
2527     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2528   const uchar *ptr;
2529   size_t len = 0;
2530   bool finished = false;
2531   uchar *d0 = dst;
2532   Field_varstring *const field_var = (Field_varstring *)field;
2533   dst += field_var->length_bytes;
2534   // How much we can unpack
2535   size_t dst_len = field_var->pack_length() - field_var->length_bytes;
2536 
2537   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2538 
2539   /* Decode the length-emitted encoding here */
2540   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2541     uint used_bytes;
2542 
2543     /* See pack_with_varchar_encoding. */
2544     if (use_legacy_format) {
2545       used_bytes = calc_unpack_legacy_variable_format(
2546           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2547     } else {
2548       used_bytes =
2549           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2550     }
2551 
2552     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2553       return UNPACK_FAILURE;  // Corruption in the data
2554     }
2555 
2556     /*
2557       Now, we need to decode used_bytes of data and append them to the value.
2558     */
2559     if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2560       int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
2561                                dst_len, &used_bytes);
2562       if (err != UNPACK_SUCCESS) {
2563         return err;
2564       }
2565     } else {
2566       memcpy(dst, ptr, used_bytes);
2567     }
2568 
2569     dst += used_bytes;
2570     dst_len -= used_bytes;
2571     len += used_bytes;
2572 
2573     if (finished) {
2574       break;
2575     }
2576   }
2577 
2578   if (!finished) {
2579     return UNPACK_FAILURE;
2580   }
2581 
2582   /* Save the length */
2583   if (field_var->length_bytes == 1) {
2584     d0[0] = (uchar)len;
2585   } else {
2586     DBUG_ASSERT(field_var->length_bytes == 2);
2587     int2store(d0, len);
2588   }
2589   return UNPACK_SUCCESS;
2590 }
2591 
2592 /*
2593   @seealso
2594     pack_with_varchar_space_pad - packing function
2595     unpack_simple_varchar_space_pad - unpacking function for 'simple'
2596     charsets.
2597     skip_variable_space_pad - skip function
2598 */
unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2599 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
2600     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2601     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2602   const uchar *ptr;
2603   size_t len = 0;
2604   bool finished = false;
2605   Field_varstring *const field_var = static_cast<Field_varstring *>(field);
2606   uchar *d0 = dst;
2607   uchar *dst_end = dst + field_var->pack_length();
2608   dst += field_var->length_bytes;
2609 
2610   uint space_padding_bytes = 0;
2611   uint extra_spaces;
2612   if ((fpi->m_unpack_info_uses_two_bytes
2613            ? unp_reader->read_uint16(&extra_spaces)
2614            : unp_reader->read_uint8(&extra_spaces))) {
2615     return UNPACK_FAILURE;
2616   }
2617 
2618   if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
2619     space_padding_bytes =
2620         -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
2621     extra_spaces = 0;
2622   } else {
2623     extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
2624   }
2625 
2626   space_padding_bytes *= fpi->space_xfrm_len;
2627 
2628   /* Decode the length-emitted encoding here */
2629   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2630     const char last_byte = ptr[fpi->m_segment_size - 1];
2631     size_t used_bytes;
2632     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES)  // this is the last segment
2633     {
2634       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2635         return UNPACK_FAILURE;  // Cannot happen, corrupted data
2636       }
2637       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2638       finished = true;
2639     } else {
2640       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2641           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2642         return UNPACK_FAILURE;  // Invalid value
2643       }
2644       used_bytes = fpi->m_segment_size - 1;
2645     }
2646 
2647     // Now, need to decode used_bytes of data and append them to the value.
2648     if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2649       if (used_bytes & 1) {
2650         /*
2651           UTF-8 characters are encoded into two-byte entities. There is no way
2652           we can have an odd number of bytes after encoding.
2653         */
2654         return UNPACK_FAILURE;
2655       }
2656 
2657       const uchar *src = ptr;
2658       const uchar *const src_end = ptr + used_bytes;
2659       while (src < src_end) {
2660         my_wc_t wc = (src[0] << 8) | src[1];
2661         src += 2;
2662         const CHARSET_INFO *cset = fpi->m_varchar_charset;
2663         int res = cset->wc_mb(wc, dst, dst_end);
2664         DBUG_ASSERT(res <= 3);
2665         if (res <= 0) return UNPACK_FAILURE;
2666         dst += res;
2667         len += res;
2668       }
2669     } else {
2670       if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
2671       memcpy(dst, ptr, used_bytes);
2672       dst += used_bytes;
2673       len += used_bytes;
2674     }
2675 
2676     if (finished) {
2677       if (extra_spaces) {
2678         // Both binary and UTF-8 charset store space as ' ',
2679         // so the following is ok:
2680         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2681         memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
2682         len += extra_spaces;
2683       }
2684       break;
2685     }
2686   }
2687 
2688   if (!finished) return UNPACK_FAILURE;
2689 
2690   /* Save the length */
2691   if (field_var->length_bytes == 1) {
2692     d0[0] = (uchar)len;
2693   } else {
2694     DBUG_ASSERT(field_var->length_bytes == 2);
2695     int2store(d0, len);
2696   }
2697   return UNPACK_SUCCESS;
2698 }
2699 
2700 /////////////////////////////////////////////////////////////////////////
2701 
2702 /*
2703   Function of type rdb_make_unpack_info_t
2704 */
2705 
make_unpack_unknown(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2706 void Rdb_key_def::make_unpack_unknown(
2707     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2708     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2709   pack_ctx->writer->write(field->ptr, field->pack_length());
2710 }
2711 
2712 /*
  The point of this function is only to indicate that unpack_info is
  available.
2715 
2716   The actual unpack_info data is produced by the function that packs the key,
2717   that is, pack_with_varchar_space_pad.
2718 */
2719 
dummy_make_unpack_info(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * field MY_ATTRIBUTE ((__unused__)),Rdb_pack_field_context * pack_ctx MY_ATTRIBUTE ((__unused__)))2720 void Rdb_key_def::dummy_make_unpack_info(
2721     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2722     const Field *field MY_ATTRIBUTE((__unused__)),
2723     Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
2724   // Do nothing
2725 }
2726 
2727 /*
2728   Function of type rdb_index_field_unpack_t
2729 */
2730 
unpack_unknown(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2731 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
2732                                 Field *const field, uchar *const dst,
2733                                 Rdb_string_reader *const reader,
2734                                 Rdb_string_reader *const unp_reader) {
2735   const uchar *ptr;
2736   const uint len = fpi->m_unpack_data_len;
2737   // We don't use anything from the key, so skip over it.
2738   if (skip_max_length(fpi, field, reader)) {
2739     return UNPACK_FAILURE;
2740   }
2741 
2742   DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
2743 
2744   if ((ptr = (const uchar *)unp_reader->read(len))) {
2745     memcpy(dst, ptr, len);
2746     return UNPACK_SUCCESS;
2747   }
2748   return UNPACK_FAILURE;
2749 }
2750 
2751 /*
2752   Function of type rdb_make_unpack_info_t
2753 */
2754 
make_unpack_unknown_varchar(const Rdb_collation_codec * const codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2755 void Rdb_key_def::make_unpack_unknown_varchar(
2756     const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
2757     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2758   const auto f = static_cast<const Field_varstring *>(field);
2759   uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2760   len += f->length_bytes;
2761   pack_ctx->writer->write(field->ptr, len);
2762 }
2763 
2764 /*
2765   Function of type rdb_index_field_unpack_t
2766 
2767   @detail
2768   Unpack a key part in an "unknown" collation from its
2769   (mem_comparable_form, unpack_info) form.
2770 
2771   "Unknown" means we have no clue about how mem_comparable_form is made from
2772   the original string, so we keep the whole original string in the unpack_info.
2773 
2774   @seealso
2775     make_unpack_unknown, unpack_unknown
2776 */
2777 
unpack_unknown_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2778 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
2779                                         Field *const field, uchar *dst,
2780                                         Rdb_string_reader *const reader,
2781                                         Rdb_string_reader *const unp_reader) {
2782   const uchar *ptr;
2783   uchar *const d0 = dst;
2784   const auto f = static_cast<Field_varstring *>(field);
2785   dst += f->length_bytes;
2786   const uint len_bytes = f->length_bytes;
2787   // We don't use anything from the key, so skip over it.
2788   if ((fpi->m_skip_func)(fpi, field, reader)) {
2789     return UNPACK_FAILURE;
2790   }
2791 
2792   DBUG_ASSERT(len_bytes > 0);
2793   DBUG_ASSERT(unp_reader != nullptr);
2794 
2795   if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
2796     memcpy(d0, ptr, len_bytes);
2797     const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
2798     if ((ptr = (const uchar *)unp_reader->read(len))) {
2799       memcpy(dst, ptr, len);
2800       return UNPACK_SUCCESS;
2801     }
2802   }
2803   return UNPACK_FAILURE;
2804 }
2805 
2806 /*
2807   Write unpack_data for a "simple" collation
2808 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)2809 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
2810                                     const Rdb_collation_codec *const codec,
2811                                     const uchar *const src,
2812                                     const size_t src_len) {
2813   for (uint i = 0; i < src_len; i++) {
2814     writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
2815   }
2816 }
2817 
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len,uchar * const dst)2818 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
2819                                    const Rdb_collation_codec *const codec,
2820                                    const uchar *const src, const size_t src_len,
2821                                    uchar *const dst) {
2822   for (uint i = 0; i < src_len; i++) {
2823     if (codec->m_dec_size[src[i]] > 0) {
2824       uint *ret;
2825       DBUG_ASSERT(reader != nullptr);
2826 
2827       if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
2828         return UNPACK_FAILURE;
2829       }
2830       dst[i] = codec->m_dec_idx[*ret][src[i]];
2831     } else {
2832       dst[i] = codec->m_dec_idx[0][src[i]];
2833     }
2834   }
2835 
2836   return UNPACK_SUCCESS;
2837 }
2838 
2839 /*
2840   Function of type rdb_make_unpack_info_t
2841 
2842   @detail
2843     Make unpack_data for VARCHAR(n) in a "simple" charset.
2844 */
2845 
make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2846 void Rdb_key_def::make_unpack_simple_varchar(
2847     const Rdb_collation_codec *const codec, const Field *const field,
2848     Rdb_pack_field_context *const pack_ctx) {
2849   const auto f = static_cast<const Field_varstring *>(field);
2850   uchar *const src = f->ptr + f->length_bytes;
2851   const size_t src_len =
2852       f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2853   Rdb_bit_writer bit_writer(pack_ctx->writer);
2854   // The std::min compares characters with bytes, but for simple collations,
2855   // mbmaxlen = 1.
2856   rdb_write_unpack_simple(&bit_writer, codec, src,
2857                           std::min((size_t)f->char_length(), src_len));
2858 }
2859 
2860 /*
2861   Function of type rdb_index_field_unpack_t
2862 
2863   @seealso
2864     pack_with_varchar_space_pad - packing function
2865     unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
2866 */
2867 
unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2868 int Rdb_key_def::unpack_simple_varchar_space_pad(
2869     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2870     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2871   const uchar *ptr;
2872   size_t len = 0;
2873   bool finished = false;
2874   uchar *d0 = dst;
2875   const Field_varstring *const field_var =
2876       static_cast<Field_varstring *>(field);
2877   // For simple collations, char_length is also number of bytes.
2878   DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
2879   uchar *dst_end = dst + field_var->pack_length();
2880   dst += field_var->length_bytes;
2881   Rdb_bit_reader bit_reader(unp_reader);
2882 
2883   uint space_padding_bytes = 0;
2884   uint extra_spaces;
2885   DBUG_ASSERT(unp_reader != nullptr);
2886 
2887   if ((fpi->m_unpack_info_uses_two_bytes
2888            ? unp_reader->read_uint16(&extra_spaces)
2889            : unp_reader->read_uint8(&extra_spaces))) {
2890     return UNPACK_FAILURE;
2891   }
2892 
2893   if (extra_spaces <= 8) {
2894     space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2895     extra_spaces = 0;
2896   } else {
2897     extra_spaces -= 8;
2898   }
2899 
2900   space_padding_bytes *= fpi->space_xfrm_len;
2901 
2902   /* Decode the length-emitted encoding here */
2903   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2904     const char last_byte =
2905         ptr[fpi->m_segment_size - 1];  // number of padding bytes
2906     size_t used_bytes;
2907     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2908       // this is the last one
2909       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2910         return UNPACK_FAILURE;  // Cannot happen, corrupted data
2911       }
2912       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2913       finished = true;
2914     } else {
2915       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2916           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2917         return UNPACK_FAILURE;
2918       }
2919       used_bytes = fpi->m_segment_size - 1;
2920     }
2921 
2922     if (dst + used_bytes > dst_end) {
2923       // The value on disk is longer than the field definition allows?
2924       return UNPACK_FAILURE;
2925     }
2926 
2927     uint ret;
2928     if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2929                                       used_bytes, dst)) != UNPACK_SUCCESS) {
2930       return ret;
2931     }
2932 
2933     dst += used_bytes;
2934     len += used_bytes;
2935 
2936     if (finished) {
2937       if (extra_spaces) {
2938         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2939         // pad_char has a 1-byte form in all charsets that
2940         // are handled by rdb_init_collation_mapping.
2941         memset(dst, field_var->charset()->pad_char, extra_spaces);
2942         len += extra_spaces;
2943       }
2944       break;
2945     }
2946   }
2947 
2948   if (!finished) return UNPACK_FAILURE;
2949 
2950   /* Save the length */
2951   if (field_var->length_bytes == 1) {
2952     d0[0] = (uchar)len;
2953   } else {
2954     DBUG_ASSERT(field_var->length_bytes == 2);
2955     int2store(d0, len);
2956   }
2957   return UNPACK_SUCCESS;
2958 }
2959 
2960 /*
2961   Function of type rdb_make_unpack_info_t
2962 
2963   @detail
2964     Make unpack_data for CHAR(n) value in a "simple" charset.
2965     It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2966 
2967   @seealso
2968     The VARCHAR variant is in make_unpack_simple_varchar
2969 */
2970 
make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2971 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
2972                                      const Field *const field,
2973                                      Rdb_pack_field_context *const pack_ctx) {
2974   const uchar *const src = field->ptr;
2975   Rdb_bit_writer bit_writer(pack_ctx->writer);
2976   rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2977 }
2978 
2979 /*
2980   Function of type rdb_index_field_unpack_t
2981 */
2982 
unpack_simple(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2983 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
2984                                Field *const field MY_ATTRIBUTE((__unused__)),
2985                                uchar *const dst,
2986                                Rdb_string_reader *const reader,
2987                                Rdb_string_reader *const unp_reader) {
2988   const uchar *ptr;
2989   const uint len = fpi->m_max_image_len;
2990   Rdb_bit_reader bit_reader(unp_reader);
2991 
2992   if (!(ptr = (const uchar *)reader->read(len))) {
2993     return UNPACK_FAILURE;
2994   }
2995 
2996   return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2997                                 fpi->m_charset_codec, ptr, len, dst);
2998 }
2999 
// Minimum size, in bytes, of the run of mem-comparable space images that is
// kept for each charset.
// See Rdb_charset_space_info::spaces_xfrm
const int RDB_SPACE_XFRM_SIZE = 32;
3002 
3003 // A class holding information about how space character is represented in a
3004 // charset.
3005 class Rdb_charset_space_info {
3006  public:
3007   Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3008   Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3009   Rdb_charset_space_info() = default;
3010 
3011   // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3012   std::vector<uchar> spaces_xfrm;
3013 
3014   // length(strxfrm(' '))
3015   size_t space_xfrm_len;
3016 
3017   // length of the space character itself
3018   // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3019   // (length=2)
3020   size_t space_mb_len;
3021 };
3022 
// Space-character information per charset, indexed by CHARSET_INFO::number.
// An entry stays empty until rdb_get_mem_comparable_space() is first called
// for that charset.
static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
    rdb_mem_comparable_space;
3025 
3026 /*
3027   @brief
3028   For a given charset, get
3029    - strxfrm('    '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
3030    - length of strxfrm(charset, ' ')
3031    - length of the space character in the charset
3032 
  @param cs        IN    Charset to get the space for
  @param xfrm      OUT   Mem-comparable images of a few space characters
  @param xfrm_len  OUT   Length of one space's mem-comparable image (in bytes)
  @param mb_len    OUT   Length of the space character itself (in bytes)
3036 
3037   @detail
3038     It is tempting to pre-generate mem-comparable form of space character for
3039     every charset on server startup.
3040     One can't do that: some charsets are not initialized until somebody
3041     attempts to use them (e.g. create or open a table that has a field that
3042     uses the charset).
3043 */
3044 
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)3045 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3046                                          const std::vector<uchar> **xfrm,
3047                                          size_t *const xfrm_len,
3048                                          size_t *const mb_len) {
3049   DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
3050   if (!rdb_mem_comparable_space[cs->number].get()) {
3051     RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3052     if (!rdb_mem_comparable_space[cs->number].get()) {
3053       // Upper bound of how many bytes can be occupied by multi-byte form of a
3054       // character in any charset.
3055       const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3056       DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3057 
3058       // multi-byte form of the ' ' (space) character
3059       uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3060 
3061       const size_t space_mb_len = cs->wc_mb(
3062           (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3063 
3064       // mem-comparable image of the space character
3065       std::array<uchar, 20> space;
3066 
3067       const size_t space_len = cs->strnxfrm(
3068           space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3069       Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3070       info->space_xfrm_len = space_len;
3071       info->space_mb_len = space_mb_len;
3072       while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3073         info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3074                                  space.data() + space_len);
3075       }
3076       rdb_mem_comparable_space[cs->number].reset(info);
3077     }
3078     RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3079   }
3080 
3081   *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3082   *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3083   *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3084 }
3085 
// Held by rdb_get_mem_comparable_space() while it creates a missing entry of
// rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Collation codecs, indexed by CHARSET_INFO::number. Entries are set by
// rdb_init_collation_mapping().
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
// Held by rdb_init_collation_mapping() while it creates a missing entry of
// rdb_collation_data.
mysql_mutex_t rdb_collation_data_mutex;
3091 
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)3092 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3093   return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
3094          !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
3095 }
3096 
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)3097 static const Rdb_collation_codec *rdb_init_collation_mapping(
3098     const my_core::CHARSET_INFO *const cs) {
3099   DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
3100   const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3101 
3102   if (codec == nullptr && rdb_is_collation_supported(cs)) {
3103     RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3104 
3105     codec = rdb_collation_data[cs->number];
3106     if (codec == nullptr) {
3107       Rdb_collation_codec *cur = nullptr;
3108 
3109       // Compute reverse mapping for simple collations.
3110       if (rdb_is_collation_supported(cs)) {
3111         cur = new Rdb_collation_codec;
3112         std::map<uchar, std::vector<uchar>> rev_map;
3113         size_t max_conflict_size = 0;
3114         for (int src = 0; src < 256; src++) {
3115           uchar dst = cs->sort_order[src];
3116           rev_map[dst].push_back(src);
3117           max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3118         }
3119         cur->m_dec_idx.resize(max_conflict_size);
3120 
3121         for (auto const &p : rev_map) {
3122           uchar dst = p.first;
3123           for (uint idx = 0; idx < p.second.size(); idx++) {
3124             uchar src = p.second[idx];
3125             uchar bits =
3126                 my_bit_log2_uint32(my_round_up_to_next_power(p.second.size()));
3127             cur->m_enc_idx[src] = idx;
3128             cur->m_enc_size[src] = bits;
3129             cur->m_dec_size[dst] = bits;
3130             cur->m_dec_idx[idx][dst] = src;
3131           }
3132         }
3133 
3134         cur->m_make_unpack_info_func = {Rdb_key_def::make_unpack_simple_varchar,
3135                                         Rdb_key_def::make_unpack_simple};
3136         cur->m_unpack_func = {Rdb_key_def::unpack_simple_varchar_space_pad,
3137                               Rdb_key_def::unpack_simple};
3138       } else {
3139         // Out of luck for now.
3140       }
3141 
3142       if (cur != nullptr) {
3143         codec = cur;
3144         cur->m_cs = cs;
3145         rdb_collation_data[cs->number] = cur;
3146       }
3147     }
3148 
3149     RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3150   }
3151 
3152   return codec;
3153 }
3154 
get_segment_size_from_collation(const CHARSET_INFO * const cs)3155 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3156   int ret;
3157   if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
3158       cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
3159     /*
3160       In these collations, a character produces one weight, which is 3 bytes.
3161       Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3162       get 3*3+1=10
3163     */
3164     ret = 10;
3165   } else {
3166     /*
3167       All other collations. There are two classes:
3168       - Unicode-based, except for collations mentioned in the if-condition.
3169         For these all weights are 2 bytes long, a character may produce 0..8
3170         weights.
3171         in any case, 8 bytes of payload in the segment guarantee that the last
3172         space character won't span across segments.
3173 
3174       - Collations not based on unicode. These have length(strxfrm(' '))=1,
3175         there nothing to worry about.
3176 
3177       In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3178     */
3179     ret = 9;
3180   }
3181   DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
3182   return ret;
3183 }
3184 
/*
  @brief
    Setup packing of index field into its mem-comparable form

  @detail
    - It is possible to produce mem-comparable form for any datatype.
    - Some datatypes also allow to unpack the original value from its
      mem-comparable form.
      = Some of these require extra information to be stored in "unpack_info".
        unpack_info is not a part of mem-comparable form, it is only used to
        restore the original value

  @param
    key_descr     IN  index definition the key part belongs to. May be NULL,
                      in which case the legacy varbinary format is selected.
    field         IN  field to be packed/un-packed. When NULL, the column is
                      handled as MYSQL_TYPE_LONGLONG with an image length of
                      ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN.
    keynr_arg     IN  stored in m_keynr
    key_part_arg  IN  stored in m_key_part
    key_length    IN  length of the key part; when it differs from the field
                      length of a [VAR]CHAR column, the column is reported as
                      not readable with index-only reads

  @return
    TRUE  -  Field can be read with index-only reads
    FALSE -  Otherwise
*/

bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
                              const Field *const field, const uint keynr_arg,
                              const uint key_part_arg,
                              const uint16 key_length) {
  int res = false;
  // A NULL field is treated as a LONGLONG column.
  enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;

  m_keynr = keynr_arg;
  m_key_part = key_part_arg;

  m_maybe_null = field ? field->real_maybe_null() : false;
  m_unpack_func = nullptr;
  m_make_unpack_info_func = nullptr;
  m_unpack_data_len = 0;
  space_xfrm = nullptr;  // safety
  // whether to use legacy format for varchar
  m_use_legacy_varbinary_format = false;
  // ha_rocksdb::index_flags() will pass key_descr == null to
  // see whether field(column) can be read-only reads through return value,
  // but the legacy vs. new varchar format doesn't affect return value.
  // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
  if (!key_descr || key_descr->use_legacy_varbinary_format()) {
    m_use_legacy_varbinary_format = true;
  }
  /* Calculate image length. By default, it is pack_length() */
  m_max_image_len =
      field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
  // Defaults; the [VAR]CHAR handling below may replace them.
  m_skip_func = Rdb_key_def::skip_max_length;
  m_pack_func = Rdb_key_def::pack_with_make_sort_key;

  m_covered = false;

  // Types handled in this switch return directly; everything else (and BLOBs
  // without a key_descr) falls through to the [VAR]CHAR handling below.
  switch (type) {
    case MYSQL_TYPE_LONGLONG:
    case MYSQL_TYPE_LONG:
    case MYSQL_TYPE_INT24:
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_TINY:
      m_unpack_func = Rdb_key_def::unpack_integer;
      m_covered = true;
      return true;

    case MYSQL_TYPE_DOUBLE:
      m_unpack_func = Rdb_key_def::unpack_double;
      m_covered = true;
      return true;

    case MYSQL_TYPE_FLOAT:
      m_unpack_func = Rdb_key_def::unpack_float;
      m_covered = true;
      return true;

    case MYSQL_TYPE_NEWDECIMAL:
    /*
      Decimal is packed with Field_new_decimal::make_sort_key, which just
      does memcpy.
      Unpacking decimal values was supported only after fix for issue#253,
      because of that ha_rocksdb::get_storage_type() handles decimal values
      in a special way.
    */
    case MYSQL_TYPE_DATETIME2:
    case MYSQL_TYPE_TIMESTAMP2:
    /* These are packed with Field_temporal_with_date_and_timef::make_sort_key
     */
    case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
    case MYSQL_TYPE_YEAR:  /* YEAR is packed with  Field_tiny::make_sort_key */
      /* Everything that comes here is packed with just a memcpy(). */
      m_unpack_func = Rdb_key_def::unpack_binary_str;
      m_covered = true;
      return true;

    case MYSQL_TYPE_NEWDATE:
      /*
        This is packed by Field_newdate::make_sort_key. It assumes the data is
        3 bytes, and packing is done by swapping the byte order (for both big-
        and little-endian)
      */
      m_unpack_func = Rdb_key_def::unpack_newdate;
      m_covered = true;
      return true;
    case MYSQL_TYPE_TINY_BLOB:
    case MYSQL_TYPE_MEDIUM_BLOB:
    case MYSQL_TYPE_LONG_BLOB:
    case MYSQL_TYPE_BLOB: {
      if (key_descr) {
        // The my_charset_bin collation is special in that it will consider
        // shorter strings sorting as less than longer strings.
        //
        // See Field_blob::make_sort_key for details.
        m_max_image_len =
          key_length + (field->charset()->number == COLLATION_BINARY
                              ? reinterpret_cast<const Field_blob *>(field)
                                    ->pack_length_no_ptr()
                              : 0);
        // Return false because indexes on text/blob will always require
        // a prefix. With a prefix, the optimizer will not be able to do an
        // index-only scan since there may be content occurring after the
        // prefix length.
        return false;
      }
      break;
    }
    default:
      break;
  }

  m_unpack_info_stores_value = false;
  /* Handle [VAR](CHAR|BINARY) */

  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    /*
      For CHAR-based columns, check how long the strxfrm image will be.
      field->field_length = field->char_length() * cs->mbmaxlen.
    */
    const CHARSET_INFO *cs = field->charset();
    m_max_image_len = cs->strnxfrmlen(type == MYSQL_TYPE_STRING ?
                                      field->pack_length() :
                                      field->field_length);
  }
  const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
  // NOTE: field is dereferenced unconditionally from here on. The NULL-field
  // case is typed as LONGLONG and has already returned from the switch above.
  const CHARSET_INFO *cs = field->charset();
  // max_image_len before chunking is taken into account
  const int max_image_len_before_chunks = m_max_image_len;

  if (is_varchar) {
    // The default for varchar is variable-length, without space-padding for
    // comparisons
    m_varchar_charset = cs;
    m_skip_func = Rdb_key_def::skip_variable_length;
    m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
    if (!key_descr || key_descr->use_legacy_varbinary_format()) {
      m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
    } else {
      // Calculate the maximum size of the short section plus the
      // maximum size of the long section
      m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
    }

    const auto field_var = static_cast<const Field_varstring *>(field);
    m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
  }

  if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
    // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
    // information about how character-based datatypes are compared.
    bool use_unknown_collation = false;
    // Debug-only switch: allow index-only scans for collations without a
    // codec by keeping a copy of the value in unpack_info (see below).
    DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
                    use_unknown_collation = true;);

    if (cs->number == COLLATION_BINARY) {
      // - SQL layer pads BINARY(N) so that it always is N bytes long.
      // - For VARBINARY(N), values may have different lengths, so we're using
      //   variable-length encoding. This is also the only charset where the
      //   values are not space-padded for comparison.
      m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
                                 : Rdb_key_def::unpack_binary_str;
      res = true;
    } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
      // For _bin collations, mem-comparable form of the string is the string
      // itself.

      if (is_varchar) {
        // VARCHARs - are compared as if they were space-padded - but are
        // not actually space-padded (reading the value back produces the
        // original value, without the padding)
        m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
        m_skip_func = Rdb_key_def::skip_variable_space_pad;
        m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
        m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
        m_segment_size = get_segment_size_from_collation(cs);
        // Each segment carries (m_segment_size - 1) payload bytes, so this is
        // the number of segments needed times the full segment size.
        m_max_image_len =
            (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
            m_segment_size;
        rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
                                     &space_mb_len);
      } else {
        // SQL layer pads CHAR(N) values to their maximum length.
        // We just store that and restore it back.
        m_unpack_func = (cs->number == COLLATION_LATIN1_BIN)
                            ? Rdb_key_def::unpack_binary_str
                            : Rdb_key_def::unpack_utf8_str;
      }
      res = true;
    } else {
      // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin

      res = true;  // index-only scans are possible
      m_unpack_data_len = is_varchar ? 0 : field->field_length;
      // Selects the VARCHAR (0) or CHAR (1) entry of the codec's function
      // arrays.
      const uint idx = is_varchar ? 0 : 1;
      const Rdb_collation_codec *codec = nullptr;

      if (is_varchar) {
        // VARCHAR requires space-padding for doing comparisons
        //
        // The check for cs->levels_for_order is to catch
        // latin2_czech_cs and cp1250_czech_cs - multi-level collations
        // that Variable-Length Space Padded Encoding can't handle.
        // It is not expected to work for any other multi-level collations,
        // either.
        // Currently we handle these collations as NO_PAD, even if they have
        // PAD_SPACE attribute.
        if (cs->levels_for_order == 1) {
          m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
          m_skip_func = Rdb_key_def::skip_variable_space_pad;
          m_segment_size = get_segment_size_from_collation(cs);
          m_max_image_len =
              (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
              m_segment_size;
          rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
                                       &space_mb_len);
        } else {
          //  NO_LINT_DEBUG
          sql_print_warning(
              "RocksDB: you're trying to create an index "
              "with a multi-level collation %s",
              cs->name);
          //  NO_LINT_DEBUG
          sql_print_warning(
              "MyRocks will handle this collation internally "
              " as if it had a NO_PAD attribute.");
          m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
          m_skip_func = Rdb_key_def::skip_variable_length;
        }
      }

      if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
        // The collation allows to store extra information in the unpack_info
        // which can be used to restore the original value from the
        // mem-comparable form.
        m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
        m_unpack_func = codec->m_unpack_func[idx];
        m_charset_codec = codec;
      } else if (use_unknown_collation) {
        // We have no clue about how this collation produces mem-comparable
        // form. Our way of restoring the original value is to keep a copy of
        // the original value in unpack_info.
        m_unpack_info_stores_value = true;
        m_make_unpack_info_func = is_varchar
                                      ? Rdb_key_def::make_unpack_unknown_varchar
                                      : Rdb_key_def::make_unpack_unknown;
        m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
                                   : Rdb_key_def::unpack_unknown;
      } else {
        // Same as above: we don't know how to restore the value from its
        // mem-comparable form.
        // Here, we just indicate to the SQL layer we can't do it.
        DBUG_ASSERT(m_unpack_func == nullptr);
        m_unpack_info_stores_value = false;
        res = false;  // Indicate that index-only reads are not possible
      }
    }

    // Make an adjustment: if this column is partially covered, tell the SQL
    // layer we can't do index-only scans. Later when we perform an index read,
    // we'll check on a record-by-record basis if we can do an index-only scan
    // or not.
    uint field_length;
    if (field->table) {
      field_length = field->table->field[field->field_index]->field_length;
    } else {
      field_length = field->field_length;
    }

    if (field_length != key_length) {
      res = false;
      // If this index doesn't support covered bitmaps, then we won't know
      // during a read if the column is actually covered or not. If so, we need
      // to assume the column isn't covered and skip it during unpacking.
      //
      // If key_descr == NULL, then this is a dummy field and we probably don't
      // need to perform this step. However, to preserve the behavior before
      // this change, we'll only skip this step if we have an index which
      // supports covered bitmaps.
      if (!key_descr || !key_descr->use_covered_bitmap_format()) {
        m_unpack_func = nullptr;
        m_make_unpack_info_func = nullptr;
        m_unpack_info_stores_value = true;
      }
    }
  }

  m_covered = res;
  return res;
}
3489 
get_field_in_table(const TABLE * const tbl) const3490 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
3491   return tbl->key_info[m_keynr].key_part[m_key_part].field;
3492 }
3493 
fill_hidden_pk_val(uchar ** dst,const longlong hidden_pk_id) const3494 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
3495                                            const longlong hidden_pk_id) const {
3496   DBUG_ASSERT(m_max_image_len == 8);
3497 
3498   String to;
3499   rdb_netstr_append_uint64(&to, hidden_pk_id);
3500   memcpy(*dst, to.ptr(), m_max_image_len);
3501 
3502   *dst += m_max_image_len;
3503 }
3504 
3505 ///////////////////////////////////////////////////////////////////////////////////////////
3506 // Rdb_ddl_manager
3507 ///////////////////////////////////////////////////////////////////////////////////////////
3508 
~Rdb_tbl_def()3509 Rdb_tbl_def::~Rdb_tbl_def() {
3510   auto ddl_manager = rdb_get_ddl_manager();
3511   /* Don't free key definitions */
3512   if (m_key_descr_arr) {
3513     for (uint i = 0; i < m_key_count; i++) {
3514       if (ddl_manager && m_key_descr_arr[i]) {
3515         ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
3516       }
3517 
3518       m_key_descr_arr[i] = nullptr;
3519     }
3520 
3521     delete[] m_key_descr_arr;
3522     m_key_descr_arr = nullptr;
3523   }
3524 }
3525 
3526 /*
3527   Put table definition DDL entry. Actual write is done at
3528   Rdb_dict_manager::commit.
3529 
3530   We write
3531     dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
3532 
3533   Where key entries are a tuple of
3534     ( cf_id, index_nr )
3535 */
3536 
put_dict(Rdb_dict_manager * const dict,rocksdb::WriteBatch * const batch,const rocksdb::Slice & key)3537 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
3538                            rocksdb::WriteBatch *const batch,
3539                            const rocksdb::Slice &key) {
3540   StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
3541   indexes.alloc(Rdb_key_def::VERSION_SIZE +
3542                 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
3543   rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
3544 
3545   for (uint i = 0; i < m_key_count; i++) {
3546     const Rdb_key_def &kd = *m_key_descr_arr[i];
3547 
3548     uchar flags =
3549         (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
3550         (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
3551 
3552     const uint cf_id = kd.get_cf()->GetID();
3553     /*
3554       If cf_id already exists, cf_flags must be the same.
3555       To prevent race condition, reading/modifying/committing CF flags
3556       need to be protected by mutex (dict_manager->lock()).
3557       When RocksDB supports transaction with pessimistic concurrency
3558       control, we can switch to use it and removing mutex.
3559     */
3560     uint existing_cf_flags;
3561     const std::string cf_name = kd.get_cf()->GetName();
3562 
3563     if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
3564       // For the purposes of comparison we'll clear the partitioning bit. The
3565       // intent here is to make sure that both partitioned and non-partitioned
3566       // tables can refer to the same CF.
3567       existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3568       flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3569 
3570       if (existing_cf_flags != flags) {
3571         my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
3572                  existing_cf_flags);
3573         return true;
3574       }
3575     } else {
3576       dict->add_cf_flags(batch, cf_id, flags);
3577     }
3578 
3579     rdb_netstr_append_uint32(&indexes, cf_id);
3580 
3581     uint32 index_number = kd.get_index_number();
3582     rdb_netstr_append_uint32(&indexes, index_number);
3583 
3584     struct Rdb_index_info index_info;
3585     index_info.m_gl_index_id = {cf_id, index_number};
3586     index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
3587     index_info.m_index_type = kd.m_index_type;
3588     index_info.m_kv_version = kd.m_kv_format_version;
3589     index_info.m_index_flags = kd.m_index_flags_bitmap;
3590     index_info.m_ttl_duration = kd.m_ttl_duration;
3591 
3592     dict->add_or_update_index_cf_mapping(batch, &index_info);
3593   }
3594 
3595   const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
3596 
3597   dict->put_key(batch, key, svalue);
3598   return false;
3599 }
3600 
get_create_time()3601 time_t Rdb_tbl_def::get_create_time() {
3602   time_t create_time = m_create_time;
3603 
3604   if (create_time == CREATE_TIME_UNKNOWN) {
3605     // Read it from the .frm file. It's not a problem if several threads do this
3606     // concurrently
3607     char path[FN_REFLEN];
3608     snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
3609              m_dbname.c_str(), m_tablename.c_str(), reg_ext);
3610     unpack_filename(path,path);
3611     MY_STAT f_stat;
3612     if (my_stat(path, &f_stat, MYF(0)))
3613       create_time = f_stat.st_ctime;
3614     else
3615       create_time = 0; // will be shown as SQL NULL
3616     m_create_time = create_time;
3617   }
3618   return create_time;
3619 }
3620 
// Length that each index flag takes inside the record.
// Each index in the array maps to the enum INDEX_FLAG: entry N is the length
// for the flag whose bit N is set. The table has a single entry, for the TTL
// record (ROCKSDB_SIZEOF_TTL_RECORD).
static const std::array<uint, 1> index_flag_lengths = {
    {ROCKSDB_SIZEOF_TTL_RECORD}};
3625 
has_index_flag(uint32 index_flags,enum INDEX_FLAG flag)3626 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
3627   return flag & index_flags;
3628 }
3629 
calculate_index_flag_offset(uint32 index_flags,enum INDEX_FLAG flag,uint * const length)3630 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
3631                                                 enum INDEX_FLAG flag,
3632                                                 uint *const length) {
3633   DBUG_ASSERT_IMP(flag != MAX_FLAG,
3634                   Rdb_key_def::has_index_flag(index_flags, flag));
3635 
3636   uint offset = 0;
3637   for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
3638     int mask = 1 << bit;
3639 
3640     /* Exit once we've reached the proper flag */
3641     if (flag & mask) {
3642       if (length != nullptr) {
3643         *length = index_flag_lengths[bit];
3644       }
3645       break;
3646     }
3647 
3648     if (index_flags & mask) {
3649       offset += index_flag_lengths[bit];
3650     }
3651   }
3652 
3653   return offset;
3654 }
3655 
write_index_flag_field(Rdb_string_writer * const buf,const uchar * const val,enum INDEX_FLAG flag) const3656 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
3657                                          const uchar *const val,
3658                                          enum INDEX_FLAG flag) const {
3659   uint len;
3660   uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
3661   DBUG_ASSERT(offset + len <= buf->get_current_pos());
3662   memcpy(buf->ptr() + offset, val, len);
3663 }
3664 
check_if_is_mysql_system_table()3665 void Rdb_tbl_def::check_if_is_mysql_system_table() {
3666   static const char *const system_dbs[] = {
3667       "mysql",
3668       "performance_schema",
3669       "information_schema",
3670   };
3671 
3672   m_is_mysql_system_table = false;
3673   for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
3674     if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
3675       m_is_mysql_system_table = true;
3676       break;
3677     }
3678   }
3679 }
3680 
/*
  Set m_is_read_free_rpl_table. In this build the regex match against the
  base table name is compiled out (#if 0), so the flag is always set to false.
*/
void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
  m_is_read_free_rpl_table =
#if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported
      rdb_read_free_regex_handler.matches(base_tablename());
#else
      false;
#endif
}
3689 
set_name(const std::string & name)3690 void Rdb_tbl_def::set_name(const std::string &name) {
3691   int err MY_ATTRIBUTE((__unused__));
3692 
3693   m_dbname_tablename = name;
3694   err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
3695                                        &m_partition);
3696   DBUG_ASSERT(err == 0);
3697 
3698   check_if_is_mysql_system_table();
3699 }
3700 
get_autoincr_gl_index_id()3701 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
3702   for (uint i = 0; i < m_key_count; i++) {
3703     auto &k = m_key_descr_arr[i];
3704     if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
3705         k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
3706       return k->get_gl_index_id();
3707     }
3708   }
3709 
3710   // Every table must have a primary key, even if it's hidden.
3711   abort();
3712   return GL_INDEX_ID();
3713 }
3714 
/*
  Remove the entry for `gl_index_id` from m_index_num_to_keydef.

  NOTE(review): no lock is taken here, unlike in add_uncommitted_keydefs()
  and remove_uncommitted_keydefs() - presumably callers hold m_rwlock;
  confirm against the call sites.
*/
void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
  m_index_num_to_keydef.erase(gl_index_id);
}
3718 
add_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3719 void Rdb_ddl_manager::add_uncommitted_keydefs(
3720     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3721   mysql_rwlock_wrlock(&m_rwlock);
3722   for (const auto &index : indexes) {
3723     m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
3724   }
3725   mysql_rwlock_unlock(&m_rwlock);
3726 }
3727 
remove_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3728 void Rdb_ddl_manager::remove_uncommitted_keydefs(
3729     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3730   mysql_rwlock_wrlock(&m_rwlock);
3731   for (const auto &index : indexes) {
3732     m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
3733   }
3734   mysql_rwlock_unlock(&m_rwlock);
3735 }
3736 
namespace  // anonymous namespace = not visible outside this source file
{
/*
  Table scanner that collects the tables known to the data dictionary and
  checks them against the .frm files found on disk.
*/
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  // (table name, is the table a partition)
  using tbl_info_t = std::pair<std::string, bool>;
  // database name -> tables of that database
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  // Tables for which a matching .frm file has not been seen yet: filled by
  // add_table(), entries are erased by check_frm_file()/scan_for_frms().
  tbl_list_t m_list;

  // Records a (non-temporary) table in m_list.
  int add_table(Rdb_tbl_def *tdef) override;

  // Scans the datadir for databases and the .frm files they contain.
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  // Scans one database subdirectory of the datadir for .frm files.
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  // Checks one .frm file against m_list.
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
}  // anonymous namespace
3756 
3757 /*
3758   Get a list of tables that we expect to have .frm files for.  This will use the
3759   information just read from the RocksDB data dictionary.
3760 */
add_table(Rdb_tbl_def * tdef)3761 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
3762   DBUG_ASSERT(tdef != nullptr);
3763 
3764   /* Add the database/table into the list that are not temp table */
3765   if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
3766     bool is_partition = tdef->base_partition().size() != 0;
3767     m_list[tdef->base_dbname()].insert(
3768         tbl_info_t(tdef->base_tablename(), is_partition));
3769   }
3770 
3771   return HA_EXIT_SUCCESS;
3772 }
3773 
3774 /*
3775   Access the .frm file for this dbname/tablename and see if it is a RocksDB
3776   table (or partition table).
3777 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)3778 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
3779                                        const std::string &dbname,
3780                                        const std::string &tablename,
3781                                        bool *has_errors) {
3782   /* Check this .frm file to see what engine it uses */
3783   String fullfilename(fullpath.c_str(), &my_charset_bin);
3784   fullfilename.append(FN_DIRSEP);
3785   fullfilename.append(tablename.c_str());
3786   fullfilename.append(".frm");
3787 
3788   /*
3789     This function will return the legacy_db_type of the table.  Currently
3790     it does not reference the first parameter (THD* thd), but if it ever
3791     did in the future we would need to make a version that does it without
3792     the connection handle as we don't have one here.
3793   */
3794   char eng_type_buf[NAME_CHAR_LEN+1];
3795   LEX_CSTRING eng_type_str = {eng_type_buf, 0};
3796   enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str);
3797   if (type == TABLE_TYPE_UNKNOWN) {
3798     // NO_LINT_DEBUG
3799     sql_print_warning("RocksDB: Failed to open/read .from file: %s",
3800                       fullfilename.ptr());
3801     return false;
3802   }
3803 
3804   if (type == TABLE_TYPE_NORMAL) {
3805     /* For a RocksDB table do we have a reference in the data dictionary? */
3806     if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
3807       /*
3808         Attempt to remove the table entry from the list of tables.  If this
3809         fails then we know we had a .frm file that wasn't registered in RocksDB.
3810       */
3811       tbl_info_t element(tablename, false);
3812       if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
3813         // NO_LINT_DEBUG
3814         sql_print_warning(
3815             "RocksDB: Schema mismatch - "
3816             "A .frm file exists for table %s.%s, "
3817             "but that table is not registered in RocksDB",
3818             dbname.c_str(), tablename.c_str());
3819         *has_errors = true;
3820       }
3821     } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
3822       /*
3823         For partition tables, see if it is in the m_list as a partition,
3824         but don't generate an error if it isn't there - we don't know that the
3825         .frm is for RocksDB.
3826       */
3827       if (m_list.count(dbname) > 0) {
3828         m_list[dbname].erase(tbl_info_t(tablename, true));
3829       }
3830     }
3831   }
3832 
3833   return true;
3834 }
3835 
3836 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)3837 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
3838                                       const std::string &dbname,
3839                                       bool *has_errors) {
3840   bool result = true;
3841   std::string fullpath = datadir + dbname;
3842   struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
3843 
3844   /* Access the directory */
3845   if (dir_info == nullptr) {
3846     // NO_LINT_DEBUG
3847     sql_print_warning("RocksDB: Could not open database directory: %s",
3848                       fullpath.c_str());
3849     return false;
3850   }
3851 
3852   /* Scan through the files in the directory */
3853   struct fileinfo *file_info = dir_info->dir_entry;
3854   for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3855     /* Find .frm files that are not temp files (those that contain '#sql') */
3856     const char *ext = strrchr(file_info->name, '.');
3857     if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
3858         strcmp(ext, ".frm") == 0) {
3859       std::string tablename =
3860           std::string(file_info->name, ext - file_info->name);
3861 
3862       /* Check to see if the .frm file is from RocksDB */
3863       if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
3864         result = false;
3865         break;
3866       }
3867     }
3868   }
3869 
3870   /* Remove any databases who have no more tables listed */
3871   if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
3872     m_list.erase(dbname);
3873   }
3874 
3875   /* Release the directory entry */
3876   my_dirend(dir_info);
3877 
3878   return result;
3879 }
3880 
3881 /*
3882   Scan the datadir for all databases (subdirectories) and get a list of .frm
3883   files they contain
3884 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)3885 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
3886                                                  bool *has_errors) {
3887   bool result = true;
3888   struct st_my_dir *dir_info;
3889   struct fileinfo *file_info;
3890 
3891   dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
3892   if (dir_info == nullptr) {
3893     // NO_LINT_DEBUG
3894     sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
3895     return false;
3896   }
3897 
3898   file_info = dir_info->dir_entry;
3899   for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3900     /* Ignore files/dirs starting with '.' */
3901     if (file_info->name[0] == '.') continue;
3902 
3903     /* Ignore all non-directory files */
3904     if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
3905 
3906     /* Scan all the .frm files in the directory */
3907     if (!scan_for_frms(datadir, file_info->name, has_errors)) {
3908       result = false;
3909       break;
3910     }
3911   }
3912 
3913   /* Release the directory info */
3914   my_dirend(dir_info);
3915 
3916   return result;
3917 }
3918 
3919 /*
3920   Validate that all auto increment values in the data dictionary are on a
3921   supported version.
3922 */
validate_auto_incr()3923 bool Rdb_ddl_manager::validate_auto_incr() {
3924   std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
3925 
3926   uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3927   rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
3928   const rocksdb::Slice auto_incr_entry_slice(
3929       reinterpret_cast<char *>(auto_incr_entry),
3930       Rdb_key_def::INDEX_NUMBER_SIZE);
3931   for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
3932     const rocksdb::Slice key = it->key();
3933     const rocksdb::Slice val = it->value();
3934     GL_INDEX_ID gl_index_id;
3935 
3936     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3937         memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
3938       break;
3939     }
3940 
3941     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
3942       return false;
3943     }
3944 
3945     if (val.size() <= Rdb_key_def::VERSION_SIZE) {
3946       return false;
3947     }
3948 
3949     // Check if we have orphaned entries for whatever reason by cross
3950     // referencing ddl entries.
3951     auto ptr = reinterpret_cast<const uchar *>(key.data());
3952     ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
3953     rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3954     if (!m_dict->get_index_info(gl_index_id, nullptr)) {
3955       // NO_LINT_DEBUG
3956       sql_print_warning(
3957           "RocksDB: AUTOINC mismatch - "
3958           "Index number (%u, %u) found in AUTOINC "
3959           "but does not exist as a DDL entry",
3960           gl_index_id.cf_id, gl_index_id.index_id);
3961       return false;
3962     }
3963 
3964     ptr = reinterpret_cast<const uchar *>(val.data());
3965     const int version = rdb_netbuf_read_uint16(&ptr);
3966     if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
3967       // NO_LINT_DEBUG
3968       sql_print_warning(
3969           "RocksDB: AUTOINC mismatch - "
3970           "Index number (%u, %u) found in AUTOINC "
3971           "is on unsupported version %d",
3972           gl_index_id.cf_id, gl_index_id.index_id, version);
3973       return false;
3974     }
3975   }
3976 
3977   if (!it->status().ok()) {
3978     return false;
3979   }
3980 
3981   return true;
3982 }
3983 
3984 /*
3985   Validate that all the tables in the RocksDB database dictionary match the .frm
3986   files in the datadir
3987 */
validate_schemas(void)3988 bool Rdb_ddl_manager::validate_schemas(void) {
3989   bool has_errors = false;
3990   const std::string datadir = std::string(mysql_real_data_home);
3991   Rdb_validate_tbls table_list;
3992 
3993   /* Get the list of tables from the database dictionary */
3994   if (scan_for_tables(&table_list) != 0) {
3995     return false;
3996   }
3997 
3998   /* Compare that to the list of actual .frm files */
3999   if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
4000     return false;
4001   }
4002 
4003   /*
4004     Any tables left in the tables list are ones that are registered in RocksDB
4005     but don't have .frm files.
4006   */
4007   for (const auto &db : table_list.m_list) {
4008     for (const auto &table : db.second) {
4009       // NO_LINT_DEBUG
4010       sql_print_warning(
4011           "RocksDB: Schema mismatch - "
4012           "Table %s.%s is registered in RocksDB "
4013           "but does not have a .frm file",
4014           db.first.c_str(), table.first.c_str());
4015       has_errors = true;
4016     }
4017   }
4018 
4019   return !has_errors;
4020 }
4021 
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t validate_tables)4022 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4023                            Rdb_cf_manager *const cf_manager,
4024                            const uint32_t validate_tables) {
4025   m_dict = dict_arg;
4026   mysql_rwlock_init(0, &m_rwlock);
4027 
4028   /* Read the data dictionary and populate the hash */
4029   uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4030   rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4031   const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4032                                        Rdb_key_def::INDEX_NUMBER_SIZE);
4033 
4034   /* Reading data dictionary should always skip bloom filter */
4035   rocksdb::Iterator *it = m_dict->new_iterator();
4036   int i = 0;
4037 
4038   uint max_index_id_in_dict = 0;
4039   m_dict->get_max_index_id(&max_index_id_in_dict);
4040 
4041   for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4042     const uchar *ptr;
4043     const uchar *ptr_end;
4044     const rocksdb::Slice key = it->key();
4045     const rocksdb::Slice val = it->value();
4046 
4047     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4048         memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4049       break;
4050     }
4051 
4052     if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4053       // NO_LINT_DEBUG
4054       sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4055                       (int)key.size());
4056       return true;
4057     }
4058 
4059     Rdb_tbl_def *const tdef =
4060         new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4061 
4062     // Now, read the DDLs.
4063     const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4064     if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4065       // NO_LINT_DEBUG
4066       sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4067                       tdef->full_tablename().c_str());
4068       return true;
4069     }
4070     tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4071     tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4072 
4073     ptr = reinterpret_cast<const uchar *>(val.data());
4074     const int version = rdb_netbuf_read_uint16(&ptr);
4075     if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4076       // NO_LINT_DEBUG
4077       sql_print_error(
4078           "RocksDB: DDL ENTRY Version was not expected."
4079           "Expected: %d, Actual: %d",
4080           Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4081       return true;
4082     }
4083     ptr_end = ptr + real_val_size;
4084     for (uint keyno = 0; ptr < ptr_end; keyno++) {
4085       GL_INDEX_ID gl_index_id;
4086       rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4087       uint flags = 0;
4088       struct Rdb_index_info index_info;
4089       if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4090         // NO_LINT_DEBUG
4091         sql_print_error(
4092             "RocksDB: Could not get index information "
4093             "for Index Number (%u,%u), table %s",
4094             gl_index_id.cf_id, gl_index_id.index_id,
4095             tdef->full_tablename().c_str());
4096         return true;
4097       }
4098       if (max_index_id_in_dict < gl_index_id.index_id) {
4099         // NO_LINT_DEBUG
4100         sql_print_error(
4101             "RocksDB: Found max index id %u from data dictionary "
4102             "but also found larger index id %u from dictionary. "
4103             "This should never happen and possibly a bug.",
4104             max_index_id_in_dict, gl_index_id.index_id);
4105         return true;
4106       }
4107       if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4108         // NO_LINT_DEBUG
4109         sql_print_error(
4110             "RocksDB: Could not get Column Family Flags "
4111             "for CF Number %d, table %s",
4112             gl_index_id.cf_id, tdef->full_tablename().c_str());
4113         return true;
4114       }
4115 
4116       if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4117         // The per-index cf option is deprecated.  Make sure we don't have the
4118         // flag set in any existing database.   NO_LINT_DEBUG
4119         // NO_LINT_DEBUG
4120         sql_print_error(
4121             "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4122             "number %d, table %s",
4123             gl_index_id.cf_id, tdef->full_tablename().c_str());
4124       }
4125 
4126       rocksdb::ColumnFamilyHandle *const cfh =
4127           cf_manager->get_cf(gl_index_id.cf_id);
4128       DBUG_ASSERT(cfh != nullptr);
4129 
4130       uint32 ttl_rec_offset =
4131           Rdb_key_def::has_index_flag(index_info.m_index_flags,
4132                                       Rdb_key_def::TTL_FLAG)
4133               ? Rdb_key_def::calculate_index_flag_offset(
4134                     index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4135               : UINT_MAX;
4136 
4137       /*
4138         We can't fully initialize Rdb_key_def object here, because full
4139         initialization requires that there is an open TABLE* where we could
4140         look at Field* objects and set max_length and other attributes
4141       */
4142       tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4143           gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4144           index_info.m_index_type, index_info.m_kv_version,
4145           flags & Rdb_key_def::REVERSE_CF_FLAG,
4146           flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4147           m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4148           ttl_rec_offset, index_info.m_ttl_duration);
4149     }
4150     put(tdef);
4151     i++;
4152   }
4153 
4154   /*
4155     If validate_tables is greater than 0 run the validation.  Only fail the
4156     initialzation if the setting is 1.  If the setting is 2 we continue.
4157   */
4158   if (validate_tables > 0) {
4159     std::string msg;
4160     if (!validate_schemas()) {
4161       msg =
4162           "RocksDB: Problems validating data dictionary "
4163           "against .frm files, exiting";
4164     } else if (!validate_auto_incr()) {
4165       msg =
4166           "RocksDB: Problems validating auto increment values in "
4167           "data dictionary, exiting";
4168     }
4169     if (validate_tables == 1 && !msg.empty()) {
4170       // NO_LINT_DEBUG
4171       sql_print_error("%s", msg.c_str());
4172       return true;
4173     }
4174   }
4175 
4176   // index ids used by applications should not conflict with
4177   // data dictionary index ids
4178   if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4179     max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4180   }
4181 
4182   m_sequence.init(max_index_id_in_dict + 1);
4183 
4184   if (!it->status().ok()) {
4185     rdb_log_status_error(it->status(), "Table_store load error");
4186     return true;
4187   }
4188   delete it;
4189   // NO_LINT_DEBUG
4190   sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4191                         i);
4192   return false;
4193 }
4194 
find(const std::string & table_name,const bool lock)4195 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4196                                    const bool lock) {
4197   if (lock) {
4198     mysql_rwlock_rdlock(&m_rwlock);
4199   }
4200 
4201   Rdb_tbl_def *rec = nullptr;
4202   const auto it = m_ddl_map.find(table_name);
4203   if (it != m_ddl_map.end()) {
4204     rec = it->second;
4205   }
4206 
4207   if (lock) {
4208     mysql_rwlock_unlock(&m_rwlock);
4209   }
4210 
4211   return rec;
4212 }
4213 
4214 // this is a safe version of the find() function below.  It acquires a read
4215 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4216 // are finding it.  Copying it into 'ret' increments the count making sure
4217 // that the object will not be discarded until we are finished with it.
safe_find(GL_INDEX_ID gl_index_id)4218 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4219     GL_INDEX_ID gl_index_id) {
4220   std::shared_ptr<const Rdb_key_def> ret(nullptr);
4221 
4222   mysql_rwlock_rdlock(&m_rwlock);
4223 
4224   auto it = m_index_num_to_keydef.find(gl_index_id);
4225   if (it != m_index_num_to_keydef.end()) {
4226     const auto table_def = find(it->second.first, false);
4227     if (table_def && it->second.second < table_def->m_key_count) {
4228       const auto &kd = table_def->m_key_descr_arr[it->second.second];
4229       if (kd->max_storage_fmt_length() != 0) {
4230         ret = kd;
4231       }
4232     }
4233   } else {
4234     auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4235     if (it != m_index_num_to_uncommitted_keydef.end()) {
4236       const auto &kd = it->second;
4237       if (kd->max_storage_fmt_length() != 0) {
4238         ret = kd;
4239       }
4240     }
4241   }
4242 
4243   mysql_rwlock_unlock(&m_rwlock);
4244 
4245   return ret;
4246 }
4247 
4248 // this method assumes at least read-only lock on m_rwlock
find(GL_INDEX_ID gl_index_id)4249 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4250     GL_INDEX_ID gl_index_id) {
4251   auto it = m_index_num_to_keydef.find(gl_index_id);
4252   if (it != m_index_num_to_keydef.end()) {
4253     auto table_def = find(it->second.first, false);
4254     if (table_def) {
4255       if (it->second.second < table_def->m_key_count) {
4256         return table_def->m_key_descr_arr[it->second.second];
4257       }
4258     }
4259   } else {
4260     auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4261     if (it != m_index_num_to_uncommitted_keydef.end()) {
4262       return it->second;
4263     }
4264   }
4265 
4266   static std::shared_ptr<Rdb_key_def> empty = nullptr;
4267 
4268   return empty;
4269 }
4270 
4271 // this method returns the name of the table based on an index id. It acquires
4272 // a read lock on m_rwlock.
safe_get_table_name(const GL_INDEX_ID & gl_index_id)4273 const std::string Rdb_ddl_manager::safe_get_table_name(
4274     const GL_INDEX_ID &gl_index_id) {
4275   std::string ret;
4276   mysql_rwlock_rdlock(&m_rwlock);
4277   auto it = m_index_num_to_keydef.find(gl_index_id);
4278   if (it != m_index_num_to_keydef.end()) {
4279     ret = it->second.first;
4280   }
4281   mysql_rwlock_unlock(&m_rwlock);
4282   return ret;
4283 }
4284 
set_stats(const std::unordered_map<GL_INDEX_ID,Rdb_index_stats> & stats)4285 void Rdb_ddl_manager::set_stats(
4286     const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4287   mysql_rwlock_wrlock(&m_rwlock);
4288   for (auto src : stats) {
4289     const auto &keydef = find(src.second.m_gl_index_id);
4290     if (keydef) {
4291       keydef->m_stats = src.second;
4292       m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4293     }
4294   }
4295   mysql_rwlock_unlock(&m_rwlock);
4296 }
4297 
adjust_stats(const std::vector<Rdb_index_stats> & new_data,const std::vector<Rdb_index_stats> & deleted_data)4298 void Rdb_ddl_manager::adjust_stats(
4299     const std::vector<Rdb_index_stats> &new_data,
4300     const std::vector<Rdb_index_stats> &deleted_data) {
4301   mysql_rwlock_wrlock(&m_rwlock);
4302   int i = 0;
4303   for (const auto &data : {new_data, deleted_data}) {
4304     for (const auto &src : data) {
4305       const auto &keydef = find(src.m_gl_index_id);
4306       if (keydef) {
4307         keydef->m_stats.m_distinct_keys_per_prefix.resize(
4308             keydef->get_key_parts());
4309         keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4310         m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4311       }
4312     }
4313     i++;
4314   }
4315   const bool should_save_stats = !m_stats2store.empty();
4316   mysql_rwlock_unlock(&m_rwlock);
4317   if (should_save_stats) {
4318     // Queue an async persist_stats(false) call to the background thread.
4319     rdb_queue_save_stats_request();
4320   }
4321 }
4322 
persist_stats(const bool sync)4323 void Rdb_ddl_manager::persist_stats(const bool sync) {
4324   mysql_rwlock_wrlock(&m_rwlock);
4325   const auto local_stats2store = std::move(m_stats2store);
4326   m_stats2store.clear();
4327   mysql_rwlock_unlock(&m_rwlock);
4328 
4329   // Persist stats
4330   const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4331   std::vector<Rdb_index_stats> stats;
4332   std::transform(local_stats2store.begin(), local_stats2store.end(),
4333                  std::back_inserter(stats),
4334                  [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4335                    return s.second;
4336                  });
4337   m_dict->add_stats(wb.get(), stats);
4338   m_dict->commit(wb.get(), sync);
4339 }
4340 
4341 /*
4342   Put table definition of `tbl` into the mapping, and also write it to the
4343   on-disk data dictionary.
4344 */
4345 
put_and_write(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch)4346 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
4347                                    rocksdb::WriteBatch *const batch) {
4348   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
4349 
4350   buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4351 
4352   const std::string &dbname_tablename = tbl->full_tablename();
4353   buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4354 
4355   int res;
4356   if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) {
4357     return res;
4358   }
4359   if ((res = put(tbl))) {
4360     return res;
4361   }
4362   return HA_EXIT_SUCCESS;
4363 }
4364 
4365 /* Return 0 - ok, other value - error */
4366 /* TODO:
4367   This function modifies m_ddl_map and m_index_num_to_keydef.
4368   However, these changes need to be reversed if dict_manager.commit fails
4369   See the discussion here: https://reviews.facebook.net/D35925#inline-259167
4370   Tracked by https://github.com/facebook/mysql-5.6/issues/33
4371 */
put(Rdb_tbl_def * const tbl,const bool lock)4372 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
4373   Rdb_tbl_def *rec;
4374   const std::string &dbname_tablename = tbl->full_tablename();
4375 
4376   if (lock) mysql_rwlock_wrlock(&m_rwlock);
4377 
4378   // We have to do this find because 'tbl' is not yet in the list.  We need
4379   // to find the one we are replacing ('rec')
4380   rec = find(dbname_tablename, false);
4381   if (rec) {
4382     // Free the old record.
4383     delete rec;
4384     m_ddl_map.erase(dbname_tablename);
4385   }
4386   m_ddl_map.emplace(dbname_tablename, tbl);
4387 
4388   for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
4389     m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
4390         std::make_pair(dbname_tablename, keyno);
4391   }
4392   tbl->check_and_set_read_free_rpl_table();
4393 
4394   if (lock) mysql_rwlock_unlock(&m_rwlock);
4395   return 0;
4396 }
4397 
remove(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch,const bool lock)4398 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
4399                              rocksdb::WriteBatch *const batch,
4400                              const bool lock) {
4401   if (lock) mysql_rwlock_wrlock(&m_rwlock);
4402 
4403   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
4404   key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4405   const std::string &dbname_tablename = tbl->full_tablename();
4406   key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4407 
4408   m_dict->delete_key(batch, key_writer.to_slice());
4409 
4410   const auto it = m_ddl_map.find(dbname_tablename);
4411   if (it != m_ddl_map.end()) {
4412     // Free Rdb_tbl_def
4413     delete it->second;
4414 
4415     m_ddl_map.erase(it);
4416   }
4417 
4418   if (lock) mysql_rwlock_unlock(&m_rwlock);
4419 }
4420 
rename(const std::string & from,const std::string & to,rocksdb::WriteBatch * const batch)4421 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
4422                              rocksdb::WriteBatch *const batch) {
4423   Rdb_tbl_def *rec;
4424   Rdb_tbl_def *new_rec;
4425   bool res = true;
4426   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
4427 
4428   mysql_rwlock_wrlock(&m_rwlock);
4429   if (!(rec = find(from, false))) {
4430     mysql_rwlock_unlock(&m_rwlock);
4431     return true;
4432   }
4433 
4434   new_rec = new Rdb_tbl_def(to);
4435 
4436   new_rec->m_key_count = rec->m_key_count;
4437   new_rec->m_auto_incr_val =
4438       rec->m_auto_incr_val.load(std::memory_order_relaxed);
4439   new_rec->m_key_descr_arr = rec->m_key_descr_arr;
4440 
4441   new_rec->m_hidden_pk_val =
4442       rec->m_hidden_pk_val.load(std::memory_order_relaxed);
4443 
4444   // so that it's not free'd when deleting the old rec
4445   rec->m_key_descr_arr = nullptr;
4446 
4447   // Create a new key
4448   new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4449 
4450   const std::string &dbname_tablename = new_rec->full_tablename();
4451   new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4452 
4453   // Create a key to add
4454   if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) {
4455     remove(rec, batch, false);
4456     put(new_rec, false);
4457     res = false;  // ok
4458   }
4459 
4460   mysql_rwlock_unlock(&m_rwlock);
4461   return res;
4462 }
4463 
cleanup()4464 void Rdb_ddl_manager::cleanup() {
4465   for (const auto &kv : m_ddl_map) {
4466     delete kv.second;
4467   }
4468   m_ddl_map.clear();
4469 
4470   mysql_rwlock_destroy(&m_rwlock);
4471   m_sequence.cleanup();
4472 }
4473 
scan_for_tables(Rdb_tables_scanner * const tables_scanner)4474 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
4475   int ret;
4476   Rdb_tbl_def *rec;
4477 
4478   DBUG_ASSERT(tables_scanner != nullptr);
4479 
4480   mysql_rwlock_rdlock(&m_rwlock);
4481 
4482   ret = 0;
4483 
4484   for (const auto &kv : m_ddl_map) {
4485     rec = kv.second;
4486     ret = tables_scanner->add_table(rec);
4487     if (ret) break;
4488   }
4489 
4490   mysql_rwlock_unlock(&m_rwlock);
4491   return ret;
4492 }
4493 
4494 /*
4495   Rdb_binlog_manager class implementation
4496 */
4497 
init(Rdb_dict_manager * const dict_arg)4498 bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
4499   DBUG_ASSERT(dict_arg != nullptr);
4500   m_dict = dict_arg;
4501 
4502   m_key_writer.reset();
4503   m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
4504   m_key_slice = m_key_writer.to_slice();
4505   return false;
4506 }
4507 
cleanup()4508 void Rdb_binlog_manager::cleanup() {}
4509 
4510 /**
4511   Set binlog name, pos and optionally gtid into WriteBatch.
4512   This function should be called as part of transaction commit,
4513   since binlog info is set only at transaction commit.
4514   Actual write into RocksDB is not done here, so checking if
4515   write succeeded or not is not possible here.
4516   @param binlog_name   Binlog name
4517   @param binlog_pos    Binlog pos
4518   @param batch         WriteBatch
4519 */
update(const char * const binlog_name,const my_off_t binlog_pos,rocksdb::WriteBatchBase * const batch)4520 void Rdb_binlog_manager::update(const char *const binlog_name,
4521                                 const my_off_t binlog_pos,
4522                                 rocksdb::WriteBatchBase *const batch) {
4523   if (binlog_name && binlog_pos) {
4524     // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
4525     const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
4526     Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer;
4527 
4528     // store version
4529     value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
4530 
4531     // store binlog file name length
4532     DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
4533     const uint16_t binlog_name_len = strlen(binlog_name);
4534     value_writer.write_uint16(binlog_name_len);
4535 
4536     // store binlog file name
4537     value_writer.write(binlog_name, binlog_name_len);
4538 
4539     // store binlog pos
4540     value_writer.write_uint32(binlog_pos);
4541 
4542 #ifdef MARIADB_MERGE_2019
4543     // store binlog gtid length.
4544     // If gtid was not set, store 0 instead
4545     const uint16_t binlog_max_gtid_len =
4546         binlog_max_gtid ? strlen(binlog_max_gtid) : 0;
4547     value_writer.write_uint16(binlog_max_gtid_len);
4548 
4549     if (binlog_max_gtid_len > 0) {
4550       // store binlog gtid
4551       value_writer.write(binlog_max_gtid, binlog_max_gtid_len);
4552     }
4553 #endif
4554 
4555     m_dict->put_key(batch, m_key_slice, value_writer.to_slice());
4556   }
4557 }
4558 
4559 /**
4560   Read binlog committed entry stored in RocksDB, then unpack
4561   @param[OUT] binlog_name  Binlog name
4562   @param[OUT] binlog_pos   Binlog pos
4563   @param[OUT] binlog_gtid  Binlog GTID
4564   @return
4565     true is binlog info was found (valid behavior)
4566     false otherwise
4567 */
read(char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4568 bool Rdb_binlog_manager::read(char *const binlog_name,
4569                               my_off_t *const binlog_pos,
4570                               char *const binlog_gtid) const {
4571   bool ret = false;
4572   if (binlog_name) {
4573     std::string value;
4574     rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
4575     if (status.ok()) {
4576       if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
4577                         binlog_gtid)) {
4578         ret = true;
4579       }
4580     }
4581   }
4582   return ret;
4583 }
4584 
4585 /**
4586   Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
4587   @param[IN]  value        Binlog state info fetched from RocksDB
4588   @param[OUT] binlog_name  Binlog name
4589   @param[OUT] binlog_pos   Binlog pos
4590   @param[OUT] binlog_gtid  Binlog GTID
4591   @return     true on error
4592 */
unpack_value(const uchar * const value,size_t value_size_arg,char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4593 bool Rdb_binlog_manager::unpack_value(const uchar *const value,
4594                                       size_t value_size_arg,
4595                                       char *const binlog_name,
4596                                       my_off_t *const binlog_pos,
4597                                       char *const binlog_gtid) const {
4598   uint pack_len = 0;
4599   intmax_t value_size= value_size_arg;
4600 
4601   DBUG_ASSERT(binlog_pos != nullptr);
4602 
4603   if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
4604     return true;
4605   // read version
4606   const uint16_t version = rdb_netbuf_to_uint16(value);
4607 
4608   pack_len += Rdb_key_def::VERSION_SIZE;
4609   if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true;
4610 
4611   if ((value_size -= sizeof(uint16)) < 0)
4612     return true;
4613 
4614   // read binlog file name length
4615   const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
4616   pack_len += sizeof(uint16);
4617 
4618   if (binlog_name_len >= (FN_REFLEN+1))
4619     return true;
4620 
4621   if ((value_size -= binlog_name_len) < 0)
4622     return true;
4623 
4624   if (binlog_name_len) {
4625     // read and set binlog name
4626     memcpy(binlog_name, value + pack_len, binlog_name_len);
4627     binlog_name[binlog_name_len] = '\0';
4628     pack_len += binlog_name_len;
4629 
4630     if ((value_size -= sizeof(uint32)) < 0)
4631       return true;
4632     // read and set binlog pos
4633     *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
4634     pack_len += sizeof(uint32);
4635 
4636     if ((value_size -= sizeof(uint16)) < 0)
4637       return true;
4638     // read gtid length
4639     const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
4640     pack_len += sizeof(uint16);
4641 
4642     if (binlog_gtid_len >= GTID_BUF_LEN)
4643       return true;
4644     if ((value_size -= binlog_gtid_len) < 0)
4645       return true;
4646 
4647     if (binlog_gtid && binlog_gtid_len > 0) {
4648       // read and set gtid
4649       memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
4650       binlog_gtid[binlog_gtid_len] = '\0';
4651       pack_len += binlog_gtid_len;
4652     }
4653   }
4654   return false;
4655 }
4656 
4657 /**
4658   Inserts a row into mysql.slave_gtid_info table. Doing this inside
4659   storage engine is more efficient than inserting/updating through MySQL.
4660 
4661   @param[IN] id Primary key of the table.
4662   @param[IN] db Database name. This is column 2 of the table.
4663   @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
4664   @param[IN] write_batch Handle to storage engine writer.
4665 */
update_slave_gtid_info(const uint id,const char * const db,const char * const gtid,rocksdb::WriteBatchBase * const write_batch)4666 void Rdb_binlog_manager::update_slave_gtid_info(
4667     const uint id, const char *const db, const char *const gtid,
4668     rocksdb::WriteBatchBase *const write_batch) {
4669   if (id && db && gtid) {
4670     // Make sure that if the slave_gtid_info table exists we have a
4671     // pointer to it via m_slave_gtid_info_tbl.
4672     if (!m_slave_gtid_info_tbl.load()) {
4673       m_slave_gtid_info_tbl.store(
4674           rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
4675     }
4676     if (!m_slave_gtid_info_tbl.load()) {
4677       // slave_gtid_info table is not present. Simply return.
4678       return;
4679     }
4680     DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
4681 
4682     const std::shared_ptr<const Rdb_key_def> &kd =
4683         m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
4684     String value;
4685 
4686     // Build key
4687     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer;
4688     key_writer.write_index(kd->get_index_number());
4689     key_writer.write_uint32(id);
4690 
4691     // Build value
4692     Rdb_buf_writer<128> value_writer;
4693     DBUG_ASSERT(gtid);
4694     const uint db_len = strlen(db);
4695     const uint gtid_len = strlen(gtid);
4696     // 1 byte used for flags. Empty here.
4697     value_writer.write_byte(0);
4698 
4699     // Write column 1.
4700     DBUG_ASSERT(strlen(db) <= 64);
4701     value_writer.write_byte(db_len);
4702     value_writer.write(db, db_len);
4703 
4704     // Write column 2.
4705     DBUG_ASSERT(gtid_len <= 56);
4706     value_writer.write_byte(gtid_len);
4707     value_writer.write(gtid, gtid_len);
4708 
4709     write_batch->Put(kd->get_cf(), key_writer.to_slice(),
4710                      value_writer.to_slice());
4711   }
4712 }
4713 
init(rocksdb::TransactionDB * const rdb_dict,Rdb_cf_manager * const cf_manager)4714 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
4715                             Rdb_cf_manager *const cf_manager) {
4716   DBUG_ASSERT(rdb_dict != nullptr);
4717   DBUG_ASSERT(cf_manager != nullptr);
4718 
4719   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
4720 
4721   m_db = rdb_dict;
4722 
4723   m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
4724   rocksdb::ColumnFamilyHandle *default_cfh =
4725       cf_manager->get_cf(DEFAULT_CF_NAME);
4726 
4727   // System CF and default CF should be initialized
4728   if (m_system_cfh == nullptr || default_cfh == nullptr) {
4729     return HA_EXIT_FAILURE;
4730   }
4731 
4732   rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
4733 
4734   m_key_slice_max_index_id =
4735       rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
4736                      Rdb_key_def::INDEX_NUMBER_SIZE);
4737 
4738   resume_drop_indexes();
4739   rollback_ongoing_index_creation();
4740 
4741   // Initialize system CF and default CF flags
4742   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
4743   rocksdb::WriteBatch *const batch = wb.get();
4744 
4745   add_cf_flags(batch, m_system_cfh->GetID(), 0);
4746   add_cf_flags(batch, default_cfh->GetID(), 0);
4747   commit(batch);
4748 
4749   return HA_EXIT_SUCCESS;
4750 }
4751 
begin() const4752 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
4753   return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
4754 }
4755 
put_key(rocksdb::WriteBatchBase * const batch,const rocksdb::Slice & key,const rocksdb::Slice & value) const4756 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
4757                                const rocksdb::Slice &key,
4758                                const rocksdb::Slice &value) const {
4759   batch->Put(m_system_cfh, key, value);
4760 }
4761 
/*
  Reads the value stored under key in the system column family.

  The read is issued with total_order_seek enabled.

  @param[IN]  key    Key to look up.
  @param[OUT] value  Receives the stored value.
  @return Status returned by the database Get call.
*/
rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
                                            std::string *const value) const {
  rocksdb::ReadOptions read_opts;
  read_opts.total_order_seek = true;

  rocksdb::Status lookup = m_db->Get(read_opts, m_system_cfh, key, value);
  return lookup;
}
4768 
delete_key(rocksdb::WriteBatchBase * batch,const rocksdb::Slice & key) const4769 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
4770                                   const rocksdb::Slice &key) const {
4771   batch->Delete(m_system_cfh, key);
4772 }
4773 
new_iterator() const4774 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
4775   /* Reading data dictionary should always skip bloom filter */
4776   rocksdb::ReadOptions read_options;
4777   read_options.total_order_seek = true;
4778   return m_db->NewIterator(read_options, m_system_cfh);
4779 }
4780 
commit(rocksdb::WriteBatch * const batch,const bool sync) const4781 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
4782                              const bool sync) const {
4783   if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
4784   int res = HA_EXIT_SUCCESS;
4785   rocksdb::WriteOptions options;
4786   options.sync = sync;
4787   rocksdb::TransactionDBWriteOptimizations optimize;
4788   optimize.skip_concurrency_control = true;
4789   rocksdb::Status s = m_db->Write(options, optimize, batch);
4790   res = !s.ok();  // we return true when something failed
4791   if (res) {
4792     rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
4793   }
4794   batch->Clear();
4795   return res;
4796 }
4797 
dump_index_id(uchar * const netbuf,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id)4798 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
4799                                      Rdb_key_def::DATA_DICT_TYPE dict_type,
4800                                      const GL_INDEX_ID &gl_index_id) {
4801   rdb_netbuf_store_uint32(netbuf, dict_type);
4802   rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
4803                           gl_index_id.cf_id);
4804   rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
4805                           gl_index_id.index_id);
4806 }
4807 
delete_with_prefix(rocksdb::WriteBatch * const batch,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id) const4808 void Rdb_dict_manager::delete_with_prefix(
4809     rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
4810     const GL_INDEX_ID &gl_index_id) const {
4811   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4812   dump_index_id(&key_writer, dict_type, gl_index_id);
4813 
4814   delete_key(batch, key_writer.to_slice());
4815 }
4816 
add_or_update_index_cf_mapping(rocksdb::WriteBatch * batch,struct Rdb_index_info * const index_info) const4817 void Rdb_dict_manager::add_or_update_index_cf_mapping(
4818     rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
4819   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4820   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
4821                 index_info->m_gl_index_id);
4822 
4823   Rdb_buf_writer<256> value_writer;
4824 
4825   value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
4826   value_writer.write_byte(index_info->m_index_type);
4827   value_writer.write_uint16(index_info->m_kv_version);
4828   value_writer.write_uint32(index_info->m_index_flags);
4829   value_writer.write_uint64(index_info->m_ttl_duration);
4830 
4831   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4832 }
4833 
add_cf_flags(rocksdb::WriteBatch * const batch,const uint32_t cf_id,const uint32_t cf_flags) const4834 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
4835                                     const uint32_t cf_id,
4836                                     const uint32_t cf_flags) const {
4837   DBUG_ASSERT(batch != nullptr);
4838 
4839   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4840   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4841   key_writer.write_uint32(cf_id);
4842 
4843   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
4844       value_writer;
4845   value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
4846   value_writer.write_uint32(cf_flags);
4847 
4848   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4849 }
4850 
delete_index_info(rocksdb::WriteBatch * batch,const GL_INDEX_ID & gl_index_id) const4851 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
4852                                          const GL_INDEX_ID &gl_index_id) const {
4853   delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
4854   delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
4855   delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
4856 }
4857 
get_index_info(const GL_INDEX_ID & gl_index_id,struct Rdb_index_info * const index_info) const4858 bool Rdb_dict_manager::get_index_info(
4859     const GL_INDEX_ID &gl_index_id,
4860     struct Rdb_index_info *const index_info) const {
4861   if (index_info) {
4862     index_info->m_gl_index_id = gl_index_id;
4863   }
4864 
4865   bool found = false;
4866   bool error = false;
4867   std::string value;
4868   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4869   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
4870 
4871   const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
4872   if (status.ok()) {
4873     if (!index_info) {
4874       return true;
4875     }
4876 
4877     const uchar *const val = (const uchar *)value.c_str();
4878     const uchar *ptr = val;
4879     index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
4880     ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4881 
4882     switch (index_info->m_index_dict_version) {
4883       case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
4884         /* Sanity check to prevent reading bogus TTL record. */
4885         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4886                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4887                                 RDB_SIZEOF_INDEX_FLAGS +
4888                                 ROCKSDB_SIZEOF_TTL_RECORD) {
4889           error = true;
4890           break;
4891         }
4892         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4893         ptr += RDB_SIZEOF_INDEX_TYPE;
4894         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4895         ptr += RDB_SIZEOF_KV_VERSION;
4896         index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
4897         ptr += RDB_SIZEOF_INDEX_FLAGS;
4898         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4899         found = true;
4900         break;
4901 
4902       case Rdb_key_def::INDEX_INFO_VERSION_TTL:
4903         /* Sanity check to prevent reading bogus into TTL record. */
4904         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4905                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4906                                 ROCKSDB_SIZEOF_TTL_RECORD) {
4907           error = true;
4908           break;
4909         }
4910         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4911         ptr += RDB_SIZEOF_INDEX_TYPE;
4912         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4913         ptr += RDB_SIZEOF_KV_VERSION;
4914         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4915         if ((index_info->m_kv_version ==
4916              Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
4917             index_info->m_ttl_duration > 0) {
4918           index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
4919         }
4920         found = true;
4921         break;
4922 
4923       case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
4924       case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
4925         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4926         ptr += RDB_SIZEOF_INDEX_TYPE;
4927         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4928         found = true;
4929         break;
4930 
4931       default:
4932         error = true;
4933         break;
4934     }
4935 
4936     switch (index_info->m_index_type) {
4937       case Rdb_key_def::INDEX_TYPE_PRIMARY:
4938       case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
4939         error = index_info->m_kv_version >
4940                 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
4941         break;
4942       }
4943       case Rdb_key_def::INDEX_TYPE_SECONDARY:
4944         error = index_info->m_kv_version >
4945                 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
4946         break;
4947       default:
4948         error = true;
4949         break;
4950     }
4951   }
4952 
4953   if (error) {
4954     // NO_LINT_DEBUG
4955     sql_print_error(
4956         "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
4957         "from data dictionary. This should never happen "
4958         "and it may be a bug.",
4959         index_info->m_index_dict_version, index_info->m_index_type,
4960         index_info->m_kv_version, index_info->m_ttl_duration);
4961     abort();
4962   }
4963 
4964   return found;
4965 }
4966 
get_cf_flags(const uint32_t cf_id,uint32_t * const cf_flags) const4967 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
4968                                     uint32_t *const cf_flags) const {
4969   DBUG_ASSERT(cf_flags != nullptr);
4970 
4971   bool found = false;
4972   std::string value;
4973   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4974 
4975   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4976   key_writer.write_uint32(cf_id);
4977 
4978   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
4979 
4980   if (status.ok()) {
4981     const uchar *val = (const uchar *)value.c_str();
4982     DBUG_ASSERT(val);
4983 
4984     const uint16_t version = rdb_netbuf_to_uint16(val);
4985 
4986     if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
4987       *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
4988       found = true;
4989     }
4990   }
4991 
4992   return found;
4993 }
4994 
4995 /*
4996   Returning index ids that were marked as deleted (via DROP TABLE) but
4997   still not removed by drop_index_thread yet, or indexes that are marked as
4998   ongoing creation.
4999  */
get_ongoing_index_operation(std::unordered_set<GL_INDEX_ID> * gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const5000 void Rdb_dict_manager::get_ongoing_index_operation(
5001     std::unordered_set<GL_INDEX_ID> *gl_index_ids,
5002     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5003   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5004               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5005 
5006   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5007   index_writer.write_uint32(dd_type);
5008   const rocksdb::Slice index_slice = index_writer.to_slice();
5009 
5010   rocksdb::Iterator *it = new_iterator();
5011   for (it->Seek(index_slice); it->Valid(); it->Next()) {
5012     rocksdb::Slice key = it->key();
5013     const uchar *const ptr = (const uchar *)key.data();
5014 
5015     /*
5016       Ongoing drop/create index operations require key to be of the form:
5017       dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5018 
5019       This may need to be changed in the future if we want to process a new
5020       ddl_type with different format.
5021     */
5022     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5023         rdb_netbuf_to_uint32(ptr) != dd_type) {
5024       break;
5025     }
5026 
5027     // We don't check version right now since currently we always store only
5028     // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5029     // If increasing version number, we need to add version check logic here.
5030     GL_INDEX_ID gl_index_id;
5031     gl_index_id.cf_id =
5032         rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5033     gl_index_id.index_id =
5034         rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5035     gl_index_ids->insert(gl_index_id);
5036   }
5037   delete it;
5038 }
5039 
5040 /*
5041   Returning true if index_id is create/delete ongoing (undergoing creation or
5042   marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
5043   or not.
5044  */
is_index_operation_ongoing(const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5045 bool Rdb_dict_manager::is_index_operation_ongoing(
5046     const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5047   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5048               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5049 
5050   bool found = false;
5051   std::string value;
5052   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5053   dump_index_id(&key_writer, dd_type, gl_index_id);
5054 
5055   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5056   if (status.ok()) {
5057     found = true;
5058   }
5059   return found;
5060 }
5061 
5062 /*
5063   Adding index_id to data dictionary so that the index id is removed
5064   by drop_index_thread, or to track online index creation.
5065  */
start_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5066 void Rdb_dict_manager::start_ongoing_index_operation(
5067     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5068     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5069   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5070               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5071 
5072   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5073   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5074 
5075   dump_index_id(&key_writer, dd_type, gl_index_id);
5076 
5077   // version as needed
5078   if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5079     value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5080   } else {
5081     value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5082   }
5083 
5084   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5085 }
5086 
5087 /*
5088   Removing index_id from data dictionary to confirm drop_index_thread
5089   completed dropping entire key/values of the index_id
5090  */
end_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5091 void Rdb_dict_manager::end_ongoing_index_operation(
5092     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5093     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5094   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5095               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5096 
5097   delete_with_prefix(batch, dd_type, gl_index_id);
5098 }
5099 
5100 /*
5101   Returning true if there is no target index ids to be removed
5102   by drop_index_thread
5103  */
is_drop_index_empty() const5104 bool Rdb_dict_manager::is_drop_index_empty() const {
5105   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5106   get_ongoing_drop_indexes(&gl_index_ids);
5107   return gl_index_ids.empty();
5108 }
5109 
5110 /*
5111   This function is supposed to be called by DROP TABLE. Logging messages
5112   that dropping indexes started, and adding data dictionary so that
5113   all associated indexes to be removed
5114  */
add_drop_table(std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,rocksdb::WriteBatch * const batch) const5115 void Rdb_dict_manager::add_drop_table(
5116     std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5117     rocksdb::WriteBatch *const batch) const {
5118   std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5119   for (uint32 i = 0; i < n_keys; i++) {
5120     dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5121   }
5122 
5123   add_drop_index(dropped_index_ids, batch);
5124 }
5125 
5126 /*
5127   Called during inplace index drop operations. Logging messages
5128   that dropping indexes started, and adding data dictionary so that
5129   all associated indexes to be removed
5130  */
add_drop_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5131 void Rdb_dict_manager::add_drop_index(
5132     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5133     rocksdb::WriteBatch *const batch) const {
5134   for (const auto &gl_index_id : gl_index_ids) {
5135     log_start_drop_index(gl_index_id, "Begin");
5136     start_drop_index(batch, gl_index_id);
5137   }
5138 }
5139 
5140 /*
5141   Called during inplace index creation operations. Logging messages
5142   that adding indexes started, and updates data dictionary with all associated
5143   indexes to be added.
5144  */
add_create_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5145 void Rdb_dict_manager::add_create_index(
5146     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5147     rocksdb::WriteBatch *const batch) const {
5148   for (const auto &gl_index_id : gl_index_ids) {
5149     // NO_LINT_DEBUG
5150     sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
5151                            gl_index_id.cf_id, gl_index_id.index_id);
5152     start_create_index(batch, gl_index_id);
5153   }
5154 }
5155 
5156 /*
5157   This function is supposed to be called by drop_index_thread, when it
5158   finished dropping any index, or at the completion of online index creation.
5159  */
finish_indexes_operation(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const5160 void Rdb_dict_manager::finish_indexes_operation(
5161     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5162     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5163   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5164               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5165 
5166   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5167   rocksdb::WriteBatch *const batch = wb.get();
5168 
5169   std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5170   get_ongoing_create_indexes(&incomplete_create_indexes);
5171 
5172   for (const auto &gl_index_id : gl_index_ids) {
5173     if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5174       end_ongoing_index_operation(batch, gl_index_id, dd_type);
5175 
5176       /*
5177         Remove the corresponding incomplete create indexes from data
5178         dictionary as well
5179       */
5180       if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5181         if (incomplete_create_indexes.count(gl_index_id)) {
5182           end_ongoing_index_operation(batch, gl_index_id,
5183                                       Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5184         }
5185       }
5186     }
5187 
5188     if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5189       delete_index_info(batch, gl_index_id);
5190     }
5191   }
5192   commit(batch);
5193 }
5194 
5195 /*
5196   This function is supposed to be called when initializing
5197   Rdb_dict_manager (at startup). If there is any index ids that are
5198   drop ongoing, printing out messages for diagnostics purposes.
5199  */
resume_drop_indexes() const5200 void Rdb_dict_manager::resume_drop_indexes() const {
5201   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5202   get_ongoing_drop_indexes(&gl_index_ids);
5203 
5204   uint max_index_id_in_dict = 0;
5205   get_max_index_id(&max_index_id_in_dict);
5206 
5207   for (const auto &gl_index_id : gl_index_ids) {
5208     log_start_drop_index(gl_index_id, "Resume");
5209     if (max_index_id_in_dict < gl_index_id.index_id) {
5210       // NO_LINT_DEBUG
5211       sql_print_error(
5212           "RocksDB: Found max index id %u from data dictionary "
5213           "but also found dropped index id (%u,%u) from drop_index "
5214           "dictionary. This should never happen and is possibly a "
5215           "bug.",
5216           max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5217       abort();
5218     }
5219   }
5220 }
5221 
rollback_ongoing_index_creation() const5222 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5223   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5224   rocksdb::WriteBatch *const batch = wb.get();
5225 
5226   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5227   get_ongoing_create_indexes(&gl_index_ids);
5228 
5229   for (const auto &gl_index_id : gl_index_ids) {
5230     // NO_LINT_DEBUG
5231     sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
5232                            gl_index_id.cf_id, gl_index_id.index_id);
5233 
5234     start_drop_index(batch, gl_index_id);
5235   }
5236 
5237   commit(batch);
5238 }
5239 
log_start_drop_table(const std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,const char * const log_action) const5240 void Rdb_dict_manager::log_start_drop_table(
5241     const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5242     const char *const log_action) const {
5243   for (uint32 i = 0; i < n_keys; i++) {
5244     log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5245   }
5246 }
5247 
log_start_drop_index(GL_INDEX_ID gl_index_id,const char * log_action) const5248 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5249                                             const char *log_action) const {
5250   struct Rdb_index_info index_info;
5251   if (!get_index_info(gl_index_id, &index_info)) {
5252     /*
5253       If we don't find the index info, it could be that it's because it was a
5254       partially created index that isn't in the data dictionary yet that needs
5255       to be rolled back.
5256     */
5257     std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5258     get_ongoing_create_indexes(&incomplete_create_indexes);
5259 
5260     if (!incomplete_create_indexes.count(gl_index_id)) {
5261       /* If it's not a partially created index, something is very wrong. */
5262       // NO_LINT_DEBUG
5263       sql_print_error(
5264           "RocksDB: Failed to get column family info "
5265           "from index id (%u,%u). MyRocks data dictionary may "
5266           "get corrupted.",
5267           gl_index_id.cf_id, gl_index_id.index_id);
5268       if (rocksdb_ignore_datadic_errors)
5269       {
5270         sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, "
5271                         "trying to continue");
5272         return;
5273       }
5274       abort();
5275     }
5276   }
5277 }
5278 
get_max_index_id(uint32_t * const index_id) const5279 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5280   bool found = false;
5281   std::string value;
5282 
5283   const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5284   if (status.ok()) {
5285     const uchar *const val = (const uchar *)value.c_str();
5286     const uint16_t version = rdb_netbuf_to_uint16(val);
5287     if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5288       *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5289       found = true;
5290     }
5291   }
5292   return found;
5293 }
5294 
update_max_index_id(rocksdb::WriteBatch * const batch,const uint32_t index_id) const5295 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5296                                            const uint32_t index_id) const {
5297   DBUG_ASSERT(batch != nullptr);
5298 
5299   uint32_t old_index_id = -1;
5300   if (get_max_index_id(&old_index_id)) {
5301     if (old_index_id > index_id) {
5302       // NO_LINT_DEBUG
5303       sql_print_error(
5304           "RocksDB: Found max index id %u from data dictionary "
5305           "but trying to update to older value %u. This should "
5306           "never happen and possibly a bug.",
5307           old_index_id, index_id);
5308       return true;
5309     }
5310   }
5311 
5312   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5313       value_writer;
5314   value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5315   value_writer.write_uint32(index_id);
5316 
5317   batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5318   return false;
5319 }
5320 
add_stats(rocksdb::WriteBatch * const batch,const std::vector<Rdb_index_stats> & stats) const5321 void Rdb_dict_manager::add_stats(
5322     rocksdb::WriteBatch *const batch,
5323     const std::vector<Rdb_index_stats> &stats) const {
5324   DBUG_ASSERT(batch != nullptr);
5325 
5326   for (const auto &it : stats) {
5327     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5328     dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5329 
5330     // IndexStats::materialize takes complete care of serialization including
5331     // storing the version
5332     const auto value =
5333         Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5334 
5335     batch->Put(m_system_cfh, key_writer.to_slice(), value);
5336   }
5337 }
5338 
get_stats(GL_INDEX_ID gl_index_id) const5339 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5340   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5341   dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5342 
5343   std::string value;
5344   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5345   if (status.ok()) {
5346     std::vector<Rdb_index_stats> v;
5347     // unmaterialize checks if the version matches
5348     if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5349       return v[0];
5350     }
5351   }
5352 
5353   return Rdb_index_stats();
5354 }
5355 
put_auto_incr_val(rocksdb::WriteBatchBase * batch,const GL_INDEX_ID & gl_index_id,ulonglong val,bool overwrite) const5356 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5357     rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id,
5358     ulonglong val, bool overwrite) const {
5359   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5360   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5361 
5362   // Value is constructed by storing the version and the value.
5363   Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5364                  ROCKSDB_SIZEOF_AUTOINC_VALUE>
5365       value_writer;
5366   value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5367   value_writer.write_uint64(val);
5368 
5369   if (overwrite) {
5370     return batch->Put(m_system_cfh, key_writer.to_slice(),
5371                       value_writer.to_slice());
5372   }
5373   return batch->Merge(m_system_cfh, key_writer.to_slice(),
5374                       value_writer.to_slice());
5375 }
5376 
get_auto_incr_val(const GL_INDEX_ID & gl_index_id,ulonglong * new_val) const5377 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5378                                          ulonglong *new_val) const {
5379   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5380   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5381 
5382   std::string value;
5383   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5384 
5385   if (status.ok()) {
5386     const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5387 
5388     if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5389       *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5390       return true;
5391     }
5392   }
5393   return false;
5394 }
5395 
get_and_update_next_number(Rdb_dict_manager * const dict)5396 uint Rdb_seq_generator::get_and_update_next_number(
5397     Rdb_dict_manager *const dict) {
5398   DBUG_ASSERT(dict != nullptr);
5399 
5400   uint res;
5401   RDB_MUTEX_LOCK_CHECK(m_mutex);
5402 
5403   res = m_next_number++;
5404 
5405   const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
5406   rocksdb::WriteBatch *const batch = wb.get();
5407 
5408   DBUG_ASSERT(batch != nullptr);
5409   dict->update_max_index_id(batch, res);
5410   dict->commit(batch);
5411 
5412   RDB_MUTEX_UNLOCK_CHECK(m_mutex);
5413 
5414   return res;
5415 }
5416 
5417 }  // namespace myrocks
5418