1 /*
2    Copyright (c) 2012,2013 Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
16 
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation  // gcc: Class implementation
19 #endif
20 
21 /* For use of 'PRIu64': */
22 #define __STDC_FORMAT_MACROS
23 
24 #include <my_global.h>
25 /* This C++ file's header file */
26 #include "./rdb_datadic.h"
27 
28 #include <inttypes.h>
29 /* C++ standard header files */
30 #include <algorithm>
31 #include <array>
32 #include <limits>
33 #include <map>
34 #include <set>
35 #include <string>
36 #include <utility>
37 #include <vector>
38 
39 /* MySQL header files */
40 #include "./field.h"
41 #include "./key.h"
42 #include "./m_ctype.h"
43 #include "./my_bit.h"
44 #include "./my_bitmap.h"
45 #include "./sql_table.h"
46 
47 /* MyRocks header files */
48 #include "./ha_rocksdb.h"
49 #include "./ha_rocksdb_proto.h"
50 #include "./my_stacktrace.h"
51 #include "./rdb_cf_manager.h"
52 #include "./rdb_psi.h"
53 #include "./rdb_utils.h"
54 
55 namespace myrocks {
56 
57 void get_mem_comparable_space(const CHARSET_INFO *cs,
58                               const std::vector<uchar> **xfrm, size_t *xfrm_len,
59                               size_t *mb_len);
60 
61 /*
62   MariaDB's replacement for FB/MySQL Field::check_field_name_match :
63 */
field_check_field_name_match(Field * field,const char * name)64 inline bool field_check_field_name_match(Field *field, const char *name)
65 {
66   return (0 == my_strcasecmp(system_charset_info,
67                              field->field_name.str,
68                              name));
69 }
70 
71 
72 /*
73   Decode  current key field
74   @param  fpi               IN      data structure contains field metadata
75   @param  field             IN      current field
76   @param  reader            IN      key slice reader
77   @param  unp_reader        IN      unpack information reader
78   @return
79     HA_EXIT_SUCCESS    OK
80     other              HA_ERR error code
81 */
decode_field(Rdb_field_packing * fpi,Field * field,Rdb_string_reader * reader,const uchar * const default_value,Rdb_string_reader * unpack_reader)82 int Rdb_convert_to_record_key_decoder::decode_field(
83     Rdb_field_packing *fpi, Field *field, Rdb_string_reader *reader,
84     const uchar *const default_value, Rdb_string_reader *unpack_reader) {
85   if (fpi->m_maybe_null) {
86     const char *nullp;
87     if (!(nullp = reader->read(1))) {
88       return HA_EXIT_FAILURE;
89     }
90 
91     if (*nullp == 0) {
92       /* Set the NULL-bit of this field */
93       field->set_null();
94       /* Also set the field to its default value */
95       memcpy(field->ptr, default_value, field->pack_length());
96       return HA_EXIT_SUCCESS;
97     } else if (*nullp == 1) {
98       field->set_notnull();
99     } else {
100       return HA_EXIT_FAILURE;
101     }
102   }
103 
104   return (fpi->m_unpack_func)(fpi, field, field->ptr, reader, unpack_reader);
105 }
106 
107 /*
108   Decode  current key field
109 
110   @param  buf               OUT     the buf starting address
111   @param  offset            OUT     the bytes offset when data is written
112   @param  fpi               IN      data structure contains field metadata
113   @param  table             IN      current table
114   @param  field             IN      current field
115   @param  has_unpack_inf    IN      whether contains unpack inf
116   @param  reader            IN      key slice reader
117   @param  unp_reader        IN      unpack information reader
118   @return
119     HA_EXIT_SUCCESS    OK
120     other              HA_ERR error code
121 */
decode(uchar * const buf,uint * offset,Rdb_field_packing * fpi,TABLE * table,Field * field,bool has_unpack_info,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)122 int Rdb_convert_to_record_key_decoder::decode(
123     uchar *const buf, uint *offset, Rdb_field_packing *fpi, TABLE *table,
124     Field *field, bool has_unpack_info, Rdb_string_reader *reader,
125     Rdb_string_reader *unpack_reader) {
126   DBUG_ASSERT(buf != nullptr);
127   DBUG_ASSERT(offset != nullptr);
128 
129   uint field_offset = field->ptr - table->record[0];
130   *offset = field_offset;
131   uint null_offset = field->null_offset();
132   bool maybe_null = field->real_maybe_null();
133 
134   field->move_field(buf + field_offset,
135                     maybe_null ? buf + null_offset : nullptr, field->null_bit);
136 
137   // If we need unpack info, but there is none, tell the unpack function
138   // this by passing unp_reader as nullptr. If we never read unpack_info
139   // during unpacking anyway, then there won't an error.
140   bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
141 
142   int res =
143       decode_field(fpi, field, reader, table->s->default_values + field_offset,
144                    maybe_missing_unpack ? nullptr : unpack_reader);
145 
146   // Restore field->ptr and field->null_ptr
147   field->move_field(table->record[0] + field_offset,
148                     maybe_null ? table->record[0] + null_offset : nullptr,
149                     field->null_bit);
150   if (res != UNPACK_SUCCESS) {
151     return HA_ERR_ROCKSDB_CORRUPT_DATA;
152   }
153   return HA_EXIT_SUCCESS;
154 }
155 
156 /*
157   Skip current key field
158 
159   @param  fpi          IN    data structure contains field metadata
160   @param  field        IN    current field
161   @param  reader       IN    key slice reader
162   @param  unp_reader   IN    unpack information reader
163   @return
164     HA_EXIT_SUCCESS    OK
165     other              HA_ERR error code
166 */
skip(const Rdb_field_packing * fpi,const Field * field,Rdb_string_reader * reader,Rdb_string_reader * unp_reader)167 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
168                                             const Field *field,
169                                             Rdb_string_reader *reader,
170                                             Rdb_string_reader *unp_reader) {
171   /* It is impossible to unpack the column. Skip it. */
172   if (fpi->m_maybe_null) {
173     const char *nullp;
174     if (!(nullp = reader->read(1))) {
175       return HA_ERR_ROCKSDB_CORRUPT_DATA;
176     }
177     if (*nullp == 0) {
178       /* This is a NULL value */
179       return HA_EXIT_SUCCESS;
180     }
181     /* If NULL marker is not '0', it can be only '1'  */
182     if (*nullp != 1) {
183       return HA_ERR_ROCKSDB_CORRUPT_DATA;
184     }
185   }
186   if ((fpi->m_skip_func)(fpi, field, reader)) {
187     return HA_ERR_ROCKSDB_CORRUPT_DATA;
188   }
189   // If this is a space padded varchar, we need to skip the indicator
190   // bytes for trailing bytes. They're useless since we can't restore the
191   // field anyway.
192   //
193   // There is a special case for prefixed varchars where we do not
194   // generate unpack info, because we know prefixed varchars cannot be
195   // unpacked. In this case, it is not necessary to skip.
196   if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
197       !fpi->m_unpack_info_stores_value) {
198     unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
199   }
200   return HA_EXIT_SUCCESS;
201 }
202 
Rdb_key_field_iterator(const Rdb_key_def * key_def,Rdb_field_packing * pack_info,Rdb_string_reader * reader,Rdb_string_reader * unp_reader,TABLE * table,bool has_unpack_info,const MY_BITMAP * covered_bitmap,uchar * const buf)203 Rdb_key_field_iterator::Rdb_key_field_iterator(
204     const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
205     Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
206     bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
207   m_key_def = key_def;
208   m_pack_info = pack_info;
209   m_iter_index = 0;
210   m_iter_end = key_def->get_key_parts();
211   m_reader = reader;
212   m_unp_reader = unp_reader;
213   m_table = table;
214   m_has_unpack_info = has_unpack_info;
215   m_covered_bitmap = covered_bitmap;
216   m_buf = buf;
217   m_secondary_key =
218       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
219   m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
220   m_is_hidden_pk =
221       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
222   m_curr_bitmap_pos = 0;
223   m_offset = 0;
224 }
225 
get_dst() const226 void *Rdb_key_field_iterator::get_dst() const { return m_buf + m_offset; }
227 
get_field_index() const228 int Rdb_key_field_iterator::get_field_index() const {
229   DBUG_ASSERT(m_field != nullptr);
230   return m_field->field_index;
231 }
232 
get_is_null() const233 bool Rdb_key_field_iterator::get_is_null() const { return m_is_null; }
get_field() const234 Field *Rdb_key_field_iterator::get_field() const {
235   DBUG_ASSERT(m_field != nullptr);
236   return m_field;
237 }
238 
has_next()239 bool Rdb_key_field_iterator::has_next() { return m_iter_index < m_iter_end; }
240 
241 /**
242  Iterate each field in the key and decode/skip one by one
243 */
next()244 int Rdb_key_field_iterator::next() {
245   int status = HA_EXIT_SUCCESS;
246   while (m_iter_index < m_iter_end) {
247     int curr_index = m_iter_index++;
248 
249     m_fpi = &m_pack_info[curr_index];
250     /*
251       Hidden pk field is packed at the end of the secondary keys, but the SQL
252       layer does not know about it. Skip retrieving field if hidden pk.
253     */
254     if ((m_secondary_key && m_hidden_pk_exists &&
255          curr_index + 1 == m_iter_end) ||
256         m_is_hidden_pk) {
257       DBUG_ASSERT(m_fpi->m_unpack_func);
258       if ((m_fpi->m_skip_func)(m_fpi, nullptr, m_reader)) {
259         return HA_ERR_ROCKSDB_CORRUPT_DATA;
260       }
261       return HA_EXIT_SUCCESS;
262     }
263 
264     m_field = m_fpi->get_field_in_table(m_table);
265 
266     bool covered_column = true;
267     if (m_covered_bitmap != nullptr &&
268         m_field->real_type() == MYSQL_TYPE_VARCHAR && !m_fpi->m_covered) {
269       covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
270                        bitmap_is_set(m_covered_bitmap, m_curr_bitmap_pos++);
271     }
272 
273     if (m_fpi->m_unpack_func && covered_column) {
274       /* It is possible to unpack this column. Do it. */
275       status = Rdb_convert_to_record_key_decoder::decode(
276           m_buf, &m_offset, m_fpi, m_table, m_field, m_has_unpack_info,
277           m_reader, m_unp_reader);
278       if (status) {
279         return status;
280       }
281       break;
282     } else {
283       status = Rdb_convert_to_record_key_decoder::skip(m_fpi, m_field, m_reader,
284                                                        m_unp_reader);
285       if (status) {
286         return status;
287       }
288     }
289   }
290   return HA_EXIT_SUCCESS;
291 }
292 
293 /*
294   Rdb_key_def class implementation
295 */
Rdb_key_def(uint indexnr_arg,uint keyno_arg,rocksdb::ColumnFamilyHandle * cf_handle_arg,uint16_t index_dict_version_arg,uchar index_type_arg,uint16_t kv_format_version_arg,bool is_reverse_cf_arg,bool is_per_partition_cf_arg,const char * _name,Rdb_index_stats _stats,uint32 index_flags_bitmap,uint32 ttl_rec_offset,uint64 ttl_duration)296 Rdb_key_def::Rdb_key_def(uint indexnr_arg, uint keyno_arg,
297                          rocksdb::ColumnFamilyHandle *cf_handle_arg,
298                          uint16_t index_dict_version_arg, uchar index_type_arg,
299                          uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
300                          bool is_per_partition_cf_arg, const char *_name,
301                          Rdb_index_stats _stats, uint32 index_flags_bitmap,
302                          uint32 ttl_rec_offset, uint64 ttl_duration)
303     : m_index_number(indexnr_arg),
304       m_cf_handle(cf_handle_arg),
305       m_index_dict_version(index_dict_version_arg),
306       m_index_type(index_type_arg),
307       m_kv_format_version(kv_format_version_arg),
308       m_is_reverse_cf(is_reverse_cf_arg),
309       m_is_per_partition_cf(is_per_partition_cf_arg),
310       m_name(_name),
311       m_stats(_stats),
312       m_index_flags_bitmap(index_flags_bitmap),
313       m_ttl_rec_offset(ttl_rec_offset),
314       m_ttl_duration(ttl_duration),
315       m_ttl_column(""),
316       m_pk_part_no(nullptr),
317       m_pack_info(nullptr),
318       m_keyno(keyno_arg),
319       m_key_parts(0),
320       m_ttl_pk_key_part_offset(UINT_MAX),
321       m_ttl_field_index(UINT_MAX),
322       m_prefix_extractor(nullptr),
323       m_maxlength(0)  // means 'not intialized'
324 {
325   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
326   rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
327   m_total_index_flags_length =
328       calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
329   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
330                       m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
331                   m_total_index_flags_length == 0);
332   DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
333                       m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
334                   m_total_index_flags_length == 0);
335   DBUG_ASSERT(m_cf_handle != nullptr);
336 }
337 
/*
  Copy constructor.

  Scalar attributes are copied from 'k'. The m_pack_info and m_pk_part_no
  arrays are duplicated into freshly allocated memory so that each object
  frees its own copy in the destructor. The copy gets its own mutex.

  m_index_dict_version, m_index_type and m_kv_format_version are copied as
  well (in the same position as in the main constructor's initializer list):
  the assertions below, and later users of the copy, read them.

  NOTE(review): m_ttl_field_index is reset to UINT_MAX rather than copied,
  and m_pk_key_parts is not copied; this is kept as found - confirm whether
  it is intentional.
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number),
      m_cf_handle(k.m_cf_handle),
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf),
      m_is_per_partition_cf(k.m_is_per_partition_cf),
      m_name(k.m_name),
      m_stats(k.m_stats),
      m_index_flags_bitmap(k.m_index_flags_bitmap),
      m_ttl_rec_offset(k.m_ttl_rec_offset),
      m_ttl_duration(k.m_ttl_duration),
      m_ttl_column(k.m_ttl_column),
      m_pk_part_no(k.m_pk_part_no),
      m_pack_info(k.m_pack_info),
      m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts),
      m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  DBUG_ASSERT_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);

  /*
    The initializer list copied the array pointers themselves; replace them
    with private copies of the arrays so the two objects do not share (and
    later double-free) the same memory.
  */
  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
    void *pack_info= my_malloc(size, MYF(0));
    memcpy(pack_info, k.m_pack_info, size);
    m_pack_info = reinterpret_cast<Rdb_field_packing *>(pack_info);
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
    m_pk_part_no = reinterpret_cast<uint *>(my_malloc(size, MYF(0)));
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
380 
~Rdb_key_def()381 Rdb_key_def::~Rdb_key_def() {
382   mysql_mutex_destroy(&m_mutex);
383 
384   my_free(m_pk_part_no);
385   m_pk_part_no = nullptr;
386 
387   my_free(m_pack_info);
388   m_pack_info = nullptr;
389 }
390 
setup(const TABLE * const tbl,const Rdb_tbl_def * const tbl_def)391 void Rdb_key_def::setup(const TABLE *const tbl,
392                         const Rdb_tbl_def *const tbl_def) {
393   DBUG_ASSERT(tbl != nullptr);
394   DBUG_ASSERT(tbl_def != nullptr);
395 
396   /*
397     Set max_length based on the table.  This can be called concurrently from
398     multiple threads, so there is a mutex to protect this code.
399   */
400   const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
401   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
402   const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
403   if (!m_maxlength) {
404     RDB_MUTEX_LOCK_CHECK(m_mutex);
405     if (m_maxlength != 0) {
406       RDB_MUTEX_UNLOCK_CHECK(m_mutex);
407       return;
408     }
409 
410     KEY *key_info = nullptr;
411     KEY *pk_info = nullptr;
412     if (!is_hidden_pk) {
413       key_info = &tbl->key_info[m_keyno];
414       if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
415       m_name = std::string(key_info->name.str);
416     } else {
417       m_name = HIDDEN_PK_NAME;
418     }
419 
420     if (secondary_key) {
421       m_pk_key_parts= hidden_pk_exists ? 1 : pk_info->ext_key_parts;
422     } else {
423       pk_info = nullptr;
424       m_pk_key_parts = 0;
425     }
426 
427     // "unique" secondary keys support:
428     m_key_parts= is_hidden_pk ? 1 : key_info->ext_key_parts;
429 
430     if (secondary_key) {
431       /*
432         In most cases, SQL layer puts PK columns as invisible suffix at the
433         end of secondary key. There are cases where this doesn't happen:
434         - unique secondary indexes.
435         - partitioned tables.
436 
437         Internally, we always need PK columns as suffix (and InnoDB does,
438         too, if you were wondering).
439 
440         The loop below will attempt to put all PK columns at the end of key
441         definition.  Columns that are already included in the index (either
442         by the user or by "extended keys" feature) are not included for the
443         second time.
444       */
445       m_key_parts += m_pk_key_parts;
446     }
447 
448     if (secondary_key) {
449       m_pk_part_no = reinterpret_cast<uint *>(
450           my_malloc(sizeof(uint) * m_key_parts, MYF(0)));
451     } else {
452       m_pk_part_no = nullptr;
453     }
454 
455     const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
456     m_pack_info =
457         reinterpret_cast<Rdb_field_packing *>(my_malloc(size, MYF(0)));
458 
459     /*
460       Guaranteed not to error here as checks have been made already during
461       table creation.
462     */
463     Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
464                                  &m_ttl_field_index, true);
465 
466     size_t max_len = INDEX_NUMBER_SIZE;
467     int unpack_len = 0;
468     int max_part_len = 0;
469     bool simulating_extkey = false;
470     uint dst_i = 0;
471 
472     uint keyno_to_set = m_keyno;
473     uint keypart_to_set = 0;
474 
475     if (is_hidden_pk) {
476       Field *field = nullptr;
477       m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
478       m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
479       max_len += m_pack_info[dst_i].m_max_image_len;
480       max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
481       dst_i++;
482     } else {
483       KEY_PART_INFO *key_part = key_info->key_part;
484 
485       /* this loop also loops over the 'extended key' tail */
486       for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
487         Field *const field = key_part ? key_part->field : nullptr;
488 
489         if (simulating_extkey && !hidden_pk_exists) {
490           DBUG_ASSERT(secondary_key);
491           /* Check if this field is already present in the key definition */
492           bool found = false;
493           for (uint j= 0; j < key_info->ext_key_parts; j++) {
494             if (field->field_index ==
495                     key_info->key_part[j].field->field_index &&
496                 key_part->length == key_info->key_part[j].length) {
497               found = true;
498               break;
499             }
500           }
501 
502           if (found) {
503             key_part++;
504             continue;
505           }
506         }
507 
508         if (field && field->real_maybe_null()) max_len += 1;  // NULL-byte
509 
510         m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
511                                  key_part ? key_part->length : 0);
512         m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
513 
514         if (pk_info) {
515           m_pk_part_no[dst_i] = -1;
516           for (uint j = 0; j < m_pk_key_parts; j++) {
517             if (field->field_index == pk_info->key_part[j].field->field_index) {
518               m_pk_part_no[dst_i] = j;
519               break;
520             }
521           }
522         } else if (secondary_key && hidden_pk_exists) {
523           /*
524             The hidden pk can never be part of the sk.  So it is always
525             appended to the end of the sk.
526           */
527           m_pk_part_no[dst_i] = -1;
528           if (simulating_extkey) m_pk_part_no[dst_i] = 0;
529         }
530 
531         max_len += m_pack_info[dst_i].m_max_image_len;
532 
533         max_part_len =
534             std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
535 
536         /*
537           Check key part name here, if it matches the TTL column then we store
538           the offset of the TTL key part here.
539         */
540         if (!m_ttl_column.empty() &&
541             field_check_field_name_match(field, m_ttl_column.c_str())) {
542           DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
543           DBUG_ASSERT(field->key_type() == HA_KEYTYPE_ULONGLONG);
544           DBUG_ASSERT(!field->real_maybe_null());
545           m_ttl_pk_key_part_offset = dst_i;
546         }
547 
548         key_part++;
549         /*
550           For "unique" secondary indexes, pretend they have
551           "index extensions".
552 
553           MariaDB also has this property: if an index has a partially-covered
554           column like KEY(varchar_col(N)), then the SQL layer will think it is
555           not "extended" with PK columns. The code below handles this case,
556           also.
557          */
558         if (secondary_key && src_i+1 == key_info->ext_key_parts) {
559           simulating_extkey = true;
560           if (!hidden_pk_exists) {
561             keyno_to_set = tbl->s->primary_key;
562             key_part = pk_info->key_part;
563             keypart_to_set = (uint)-1;
564           } else {
565             keyno_to_set = tbl_def->m_key_count - 1;
566             key_part = nullptr;
567             keypart_to_set = 0;
568           }
569         }
570 
571         dst_i++;
572       }
573     }
574 
575     m_key_parts = dst_i;
576 
577     /* Initialize the memory needed by the stats structure */
578     m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());
579 
580     /* Cache prefix extractor for bloom filter usage later */
581     rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
582     m_prefix_extractor = opt.prefix_extractor;
583 
584     /*
585       This should be the last member variable set before releasing the mutex
586       so that other threads can't see the object partially set up.
587      */
588     m_maxlength = max_len;
589 
590     RDB_MUTEX_UNLOCK_CHECK(m_mutex);
591   }
592 }
593 
594 /*
595   Determine if the table has TTL enabled by parsing the table comment.
596 
597   @param[IN]  table_arg
598   @param[IN]  tbl_def_arg
599   @param[OUT] ttl_duration        Default TTL value parsed from table comment
600 */
extract_ttl_duration(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,uint64 * ttl_duration)601 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
602                                        const Rdb_tbl_def *const tbl_def_arg,
603                                        uint64 *ttl_duration) {
604   DBUG_ASSERT(table_arg != nullptr);
605   DBUG_ASSERT(tbl_def_arg != nullptr);
606   DBUG_ASSERT(ttl_duration != nullptr);
607   std::string table_comment(table_arg->s->comment.str,
608                             table_arg->s->comment.length);
609 
610   bool ttl_duration_per_part_match_found = false;
611   std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
612       table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
613       RDB_TTL_DURATION_QUALIFIER);
614 
615   /* If we don't have a ttl duration, nothing to do here. */
616   if (ttl_duration_str.empty()) {
617     return HA_EXIT_SUCCESS;
618   }
619 
620   /*
621     Catch errors where a non-integral value was used as ttl duration, strtoull
622     will return 0.
623   */
624   *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
625   if (!*ttl_duration) {
626     my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
627     return HA_EXIT_FAILURE;
628   }
629 
630   return HA_EXIT_SUCCESS;
631 }
632 
633 /*
634   Determine if the table has TTL enabled by parsing the table comment.
635 
636   @param[IN]  table_arg
637   @param[IN]  tbl_def_arg
638   @param[OUT] ttl_column          TTL column in the table
639   @param[IN]  skip_checks         Skip validation checks (when called in
640                                   setup())
641 */
extract_ttl_col(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,std::string * ttl_column,uint * ttl_field_index,bool skip_checks)642 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
643                                   const Rdb_tbl_def *const tbl_def_arg,
644                                   std::string *ttl_column,
645                                   uint *ttl_field_index, bool skip_checks) {
646   std::string table_comment(table_arg->s->comment.str,
647                             table_arg->s->comment.length);
648   /*
649     Check if there is a TTL column specified. Note that this is not required
650     and if omitted, an 8-byte ttl field will be prepended to each record
651     implicitly.
652   */
653   bool ttl_col_per_part_match_found = false;
654   std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
655       table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
656       RDB_TTL_COL_QUALIFIER);
657 
658   if (skip_checks) {
659     for (uint i = 0; i < table_arg->s->fields; i++) {
660       Field *const field = table_arg->field[i];
661       if (field_check_field_name_match(field, ttl_col_str.c_str())) {
662         *ttl_column = ttl_col_str;
663         *ttl_field_index = i;
664       }
665     }
666     return HA_EXIT_SUCCESS;
667   }
668 
669   /* Check if TTL column exists in table */
670   if (!ttl_col_str.empty()) {
671     bool found = false;
672     for (uint i = 0; i < table_arg->s->fields; i++) {
673       Field *const field = table_arg->field[i];
674       if (field_check_field_name_match(field, ttl_col_str.c_str()) &&
675           field->real_type() == MYSQL_TYPE_LONGLONG &&
676           field->key_type() == HA_KEYTYPE_ULONGLONG &&
677           !field->real_maybe_null()) {
678         *ttl_column = ttl_col_str;
679         *ttl_field_index = i;
680         found = true;
681         break;
682       }
683     }
684 
685     if (!found) {
686       my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
687       return HA_EXIT_FAILURE;
688     }
689   }
690 
691   return HA_EXIT_SUCCESS;
692 }
693 
gen_qualifier_for_table(const char * const qualifier,const std::string & partition_name)694 const std::string Rdb_key_def::gen_qualifier_for_table(
695     const char *const qualifier, const std::string &partition_name) {
696   bool has_partition = !partition_name.empty();
697   std::string qualifier_str = "";
698 
699   if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
700     return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
701                          : qualifier_str + RDB_CF_NAME_QUALIFIER +
702                                RDB_QUALIFIER_VALUE_SEP;
703   } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
704     return has_partition
705                ? gen_ttl_duration_qualifier_for_partition(partition_name)
706                : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
707                      RDB_QUALIFIER_VALUE_SEP;
708   } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
709     return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
710                          : qualifier_str + RDB_TTL_COL_QUALIFIER +
711                                RDB_QUALIFIER_VALUE_SEP;
712   } else {
713     DBUG_ASSERT(0);
714   }
715 
716   return qualifier_str;
717 }
718 
719 /*
720   Formats the string and returns the column family name assignment part for a
721   specific partition.
722 */
gen_cf_name_qualifier_for_partition(const std::string & prefix)723 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
724     const std::string &prefix) {
725   DBUG_ASSERT(!prefix.empty());
726 
727   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
728          RDB_QUALIFIER_VALUE_SEP;
729 }
730 
gen_ttl_duration_qualifier_for_partition(const std::string & prefix)731 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
732     const std::string &prefix) {
733   DBUG_ASSERT(!prefix.empty());
734 
735   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
736          RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
737 }
738 
gen_ttl_col_qualifier_for_partition(const std::string & prefix)739 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
740     const std::string &prefix) {
741   DBUG_ASSERT(!prefix.empty());
742 
743   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
744          RDB_QUALIFIER_VALUE_SEP;
745 }
746 
parse_comment_for_qualifier(const std::string & comment,const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,bool * per_part_match_found,const char * const qualifier)747 const std::string Rdb_key_def::parse_comment_for_qualifier(
748     const std::string &comment, const TABLE *const table_arg,
749     const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
750     const char *const qualifier) {
751   DBUG_ASSERT(table_arg != nullptr);
752   DBUG_ASSERT(tbl_def_arg != nullptr);
753   DBUG_ASSERT(per_part_match_found != nullptr);
754   DBUG_ASSERT(qualifier != nullptr);
755 
756   std::string empty_result;
757 
758   // Flag which marks if partition specific options were found.
759   *per_part_match_found = false;
760 
761   if (comment.empty()) {
762     return empty_result;
763   }
764 
765   // Let's fetch the comment for a index and check if there's a custom key
766   // name specified for a partition we are handling.
767   std::vector<std::string> v =
768       myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
769 
770   std::string search_str = gen_qualifier_for_table(qualifier);
771 
772   // If table has partitions then we need to check if user has requested
773   // qualifiers on a per partition basis.
774   //
775   // NOTE: this means if you specify a qualifier for a specific partition it
776   // will take precedence the 'table level' qualifier if one exists.
777   std::string search_str_part;
778   if (IF_PARTITIONING(table_arg->part_info,nullptr) != nullptr) {
779     std::string partition_name = tbl_def_arg->base_partition();
780     DBUG_ASSERT(!partition_name.empty());
781     search_str_part = gen_qualifier_for_table(qualifier, partition_name);
782   }
783 
784   DBUG_ASSERT(!search_str.empty());
785 
786   // Basic O(N) search for a matching assignment. At most we expect maybe
787   // ten or so elements here.
788   if (!search_str_part.empty()) {
789     for (const auto &it : v) {
790       if (it.substr(0, search_str_part.length()) == search_str_part) {
791         // We found a prefix match. Try to parse it as an assignment.
792         std::vector<std::string> tokens =
793             myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
794 
795         // We found a custom qualifier, it was in the form we expected it to be.
796         // Return that instead of whatever we initially wanted to return. In
797         // a case below the `foo` part will be returned to the caller.
798         //
799         // p3_cfname=foo
800         //
801         // If no value was specified then we'll return an empty string which
802         // later gets translated into using a default CF.
803         if (tokens.size() == 2) {
804           *per_part_match_found = true;
805           return tokens[1];
806         } else {
807           return empty_result;
808         }
809       }
810     }
811   }
812 
813   // Do this loop again, this time searching for 'table level' qualifiers if we
814   // didn't find any partition level qualifiers above.
815   for (const auto &it : v) {
816     if (it.substr(0, search_str.length()) == search_str) {
817       std::vector<std::string> tokens =
818           myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
819       if (tokens.size() == 2) {
820         return tokens[1];
821       } else {
822         return empty_result;
823       }
824     }
825   }
826 
827   // If we didn't find any partitioned/non-partitioned qualifiers, return an
828   // empty string.
829   return empty_result;
830 }
831 
832 /**
833   Read a memcmp key part from a slice using the passed in reader.
834 
835   Returns -1 if field was null, 1 if error, 0 otherwise.
836 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const837 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
838                                       Rdb_string_reader *reader,
839                                       const uint part_num) const {
840   /* It is impossible to unpack the column. Skip it. */
841   if (m_pack_info[part_num].m_maybe_null) {
842     const char *nullp;
843     if (!(nullp = reader->read(1))) return 1;
844     if (*nullp == 0) {
845       /* This is a NULL value */
846       return -1;
847     } else {
848       /* If NULL marker is not '0', it can be only '1'  */
849       if (*nullp != 1) return 1;
850     }
851   }
852 
853   Rdb_field_packing *fpi = &m_pack_info[part_num];
854   DBUG_ASSERT(table_arg->s != nullptr);
855 
856   bool is_hidden_pk_part = (part_num + 1 == m_key_parts) &&
857                            (table_arg->s->primary_key == MAX_INDEXES);
858   Field *field = nullptr;
859   if (!is_hidden_pk_part) {
860     field = fpi->get_field_in_table(table_arg);
861   }
862   if ((fpi->m_skip_func)(fpi, field, reader)) {
863     return 1;
864   }
865   return 0;
866 }
867 
868 /**
869   Get a mem-comparable form of Primary Key from mem-comparable form of this key
870 
871   @param
872     pk_descr        Primary Key descriptor
873     key             Index tuple from this key in mem-comparable form
874     pk_buffer  OUT  Put here mem-comparable form of the Primary Key.
875 
876   @note
877     It may or may not be possible to restore primary key columns to their
878     mem-comparable form.  To handle all cases, this function copies mem-
879     comparable forms directly.
880 
881     RocksDB SE supports "Extended keys". This means that PK columns are present
882     at the end of every key.  If the key already includes PK columns, then
883     these columns are not present at the end of the key.
884 
885     Because of the above, we copy each primary key column.
886 
887   @todo
888     If we checked crc32 checksums in this function, we would catch some CRC
889     violations that we currently don't. On the other hand, there is a broader
890     set of queries for which we would check the checksum twice.
891 */
892 
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const893 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
894                                         const Rdb_key_def &pk_descr,
895                                         const rocksdb::Slice *const key,
896                                         uchar *const pk_buffer) const {
897   DBUG_ASSERT(table != nullptr);
898   DBUG_ASSERT(key != nullptr);
899   DBUG_ASSERT(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
900   DBUG_ASSERT(pk_buffer);
901 
902   uint size = 0;
903   uchar *buf = pk_buffer;
904   DBUG_ASSERT(m_pk_key_parts);
905 
906   /* Put the PK number */
907   rdb_netbuf_store_index(buf, pk_descr.m_index_number);
908   buf += INDEX_NUMBER_SIZE;
909   size += INDEX_NUMBER_SIZE;
910 
911   const char *start_offs[MAX_REF_PARTS];
912   const char *end_offs[MAX_REF_PARTS];
913   int pk_key_part;
914   uint i;
915   Rdb_string_reader reader(key);
916 
917   // Skip the index number
918   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
919 
920   for (i = 0; i < m_key_parts; i++) {
921     if ((pk_key_part = m_pk_part_no[i]) != -1) {
922       start_offs[pk_key_part] = reader.get_current_ptr();
923     }
924 
925     if (read_memcmp_key_part(table, &reader, i) > 0) {
926       return RDB_INVALID_KEY_LEN;
927     }
928 
929     if (pk_key_part != -1) {
930       end_offs[pk_key_part] = reader.get_current_ptr();
931     }
932   }
933 
934   for (i = 0; i < m_pk_key_parts; i++) {
935     const uint part_size = end_offs[i] - start_offs[i];
936     memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
937     buf += part_size;
938     size += part_size;
939   }
940 
941   return size;
942 }
943 
944 /**
945   Get a mem-comparable form of Secondary Key from mem-comparable form of this
946   key, without the extended primary key tail.
947 
948   @param
949     key                Index tuple from this key in mem-comparable form
950     sk_buffer     OUT  Put here mem-comparable form of the Secondary Key.
951     n_null_fields OUT  Put number of null fields contained within sk entry
952 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const953 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
954                                       const rocksdb::Slice &key,
955                                       uchar *sk_buffer,
956                                       uint *n_null_fields) const {
957   DBUG_ASSERT(table != nullptr);
958   DBUG_ASSERT(sk_buffer != nullptr);
959   DBUG_ASSERT(n_null_fields != nullptr);
960   DBUG_ASSERT(m_keyno != table->s->primary_key && !table_has_hidden_pk(table));
961 
962   uchar *buf = sk_buffer;
963 
964   int res;
965   Rdb_string_reader reader(&key);
966   const char *start = reader.get_current_ptr();
967 
968   // Skip the index number
969   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
970 
971   for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
972     if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
973       return RDB_INVALID_KEY_LEN;
974     } else if (res == -1) {
975       (*n_null_fields)++;
976     }
977   }
978 
979   uint sk_memcmp_len = reader.get_current_ptr() - start;
980   memcpy(buf, start, sk_memcmp_len);
981   return sk_memcmp_len;
982 }
983 
984 /**
985   Convert index tuple into storage (i.e. mem-comparable) format
986 
987   @detail
988     Currently this is done by unpacking into table->record[0] and then
989     packing index columns into storage format.
990 
991   @param pack_buffer Temporary area for packing varchar columns. Its
992                      size is at least max_storage_fmt_length() bytes.
993 */
994 
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,const uchar * const key_tuple,const key_part_map & keypart_map) const995 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
996                                    uchar *const packed_tuple,
997                                    const uchar *const key_tuple,
998                                    const key_part_map &keypart_map) const {
999   DBUG_ASSERT(tbl != nullptr);
1000   DBUG_ASSERT(pack_buffer != nullptr);
1001   DBUG_ASSERT(packed_tuple != nullptr);
1002   DBUG_ASSERT(key_tuple != nullptr);
1003 
1004   /* We were given a record in KeyTupleFormat. First, save it to record */
1005   const uint key_len = calculate_key_len(tbl, m_keyno, key_tuple, keypart_map);
1006   key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
1007 
1008   uint n_used_parts = my_count_bits(keypart_map);
1009   if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0;  // Full key is used
1010 
1011   /* Then, convert the record into a mem-comparable form */
1012   return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
1013                      false, 0, n_used_parts);
1014 }
1015 
1016 /**
1017   @brief
1018     Check if "unpack info" data includes checksum.
1019 
1020   @detail
1021     This is used only by CHECK TABLE to count the number of rows that have
1022     checksums.
1023 */
1024 
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)1025 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
1026   size_t size = unpack_info.size();
1027   if (size == 0) {
1028     return false;
1029   }
1030   const uchar *ptr = (const uchar *)unpack_info.data();
1031 
1032   // Skip unpack info if present.
1033   if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1034     const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1035     SHIP_ASSERT(size >= skip_len);
1036 
1037     size -= skip_len;
1038     ptr += skip_len;
1039   }
1040 
1041   return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1042 }
1043 
1044 /*
1045   @return Number of bytes that were changed
1046 */
successor(uchar * const packed_tuple,const uint len)1047 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1048   DBUG_ASSERT(packed_tuple != nullptr);
1049 
1050   int changed = 0;
1051   uchar *p = packed_tuple + len - 1;
1052   for (; p > packed_tuple; p--) {
1053     changed++;
1054     if (*p != uchar(0xFF)) {
1055       *p = *p + 1;
1056       break;
1057     }
1058     *p = '\0';
1059   }
1060   return changed;
1061 }
1062 
1063 /*
1064   @return Number of bytes that were changed
1065 */
predecessor(uchar * const packed_tuple,const uint len)1066 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1067   DBUG_ASSERT(packed_tuple != nullptr);
1068 
1069   int changed = 0;
1070   uchar *p = packed_tuple + len - 1;
1071   for (; p > packed_tuple; p--) {
1072     changed++;
1073     if (*p != uchar(0x00)) {
1074       *p = *p - 1;
1075       break;
1076     }
1077     *p = 0xFF;
1078   }
1079   return changed;
1080 }
1081 
1082 static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
1083     {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
1084     {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};
1085 
1086 /*
1087   @return The length in bytes of the header specified by the given tag
1088 */
get_unpack_header_size(char tag)1089 size_t Rdb_key_def::get_unpack_header_size(char tag) {
1090   DBUG_ASSERT(is_unpack_data_tag(tag));
1091   return UNPACK_HEADER_SIZES.at(tag);
1092 }
1093 
1094 /*
1095   Get a bitmap indicating which varchar columns must be covered for this
1096   lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1097   the lookup is covered. If it can already be determined that the lookup is
1098   not covered, map->bitmap will be set to null.
1099  */
get_lookup_bitmap(const TABLE * table,MY_BITMAP * map) const1100 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1101   DBUG_ASSERT(map->bitmap == nullptr);
1102   bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1103   uint curr_bitmap_pos = 0;
1104 
1105   // Indicates which columns in the read set might be covered.
1106   MY_BITMAP maybe_covered_bitmap;
1107   bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1108 
1109   for (uint i = 0; i < m_key_parts; i++) {
1110     if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1111       continue;
1112     }
1113 
1114     Field *const field = m_pack_info[i].get_field_in_table(table);
1115 
1116     // Columns which are always covered are not stored in the covered bitmap so
1117     // we can ignore them here too.
1118     if (m_pack_info[i].m_covered &&
1119         bitmap_is_set(table->read_set, field->field_index)) {
1120       bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1121       continue;
1122     }
1123 
1124     switch (field->real_type()) {
1125       // This type may be covered depending on the record. If it was requested,
1126       // we require the covered bitmap to have this bit set.
1127       case MYSQL_TYPE_VARCHAR:
1128         if (curr_bitmap_pos < MAX_REF_PARTS) {
1129           if (bitmap_is_set(table->read_set, field->field_index)) {
1130             bitmap_set_bit(map, curr_bitmap_pos);
1131             bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1132           }
1133           curr_bitmap_pos++;
1134         } else {
1135           bitmap_free(&maybe_covered_bitmap);
1136           bitmap_free(map);
1137           return;
1138         }
1139         break;
1140       // This column is a type which is never covered. If it was requested, we
1141       // know this lookup will never be covered.
1142       default:
1143         if (bitmap_is_set(table->read_set, field->field_index)) {
1144           bitmap_free(&maybe_covered_bitmap);
1145           bitmap_free(map);
1146           return;
1147         }
1148         break;
1149     }
1150   }
1151 
1152   // If there are columns which are not covered in the read set, the lookup
1153   // can't be covered.
1154   if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1155     bitmap_free(map);
1156   }
1157   bitmap_free(&maybe_covered_bitmap);
1158 }
1159 
1160 /*
1161   Return true if for this secondary index
1162   - All of the requested columns are in the index
1163   - All values for columns that are prefix-only indexes are shorter or equal
1164     in length to the prefix
1165  */
covers_lookup(const rocksdb::Slice * const unpack_info,const MY_BITMAP * const lookup_bitmap) const1166 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1167                                 const MY_BITMAP *const lookup_bitmap) const {
1168   DBUG_ASSERT(lookup_bitmap != nullptr);
1169   if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1170     return false;
1171   }
1172 
1173   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1174 
1175   // Check if this unpack_info has a covered_bitmap
1176   const char *unpack_header = unp_reader.get_current_ptr();
1177   const bool has_covered_unpack_info =
1178       unp_reader.remaining_bytes() &&
1179       unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1180   if (!has_covered_unpack_info ||
1181       !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1182     return false;
1183   }
1184 
1185   MY_BITMAP covered_bitmap;
1186   my_bitmap_map covered_bits;
1187   bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1188   covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1189                                       sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1190                                       RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1191 
1192   return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1193 }
1194 
1195 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
can_cover_lookup() const1196 bool Rdb_key_def::can_cover_lookup() const {
1197   for (uint i = 0; i < m_key_parts; i++) {
1198     if (!m_pack_info[i].m_covered) return false;
1199   }
1200   return true;
1201 }
1202 
pack_field(Field * const field,Rdb_field_packing * pack_info,uchar * tuple,uchar * const packed_tuple,uchar * const pack_buffer,Rdb_string_writer * const unpack_info,uint * const n_null_fields) const1203 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1204                                uchar *tuple, uchar *const packed_tuple,
1205                                uchar *const pack_buffer,
1206                                Rdb_string_writer *const unpack_info,
1207                                uint *const n_null_fields) const {
1208   if (field->real_maybe_null()) {
1209     DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 1));
1210     if (field->is_real_null()) {
1211       /* NULL value. store '\0' so that it sorts before non-NULL values */
1212       *tuple++ = 0;
1213       /* That's it, don't store anything else */
1214       if (n_null_fields) (*n_null_fields)++;
1215       return tuple;
1216     } else {
1217       /* Not a NULL value. Store '1' */
1218       *tuple++ = 1;
1219     }
1220   }
1221 
1222   const bool create_unpack_info =
1223       (unpack_info &&  // we were requested to generate unpack_info
1224        pack_info->uses_unpack_info());  // and this keypart uses it
1225   Rdb_pack_field_context pack_ctx(unpack_info);
1226 
1227   // Set the offset for methods which do not take an offset as an argument
1228   DBUG_ASSERT(
1229       is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1230 
1231   (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1232 
1233   /* Make "unpack info" to be stored in the value */
1234   if (create_unpack_info) {
1235     (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1236                                          &pack_ctx);
1237   }
1238 
1239   return tuple;
1240 }
1241 
1242 /**
1243   Get index columns from the record and pack them into mem-comparable form.
1244 
1245   @param
1246     tbl                   Table we're working on
1247     record           IN   Record buffer with fields in table->record format
1248     pack_buffer      IN   Temporary area for packing varchars. The size is
1249                           at least max_storage_fmt_length() bytes.
1250     packed_tuple     OUT  Key in the mem-comparable form
1251     unpack_info      OUT  Unpack data
1252     unpack_info_len  OUT  Unpack data length
1253     n_key_parts           Number of keyparts to process. 0 means all of them.
1254     n_null_fields    OUT  Number of key fields with NULL value.
1255     ttl_bytes        IN   Previous ttl bytes from old record for update case or
1256                           current ttl bytes from just packed primary key/value
1257   @detail
1258     Some callers do not need the unpack information, they can pass
1259     unpack_info=nullptr, unpack_info_len=nullptr.
1260 
1261   @return
1262     Length of the packed tuple
1263 */
1264 
pack_record(const TABLE * const tbl,uchar * const pack_buffer,const uchar * const record,uchar * const packed_tuple,Rdb_string_writer * const unpack_info,const bool should_store_row_debug_checksums,const longlong hidden_pk_id,uint n_key_parts,uint * const n_null_fields,const char * const ttl_bytes) const1265 uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
1266                               const uchar *const record,
1267                               uchar *const packed_tuple,
1268                               Rdb_string_writer *const unpack_info,
1269                               const bool should_store_row_debug_checksums,
1270                               const longlong hidden_pk_id, uint n_key_parts,
1271                               uint *const n_null_fields,
1272                               const char *const ttl_bytes) const {
1273   DBUG_ASSERT(tbl != nullptr);
1274   DBUG_ASSERT(pack_buffer != nullptr);
1275   DBUG_ASSERT(record != nullptr);
1276   DBUG_ASSERT(packed_tuple != nullptr);
1277   // Checksums for PKs are made when record is packed.
1278   // We should never attempt to make checksum just from PK values
1279   DBUG_ASSERT_IMP(should_store_row_debug_checksums,
1280                   (m_index_type == INDEX_TYPE_SECONDARY));
1281 
1282   uchar *tuple = packed_tuple;
1283   size_t unpack_start_pos = size_t(-1);
1284   size_t unpack_len_pos = size_t(-1);
1285   size_t covered_bitmap_pos = size_t(-1);
1286   const bool hidden_pk_exists = table_has_hidden_pk(tbl);
1287 
1288   rdb_netbuf_store_index(tuple, m_index_number);
1289   tuple += INDEX_NUMBER_SIZE;
1290 
1291   // If n_key_parts is 0, it means all columns.
1292   // The following includes the 'extended key' tail.
1293   // The 'extended key' includes primary key. This is done to 'uniqify'
1294   // non-unique indexes
1295   const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;
1296 
1297   // If hidden pk exists, but hidden pk wasnt passed in, we can't pack the
1298   // hidden key part.  So we skip it (its always 1 part).
1299   if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
1300     n_key_parts = m_key_parts - 1;
1301   } else if (use_all_columns) {
1302     n_key_parts = m_key_parts;
1303   }
1304 
1305   if (n_null_fields) *n_null_fields = 0;
1306 
1307   // Check if we need a covered bitmap. If it is certain that all key parts are
1308   // covering, we don't need one.
1309   bool store_covered_bitmap = false;
1310   if (unpack_info && use_covered_bitmap_format()) {
1311     for (uint i = 0; i < n_key_parts; i++) {
1312       if (!m_pack_info[i].m_covered) {
1313         store_covered_bitmap = true;
1314         break;
1315       }
1316     }
1317   }
1318 
1319   const char tag =
1320       store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;
1321 
1322   if (unpack_info) {
1323     unpack_info->clear();
1324 
1325     if (m_index_type == INDEX_TYPE_SECONDARY &&
1326         m_total_index_flags_length > 0) {
1327       // Reserve space for index flag fields
1328       unpack_info->allocate(m_total_index_flags_length);
1329 
1330       // Insert TTL timestamp
1331       if (has_ttl() && ttl_bytes) {
1332         write_index_flag_field(unpack_info,
1333                                reinterpret_cast<const uchar *>(ttl_bytes),
1334                                Rdb_key_def::TTL_FLAG);
1335       }
1336     }
1337 
1338     unpack_start_pos = unpack_info->get_current_pos();
1339     unpack_info->write_uint8(tag);
1340     unpack_len_pos = unpack_info->get_current_pos();
1341     // we don't know the total length yet, so write a zero
1342     unpack_info->write_uint16(0);
1343 
1344     if (store_covered_bitmap) {
1345       // Reserve two bytes for the covered bitmap. This will store, for key
1346       // parts which are not always covering, whether or not it is covering
1347       // for this record.
1348       covered_bitmap_pos = unpack_info->get_current_pos();
1349       unpack_info->write_uint16(0);
1350     }
1351   }
1352 
1353   MY_BITMAP covered_bitmap;
1354   my_bitmap_map covered_bits;
1355   uint curr_bitmap_pos = 0;
1356   bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1357 
1358   for (uint i = 0; i < n_key_parts; i++) {
1359     // Fill hidden pk id into the last key part for secondary keys for tables
1360     // with no pk
1361     if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
1362       m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
1363       break;
1364     }
1365 
1366     Field *const field = m_pack_info[i].get_field_in_table(tbl);
1367     DBUG_ASSERT(field != nullptr);
1368 
1369     uint field_offset = field->ptr - tbl->record[0];
1370     uint null_offset = field->null_offset(tbl->record[0]);
1371     bool maybe_null = field->real_maybe_null();
1372 
1373     field->move_field(
1374         const_cast<uchar *>(record) + field_offset,
1375         maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
1376         field->null_bit);
1377     // WARNING! Don't return without restoring field->ptr and field->null_ptr
1378 
1379     tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
1380                        unpack_info, n_null_fields);
1381 
1382     // If this key part is a prefix of a VARCHAR field, check if it's covered.
1383     if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
1384         !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
1385       size_t data_length = field->data_length();
1386       uint16 key_length;
1387       if (m_pk_part_no[i] == (uint)-1) {
1388         key_length = tbl->key_info[get_keyno()].key_part[i].length;
1389       } else {
1390         key_length =
1391             tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
1392       }
1393 
1394       if (m_pack_info[i].m_unpack_func != nullptr &&
1395           data_length <= key_length) {
1396         bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
1397       }
1398       curr_bitmap_pos++;
1399     }
1400 
1401     // Restore field->ptr and field->null_ptr
1402     field->move_field(tbl->record[0] + field_offset,
1403                       maybe_null ? tbl->record[0] + null_offset : nullptr,
1404                       field->null_bit);
1405   }
1406 
1407   if (unpack_info) {
1408     const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
1409     DBUG_ASSERT(len <= std::numeric_limits<uint16_t>::max());
1410 
1411     // Don't store the unpack_info if it has only the header (that is, there's
1412     // no meaningful content).
1413     // Primary Keys are special: for them, store the unpack_info even if it's
1414     // empty (provided m_maybe_unpack_info==true, see
1415     // ha_rocksdb::convert_record_to_storage_format)
1416     if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
1417       if (len == get_unpack_header_size(tag) && !covered_bits) {
1418         unpack_info->truncate(unpack_start_pos);
1419       } else if (store_covered_bitmap) {
1420         unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
1421       }
1422     } else {
1423       unpack_info->write_uint16_at(unpack_len_pos, len);
1424     }
1425 
1426     //
1427     // Secondary keys have key and value checksums in the value part
1428     // Primary key is a special case (the value part has non-indexed columns),
1429     // so the checksums are computed and stored by
1430     // ha_rocksdb::convert_record_to_storage_format
1431     //
1432     if (should_store_row_debug_checksums) {
1433       const uint32_t key_crc32 = crc32(0, packed_tuple, tuple - packed_tuple);
1434       const uint32_t val_crc32 =
1435           crc32(0, unpack_info->ptr(), unpack_info->get_current_pos());
1436 
1437       unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
1438       unpack_info->write_uint32(key_crc32);
1439       unpack_info->write_uint32(val_crc32);
1440     }
1441   }
1442 
1443   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1444 
1445   return tuple - packed_tuple;
1446 }
1447 
1448 /**
1449   Pack the hidden primary key into mem-comparable form.
1450 
1451   @param
1452     tbl                   Table we're working on
1453     hidden_pk_id     IN   New value to be packed into key
1454     packed_tuple     OUT  Key in the mem-comparable form
1455 
1456   @return
1457     Length of the packed tuple
1458 */
1459 
pack_hidden_pk(const longlong hidden_pk_id,uchar * const packed_tuple) const1460 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1461                                  uchar *const packed_tuple) const {
1462   DBUG_ASSERT(packed_tuple != nullptr);
1463 
1464   uchar *tuple = packed_tuple;
1465   rdb_netbuf_store_index(tuple, m_index_number);
1466   tuple += INDEX_NUMBER_SIZE;
1467   DBUG_ASSERT(m_key_parts == 1);
1468   DBUG_ASSERT(is_storage_available(tuple - packed_tuple,
1469                                    m_pack_info[0].m_max_image_len));
1470 
1471   m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1472 
1473   DBUG_ASSERT(is_storage_available(tuple - packed_tuple, 0));
1474   return tuple - packed_tuple;
1475 }
1476 
1477 /*
1478   Function of type rdb_index_field_pack_t
1479 */
1480 
pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1481 void Rdb_key_def::pack_with_make_sort_key(
1482     Rdb_field_packing *const fpi, Field *const field,
1483     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1484     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1485   DBUG_ASSERT(fpi != nullptr);
1486   DBUG_ASSERT(field != nullptr);
1487   DBUG_ASSERT(dst != nullptr);
1488   DBUG_ASSERT(*dst != nullptr);
1489 
1490   const int max_len = fpi->m_max_image_len;
1491   MY_BITMAP*old_map;
1492 
1493   old_map= dbug_tmp_use_all_columns(field->table,
1494                                     &field->table->read_set);
1495   field->sort_string(*dst, max_len);
1496   dbug_tmp_restore_column_map(&field->table->read_set, old_map);
1497   *dst += max_len;
1498 }
1499 
1500 /*
1501   Compares two keys without unpacking
1502 
1503   @detail
1504   @return
1505     0 - Ok. column_index is the index of the first column which is different.
1506           -1 if two kes are equal
1507     1 - Data format error.
1508 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const1509 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
1510                               const rocksdb::Slice *key2,
1511                               std::size_t *const column_index) const {
1512   DBUG_ASSERT(key1 != nullptr);
1513   DBUG_ASSERT(key2 != nullptr);
1514   DBUG_ASSERT(column_index != nullptr);
1515 
1516   // the caller should check the return value and
1517   // not rely on column_index being valid
1518   *column_index = 0xbadf00d;
1519 
1520   Rdb_string_reader reader1(key1);
1521   Rdb_string_reader reader2(key2);
1522 
1523   // Skip the index number
1524   if ((!reader1.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1525 
1526   if ((!reader2.read(INDEX_NUMBER_SIZE))) return HA_EXIT_FAILURE;
1527 
1528   for (uint i = 0; i < m_key_parts; i++) {
1529     const Rdb_field_packing *const fpi = &m_pack_info[i];
1530     if (fpi->m_maybe_null) {
1531       const auto nullp1 = reader1.read(1);
1532       const auto nullp2 = reader2.read(1);
1533 
1534       if (nullp1 == nullptr || nullp2 == nullptr) {
1535         return HA_EXIT_FAILURE;
1536       }
1537 
1538       if (*nullp1 != *nullp2) {
1539         *column_index = i;
1540         return HA_EXIT_SUCCESS;
1541       }
1542 
1543       if (*nullp1 == 0) {
1544         /* This is a NULL value */
1545         continue;
1546       }
1547     }
1548 
1549     const auto before_skip1 = reader1.get_current_ptr();
1550     const auto before_skip2 = reader2.get_current_ptr();
1551     DBUG_ASSERT(fpi->m_skip_func);
1552     if ((fpi->m_skip_func)(fpi, nullptr, &reader1)) {
1553       return HA_EXIT_FAILURE;
1554     }
1555     if ((fpi->m_skip_func)(fpi, nullptr, &reader2)) {
1556       return HA_EXIT_FAILURE;
1557     }
1558     const auto size1 = reader1.get_current_ptr() - before_skip1;
1559     const auto size2 = reader2.get_current_ptr() - before_skip2;
1560     if (size1 != size2) {
1561       *column_index = i;
1562       return HA_EXIT_SUCCESS;
1563     }
1564 
1565     if (memcmp(before_skip1, before_skip2, size1) != 0) {
1566       *column_index = i;
1567       return HA_EXIT_SUCCESS;
1568     }
1569   }
1570 
1571   *column_index = m_key_parts;
1572   return HA_EXIT_SUCCESS;
1573 }
1574 
1575 /*
1576   @brief
1577     Given a zero-padded key, determine its real key length
1578 
1579   @detail
1580     Fixed-size skip functions just read.
1581 */
1582 
key_length(const TABLE * const table,const rocksdb::Slice & key) const1583 size_t Rdb_key_def::key_length(const TABLE *const table,
1584                                const rocksdb::Slice &key) const {
1585   DBUG_ASSERT(table != nullptr);
1586 
1587   Rdb_string_reader reader(&key);
1588 
1589   if ((!reader.read(INDEX_NUMBER_SIZE))) {
1590     return size_t(-1);
1591   }
1592   for (uint i = 0; i < m_key_parts; i++) {
1593     const Rdb_field_packing *fpi = &m_pack_info[i];
1594     const Field *field = nullptr;
1595     if (m_index_type != INDEX_TYPE_HIDDEN_PRIMARY) {
1596       field = fpi->get_field_in_table(table);
1597     }
1598     if ((fpi->m_skip_func)(fpi, field, &reader)) {
1599       return size_t(-1);
1600     }
1601   }
1602   return key.size() - reader.remaining_bytes();
1603 }
1604 
1605 /*
1606   Take mem-comparable form and unpack_info and unpack it to Table->record
1607 
1608   @detail
1609     not all indexes support this
1610 
1611   @return
1612     HA_EXIT_SUCCESS    OK
1613     other              HA_ERR error code
1614 */
1615 
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool verify_row_debug_checksums) const1616 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
1617                                const rocksdb::Slice *const packed_key,
1618                                const rocksdb::Slice *const unpack_info,
1619                                const bool verify_row_debug_checksums) const {
1620   Rdb_string_reader reader(packed_key);
1621   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1622 
1623   // There is no checksuming data after unpack_info for primary keys, because
1624   // the layout there is different. The checksum is verified in
1625   // ha_rocksdb::convert_record_from_storage_format instead.
1626   DBUG_ASSERT_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
1627                   !verify_row_debug_checksums);
1628 
1629   // Skip the index number
1630   if ((!reader.read(INDEX_NUMBER_SIZE))) {
1631     return HA_ERR_ROCKSDB_CORRUPT_DATA;
1632   }
1633 
1634   // For secondary keys, we expect the value field to contain index flags,
1635   // unpack data, and checksum data in that order. One or all can be missing,
1636   // but they cannot be reordered.
1637   if (unp_reader.remaining_bytes()) {
1638     if (m_index_type == INDEX_TYPE_SECONDARY &&
1639         m_total_index_flags_length > 0 &&
1640         !unp_reader.read(m_total_index_flags_length)) {
1641       return HA_ERR_ROCKSDB_CORRUPT_DATA;
1642     }
1643   }
1644 
1645   const char *unpack_header = unp_reader.get_current_ptr();
1646   bool has_unpack_info =
1647       unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
1648   if (has_unpack_info) {
1649     if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
1650       return HA_ERR_ROCKSDB_CORRUPT_DATA;
1651     }
1652   }
1653 
1654   // Read the covered bitmap
1655   MY_BITMAP covered_bitmap;
1656   my_bitmap_map covered_bits;
1657   bool has_covered_bitmap =
1658       has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
1659   if (has_covered_bitmap) {
1660     bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1661     covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1662                                         sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1663                                         RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1664   }
1665 
1666   int err = HA_EXIT_SUCCESS;
1667 
1668 
1669   Rdb_key_field_iterator iter(
1670       this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
1671       has_covered_bitmap ? &covered_bitmap : nullptr, buf);
1672   while (iter.has_next()) {
1673     err = iter.next();
1674     if (err) {
1675       return err;
1676     }
1677   }
1678 
1679   /*
1680     Check checksum values if present
1681   */
1682   const char *ptr;
1683   if ((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG) {
1684     if (verify_row_debug_checksums) {
1685       uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
1686           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1687       const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
1688           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
1689 
1690       const uint32_t computed_key_chksum =
1691           crc32(0, (const uchar *)packed_key->data(), packed_key->size());
1692       const uint32_t computed_val_chksum =
1693           crc32(0, (const uchar *)unpack_info->data(),
1694                 unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1695 
1696       DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
1697                       stored_key_chksum++;);
1698 
1699       if (stored_key_chksum != computed_key_chksum) {
1700         report_checksum_mismatch(true, packed_key->data(), packed_key->size());
1701         return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1702       }
1703 
1704       if (stored_val_chksum != computed_val_chksum) {
1705         report_checksum_mismatch(false, unpack_info->data(),
1706                                  unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
1707         return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
1708       }
1709     } else {
1710       /* The checksums are present but we are not checking checksums */
1711     }
1712   }
1713 
1714   if (reader.remaining_bytes()) return HA_ERR_ROCKSDB_CORRUPT_DATA;
1715 
1716   return HA_EXIT_SUCCESS;
1717 }
1718 
table_has_hidden_pk(const TABLE * const table)1719 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
1720   return table->s->primary_key == MAX_INDEXES;
1721 }
1722 
report_checksum_mismatch(const bool is_key,const char * const data,const size_t data_size) const1723 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
1724                                            const char *const data,
1725                                            const size_t data_size) const {
1726   // NO_LINT_DEBUG
1727   sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
1728                   is_key ? "key" : "value", get_index_number());
1729 
1730   const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
1731   // NO_LINT_DEBUG
1732   sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
1733                   (uint64_t)data_size, buf.c_str());
1734 
1735   my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
1736 }
1737 
index_format_min_check(const int pk_min,const int sk_min) const1738 bool Rdb_key_def::index_format_min_check(const int pk_min,
1739                                          const int sk_min) const {
1740   switch (m_index_type) {
1741     case INDEX_TYPE_PRIMARY:
1742     case INDEX_TYPE_HIDDEN_PRIMARY:
1743       return (m_kv_format_version >= pk_min);
1744     case INDEX_TYPE_SECONDARY:
1745       return (m_kv_format_version >= sk_min);
1746     default:
1747       DBUG_ASSERT(0);
1748       return false;
1749   }
1750 }
1751 
1752 ///////////////////////////////////////////////////////////////////////////////////////////
1753 // Rdb_field_packing
1754 ///////////////////////////////////////////////////////////////////////////////////////////
1755 
1756 /*
1757   Function of type rdb_index_field_skip_t
1758 */
1759 
skip_max_length(const Rdb_field_packing * const fpi,const Field * const field MY_ATTRIBUTE ((__unused__)),Rdb_string_reader * const reader)1760 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
1761                                  const Field *const field
1762                                      MY_ATTRIBUTE((__unused__)),
1763                                  Rdb_string_reader *const reader) {
1764   if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
1765   return HA_EXIT_SUCCESS;
1766 }
1767 
1768 /*
1769   (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
1770   split in the middle of an UTF-8 character. See the implementation of
1771   unpack_binary_or_utf8_varchar.
1772 */
/* Size in bytes of one encoded chunk: (N-1) data bytes plus one flag byte. */
#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Number of bytes computed for `len` input bytes in the current format:
  ceil(len / (N-1)) chunks of N bytes each.

  The argument and the whole expansion are parenthesized so that the macro
  can be used safely inside larger expressions and with arguments that
  contain low-precedence operators.
*/
#define RDB_ENCODED_SIZE(len)                                      \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Same for the legacy format, which emits one extra chunk when `len` is an
  exact multiple of (N-1): (floor(len / (N-1)) + 1) chunks of N bytes each.
*/
#define RDB_LEGACY_ENCODED_SIZE(len)                 \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /       \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *                \
   RDB_LEGACY_ESCAPE_LENGTH)
1785 
1786 /*
1787   Function of type rdb_index_field_skip_t
1788 */
1789 
skip_variable_length(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1790 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
1791                                       const Field *const field,
1792                                       Rdb_string_reader *const reader) {
1793   const uchar *ptr;
1794   bool finished = false;
1795 
1796   size_t dst_len; /* How much data can be there */
1797   if (field) {
1798     const Field_varstring *const field_var =
1799         static_cast<const Field_varstring *>(field);
1800     dst_len = field_var->pack_length() - field_var->length_bytes;
1801   } else {
1802     dst_len = UINT_MAX;
1803   }
1804 
1805   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
1806 
1807   /* Decode the length-emitted encoding here */
1808   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
1809     uint used_bytes;
1810 
1811     /* See pack_with_varchar_encoding. */
1812     if (use_legacy_format) {
1813       used_bytes = calc_unpack_legacy_variable_format(
1814           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1815     } else {
1816       used_bytes =
1817           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
1818     }
1819 
1820     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
1821       return HA_EXIT_FAILURE;  // Corruption in the data
1822     }
1823 
1824     if (finished) {
1825       break;
1826     }
1827 
1828     dst_len -= used_bytes;
1829   }
1830 
1831   if (!finished) {
1832     return HA_EXIT_FAILURE;
1833   }
1834 
1835   return HA_EXIT_SUCCESS;
1836 }
1837 
/*
  Values of the last byte of each segment in the Variable-Length Space-Padded
  encoding (see pack_with_varchar_space_pad and skip_variable_space_pad).
  EQUAL_TO_SPACES marks the final segment; the other two mark a segment that
  is followed by more data.
*/
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
1841 
1842 /*
1843   Skip a keypart that uses Variable-Length Space-Padded encoding
1844 */
1845 
skip_variable_space_pad(const Rdb_field_packing * const fpi,const Field * const field,Rdb_string_reader * const reader)1846 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
1847                                          const Field *const field,
1848                                          Rdb_string_reader *const reader) {
1849   const uchar *ptr;
1850   bool finished = false;
1851 
1852   size_t dst_len = UINT_MAX; /* How much data can be there */
1853 
1854   if (field) {
1855     const Field_varstring *const field_var =
1856         static_cast<const Field_varstring *>(field);
1857     dst_len = field_var->pack_length() - field_var->length_bytes;
1858   }
1859 
1860   /* Decode the length-emitted encoding here */
1861   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
1862     // See pack_with_varchar_space_pad
1863     const uchar c = ptr[fpi->m_segment_size - 1];
1864     if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
1865       // This is the last segment
1866       finished = true;
1867       break;
1868     } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
1869                c == VARCHAR_CMP_GREATER_THAN_SPACES) {
1870       // This is not the last segment
1871       if ((fpi->m_segment_size - 1) > dst_len) {
1872         // The segment is full of data but the table field can't hold that
1873         // much! This must be data corruption.
1874         return HA_EXIT_FAILURE;
1875       }
1876       dst_len -= (fpi->m_segment_size - 1);
1877     } else {
1878       // Encountered a value that's none of the VARCHAR_CMP* constants
1879       // It's data corruption.
1880       return HA_EXIT_FAILURE;
1881     }
1882   }
1883   return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
1884 }
1885 
1886 /*
1887   Function of type rdb_index_field_unpack_t
1888 */
1889 
unpack_integer(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))1890 int Rdb_key_def::unpack_integer(
1891     Rdb_field_packing *const fpi, Field *const field, uchar *const to,
1892     Rdb_string_reader *const reader,
1893     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
1894   const int length = fpi->m_max_image_len;
1895 
1896   const uchar *from;
1897   if (!(from = (const uchar *)reader->read(length))) {
1898     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
1899   }
1900 
1901 #ifdef WORDS_BIGENDIAN
1902   {
1903     if (static_cast<Field_num *>(field)->unsigned_flag) {
1904       to[0] = from[0];
1905     } else {
1906       to[0] = static_cast<char>(from[0] ^ 128);  // Reverse the sign bit.
1907     }
1908     memcpy(to + 1, from + 1, length - 1);
1909   }
1910 #else
1911   {
1912     const int sign_byte = from[0];
1913     if (static_cast<Field_num *>(field)->unsigned_flag) {
1914       to[length - 1] = sign_byte;
1915     } else {
1916       to[length - 1] =
1917           static_cast<char>(sign_byte ^ 128);  // Reverse the sign bit.
1918     }
1919     for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
1920   }
1921 #endif
1922   return UNPACK_SUCCESS;
1923 }
1924 
1925 #if !defined(WORDS_BIGENDIAN)
rdb_swap_double_bytes(uchar * const dst,const uchar * const src)1926 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
1927 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
1928   // A few systems store the most-significant _word_ first on little-endian
1929   dst[0] = src[3];
1930   dst[1] = src[2];
1931   dst[2] = src[1];
1932   dst[3] = src[0];
1933   dst[4] = src[7];
1934   dst[5] = src[6];
1935   dst[6] = src[5];
1936   dst[7] = src[4];
1937 #else
1938   dst[0] = src[7];
1939   dst[1] = src[6];
1940   dst[2] = src[5];
1941   dst[3] = src[4];
1942   dst[4] = src[3];
1943   dst[5] = src[2];
1944   dst[6] = src[1];
1945   dst[7] = src[0];
1946 #endif
1947 }
1948 
rdb_swap_float_bytes(uchar * const dst,const uchar * const src)1949 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
1950   dst[0] = src[3];
1951   dst[1] = src[2];
1952   dst[2] = src[1];
1953   dst[3] = src[0];
1954 }
1955 #else
1956 #define rdb_swap_double_bytes nullptr
1957 #define rdb_swap_float_bytes nullptr
1958 #endif
1959 
unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t size,const int exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))1960 int Rdb_key_def::unpack_floating_point(
1961     uchar *const dst, Rdb_string_reader *const reader, const size_t size,
1962     const int exp_digit, const uchar *const zero_pattern,
1963     const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
1964   const uchar *const from = (const uchar *)reader->read(size);
1965   if (from == nullptr) {
1966     /* Mem-comparable image doesn't have enough bytes */
1967     return UNPACK_FAILURE;
1968   }
1969 
1970   /* Check to see if the value is zero */
1971   if (memcmp(from, zero_pattern, size) == 0) {
1972     memcpy(dst, zero_val, size);
1973     return UNPACK_SUCCESS;
1974   }
1975 
1976 #if defined(WORDS_BIGENDIAN)
1977   // On big-endian, output can go directly into result
1978   uchar *const tmp = dst;
1979 #else
1980   // Otherwise use a temporary buffer to make byte-swapping easier later
1981   uchar tmp[8];
1982 #endif
1983 
1984   memcpy(tmp, from, size);
1985 
1986   if (tmp[0] & 0x80) {
1987     // If the high bit is set the original value was positive so
1988     // remove the high bit and subtract one from the exponent.
1989     ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
1990     exp_part &= 0x7FFF;                             // clear high bit;
1991     exp_part -= (ushort)1 << (16 - 1 - exp_digit);  // subtract from exponent
1992     tmp[0] = (uchar)(exp_part >> 8);
1993     tmp[1] = (uchar)exp_part;
1994   } else {
1995     // Otherwise the original value was negative and all bytes have been
1996     // negated.
1997     for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
1998   }
1999 
2000 #if !defined(WORDS_BIGENDIAN)
2001   // On little-endian, swap the bytes around
2002   swap_func(dst, tmp);
2003 #else
2004   DBUG_ASSERT(swap_func == nullptr);
2005 #endif
2006 
2007   return UNPACK_SUCCESS;
2008 }
2009 
/* Bits of a double that are not counted by DBL_MANT_DIG. */
#ifndef DBL_EXP_DIG
#define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
#endif
2013 
2014 /*
2015   Function of type rdb_index_field_unpack_t
2016 
2017   Unpack a double by doing the reverse action of change_double_for_sort
2018   (sql/filesort.cc).  Note that this only works on IEEE values.
2019   Note also that this code assumes that NaN and +/-Infinity are never
2020   allowed in the database.
2021 */
unpack_double(Rdb_field_packing * const fpi MY_ATTRIBUTE ((__unused__)),Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2022 int Rdb_key_def::unpack_double(
2023     Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2024     Field *const field MY_ATTRIBUTE((__unused__)), uchar *const field_ptr,
2025     Rdb_string_reader *const reader,
2026     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2027   static double zero_val = 0.0;
2028   static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2029 
2030   return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2031                                zero_pattern, (const uchar *)&zero_val,
2032                                rdb_swap_double_bytes);
2033 }
2034 
/* Bits of a float that are not counted by FLT_MANT_DIG. */
#ifndef FLT_EXP_DIG
#define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
#endif
2038 
2039 /*
2040   Function of type rdb_index_field_unpack_t
2041 
2042   Unpack a float by doing the reverse action of Field_float::make_sort_key
2043   (sql/field.cc).  Note that this only works on IEEE values.
2044   Note also that this code assumes that NaN and +/-Infinity are never
2045   allowed in the database.
2046 */
unpack_float(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2047 int Rdb_key_def::unpack_float(
2048     Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2049     uchar *const field_ptr, Rdb_string_reader *const reader,
2050     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2051   static float zero_val = 0.0;
2052   static const uchar zero_pattern[4] = {128, 0, 0, 0};
2053 
2054   return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2055                                zero_pattern, (const uchar *)&zero_val,
2056                                rdb_swap_float_bytes);
2057 }
2058 
2059 /*
2060   Function of type rdb_index_field_unpack_t used to
2061   Unpack by doing the reverse action to Field_newdate::make_sort_key.
2062 */
2063 
unpack_newdate(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2064 int Rdb_key_def::unpack_newdate(
2065     Rdb_field_packing *const fpi, Field *const field MY_ATTRIBUTE((__unused__)),
2066     uchar *const field_ptr, Rdb_string_reader *const reader,
2067     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2068   const char *from;
2069   DBUG_ASSERT(fpi->m_max_image_len == 3);
2070 
2071   if (!(from = reader->read(3))) {
2072     /* Mem-comparable image doesn't have enough bytes */
2073     return UNPACK_FAILURE;
2074   }
2075 
2076   field_ptr[0] = from[2];
2077   field_ptr[1] = from[1];
2078   field_ptr[2] = from[0];
2079   return UNPACK_SUCCESS;
2080 }
2081 
2082 /*
2083   Function of type rdb_index_field_unpack_t, used to
2084   Unpack the string by copying it over.
2085   This is for BINARY(n) where the value occupies the whole length.
2086 */
2087 
unpack_binary_str(Rdb_field_packing * const fpi,Field * const field,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2088 int Rdb_key_def::unpack_binary_str(
2089     Rdb_field_packing *const fpi, Field *const field, uchar *const to,
2090     Rdb_string_reader *const reader,
2091     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2092   const char *from;
2093   if (!(from = reader->read(fpi->m_max_image_len))) {
2094     /* Mem-comparable image doesn't have enough bytes */
2095     return UNPACK_FAILURE;
2096   }
2097 
2098   memcpy(to, from, fpi->m_max_image_len);
2099   return UNPACK_SUCCESS;
2100 }
2101 
2102 /*
2103   Function of type rdb_index_field_unpack_t.
2104   For UTF-8, we need to convert 2-byte wide-character entities back into
2105   UTF8 sequences.
2106 */
2107 
unpack_utf8_str(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2108 int Rdb_key_def::unpack_utf8_str(
2109     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2110     Rdb_string_reader *const reader,
2111     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2112   my_core::CHARSET_INFO *const cset = (my_core::CHARSET_INFO *)field->charset();
2113   const uchar *src;
2114   if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2115     /* Mem-comparable image doesn't have enough bytes */
2116     return UNPACK_FAILURE;
2117   }
2118 
2119   const uchar *const src_end = src + fpi->m_max_image_len;
2120   uchar *const dst_end = dst + field->pack_length();
2121 
2122   while (src < src_end) {
2123     my_wc_t wc = (src[0] << 8) | src[1];
2124     src += 2;
2125     int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2126     DBUG_ASSERT(res > 0 && res <= 3);
2127     if (res < 0) return UNPACK_FAILURE;
2128     dst += res;
2129   }
2130 
2131   cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
2132                    cset->pad_char);
2133   return UNPACK_SUCCESS;
2134 }
2135 
2136 /*
2137   This is the original algorithm to encode a variable binary field.  It
2138   sets a flag byte every Nth byte.  The flag value is (255 - #pad) where
2139   #pad is the number of padding bytes that were needed (0 if all N-1
2140   bytes were used).
2141 
2142   If N=8 and the field is:
2143   * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2144   * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2145   And the 4 byte string compares as greater than the 3 byte string
2146 
2147   Unfortunately the algorithm has a flaw.  If the input is exactly a
2148   multiple of N-1, an extra N bytes are written.  Since we usually use
2149   N=9, an 8 byte input will generate 18 bytes of output instead of the
2150   9 bytes of output that is optimal.
2151 
2152   See pack_variable_format for the newer algorithm.
2153 */
pack_legacy_variable_format(const uchar * src,size_t src_len,uchar ** dst)2154 void Rdb_key_def::pack_legacy_variable_format(
2155     const uchar *src,  // The data to encode
2156     size_t src_len,    // The length of the data to encode
2157     uchar **dst)       // The location to encode the data
2158 {
2159   size_t copy_len;
2160   size_t padding_bytes;
2161   uchar *ptr = *dst;
2162 
2163   do {
2164     copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2165     padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2166     memcpy(ptr, src, copy_len);
2167     ptr += copy_len;
2168     src += copy_len;
2169     // pad with zeros if necessary
2170     if (padding_bytes > 0) {
2171       memset(ptr, 0, padding_bytes);
2172       ptr += padding_bytes;
2173     }
2174 
2175     *(ptr++) = 255 - padding_bytes;
2176 
2177     src_len -= copy_len;
2178   } while (padding_bytes == 0);
2179 
2180   *dst = ptr;
2181 }
2182 
2183 /*
2184   This is the new algorithm.  Similarly to the legacy format the input
2185   is split up into N-1 bytes and a flag byte is used as the Nth byte
2186   in the output.
2187 
2188   - If the previous segment needed any padding the flag is set to the
2189     number of bytes used (0..N-2).  0 is possible in the first segment
2190     if the input is 0 bytes long.
2191   - If no padding was used and there is no more data left in the input
2192     the flag is set to N-1
2193   - If no padding was used and there is still data left in the input the
2194     flag is set to N.
2195 
2196   For N=9, the following input values encode to the specified
  output (where 'X' indicates a byte of the original input):
2198   - 0 bytes  is encoded as 0 0 0 0 0 0 0 0 0
2199   - 1 byte   is encoded as X 0 0 0 0 0 0 0 1
2200   - 2 bytes  is encoded as X X 0 0 0 0 0 0 2
2201   - 7 bytes  is encoded as X X X X X X X 0 7
2202   - 8 bytes  is encoded as X X X X X X X X 8
2203   - 9 bytes  is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2204   - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2205 */
pack_variable_format(const uchar * src,size_t src_len,uchar ** dst)2206 void Rdb_key_def::pack_variable_format(
2207     const uchar *src,  // The data to encode
2208     size_t src_len,    // The length of the data to encode
2209     uchar **dst)       // The location to encode the data
2210 {
2211   uchar *ptr = *dst;
2212 
2213   for (;;) {
2214     // Figure out how many bytes to copy, copy them and adjust pointers
2215     const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2216     memcpy(ptr, src, copy_len);
2217     ptr += copy_len;
2218     src += copy_len;
2219     src_len -= copy_len;
2220 
2221     // Are we at the end of the input?
2222     if (src_len == 0) {
2223       // pad with zeros if necessary;
2224       const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2225       if (padding_bytes > 0) {
2226         memset(ptr, 0, padding_bytes);
2227         ptr += padding_bytes;
2228       }
2229 
2230       // Put the flag byte (0 - N-1) in the output
2231       *(ptr++) = (uchar)copy_len;
2232       break;
2233     }
2234 
2235     // We have more data - put the flag byte (N) in and continue
2236     *(ptr++) = RDB_ESCAPE_LENGTH;
2237   }
2238 
2239   *dst = ptr;
2240 }
2241 
2242 /*
2243   Function of type rdb_index_field_pack_t
2244 */
2245 
pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))2246 void Rdb_key_def::pack_with_varchar_encoding(
2247     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2248     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2249   const CHARSET_INFO *const charset = field->charset();
2250   Field_varstring *const field_var = (Field_varstring *)field;
2251 
2252   const size_t value_length = (field_var->length_bytes == 1)
2253                                   ? (uint)*field->ptr
2254                                   : uint2korr(field->ptr);
2255   size_t xfrm_len = charset->coll->strnxfrm(
2256       charset, buf, fpi->m_max_image_len, field_var->char_length(),
2257       field_var->ptr + field_var->length_bytes, value_length, 0);
2258 
2259   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2260   if (fpi->m_use_legacy_varbinary_format) {
2261     pack_legacy_variable_format(buf, xfrm_len, dst);
2262   } else {
2263     pack_variable_format(buf, xfrm_len, dst);
2264   }
2265 }
2266 
2267 /*
2268   Compare the string in [buf..buf_end) with a string that is an infinite
2269   sequence of strings in space_xfrm
2270 */
2271 
rdb_compare_string_with_spaces(const uchar * buf,const uchar * const buf_end,const std::vector<uchar> * const space_xfrm)2272 static int rdb_compare_string_with_spaces(
2273     const uchar *buf, const uchar *const buf_end,
2274     const std::vector<uchar> *const space_xfrm) {
2275   int cmp = 0;
2276   while (buf < buf_end) {
2277     size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2278     if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
2279     buf += bytes;
2280   }
2281   return cmp;
2282 }
2283 
/*
  Offset applied to the trimmed/added space count kept in unpack_info, so
  that the count can be stored as an unsigned number.
*/
static const int RDB_TRIMMED_CHARS_OFFSET = 8;
2285 /*
2286   Pack the data with Variable-Length Space-Padded Encoding.
2287 
2288   The encoding is there to meet two goals:
2289 
2290   Goal#1. Comparison. The SQL standard says
2291 
2292     " If the collation for the comparison has the PAD SPACE characteristic,
2293     for the purposes of the comparison, the shorter value is effectively
2294     extended to the length of the longer by concatenation of <space>s on the
2295     right.
2296 
2297   At the moment, all MySQL collations except one have the PAD SPACE
2298   characteristic.  The exception is the "binary" collation that is used by
2299   [VAR]BINARY columns. (Note that binary collations for specific charsets,
2300   like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2301   the PAD SPACE characteristic).
2302 
2303   Goal#2 is to preserve the number of trailing spaces in the original value.
2304 
2305   This is achieved by using the following encoding:
2306   The key part:
2307   - Stores mem-comparable image of the column
2308   - It is stored in chunks of fpi->m_segment_size bytes (*)
2309     = If the remainder of the chunk is not occupied, it is padded with mem-
2310       comparable image of the space character (cs->pad_char to be precise).
2311   - The last byte of the chunk shows how the rest of column's mem-comparable
2312     image would compare to mem-comparable image of the column extended with
2313     spaces. There are three possible values.
2314      - VARCHAR_CMP_LESS_THAN_SPACES,
2315      - VARCHAR_CMP_EQUAL_TO_SPACES
2316      - VARCHAR_CMP_GREATER_THAN_SPACES
2317 
2318   VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2319   is spaces, or something that sorts as spaces, so there is no reason to store
2320   it).
2321 
2322   Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2323 
2324    'abcd\0'   => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0    ' <VARCHAR_CMP_EQUAL> ]
2325    'abcd'     => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2326    'abcd   '  => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2327    'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2328 
2329   As mentioned above, the last chunk is padded with mem-comparable images of
2330   cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2331 
2332   fpi->m_segment_size depends on the used collation. It is chosen to be such
2333   that no mem-comparable image of space will ever stretch across the segments
2334   (see get_segment_size_from_collation).
2335 
2336   == The value part (aka unpack_info) ==
2337   The value part stores the number of space characters that one needs to add
2338   when unpacking the string.
2339   - If the number is positive, it means add this many spaces at the end
2340   - If the number is negative, it means padding has added extra spaces which
2341     must be removed.
2342 
2343   Storage considerations
2344   - depending on column's max size, the number may occupy 1 or 2 bytes
2345   - the number of spaces that need to be removed is not more than
2346     RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2347     then store it as unsigned.
2348 
2349   @seealso
2350     unpack_binary_or_utf8_varchar_space_pad
2351     unpack_simple_varchar_space_pad
2352     dummy_make_unpack_info
2353     skip_variable_space_pad
2354 */
2355 
pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)2356 void Rdb_key_def::pack_with_varchar_space_pad(
2357     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2358     Rdb_pack_field_context *const pack_ctx) {
2359   Rdb_string_writer *const unpack_info = pack_ctx->writer;
2360   const CHARSET_INFO *const charset = field->charset();
2361   const auto field_var = static_cast<Field_varstring *>(field);
2362 
2363   const size_t value_length = (field_var->length_bytes == 1)
2364                                   ? (uint)*field->ptr
2365                                   : uint2korr(field->ptr);
2366 
2367   const size_t trimmed_len = charset->cset->lengthsp(
2368       charset, (const char *)field_var->ptr + field_var->length_bytes,
2369       value_length);
2370   const size_t xfrm_len = charset->coll->strnxfrm(
2371       charset, buf, fpi->m_max_image_len, field_var->char_length(),
2372       field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2373 
2374   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2375   uchar *const buf_end = buf + xfrm_len;
2376 
2377   size_t encoded_size = 0;
2378   uchar *ptr = *dst;
2379   size_t padding_bytes;
2380   while (true) {
2381     const size_t copy_len =
2382         std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2383     padding_bytes = fpi->m_segment_size - 1 - copy_len;
2384     memcpy(ptr, buf, copy_len);
2385     ptr += copy_len;
2386     buf += copy_len;
2387 
2388     if (padding_bytes) {
2389       memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2390       ptr += padding_bytes;
2391       *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;  // last segment
2392     } else {
2393       // Compare the string suffix with a hypothetical infinite string of
2394       // spaces. It could be that the first difference is beyond the end of
2395       // current chunk.
2396       const int cmp =
2397           rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2398 
2399       if (cmp < 0) {
2400         *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2401       } else if (cmp > 0) {
2402         *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2403       } else {
2404         // It turns out all the rest are spaces.
2405         *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2406       }
2407     }
2408     encoded_size += fpi->m_segment_size;
2409 
2410     if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
2411   }
2412 
2413   // m_unpack_info_stores_value means unpack_info stores the whole original
2414   // value. There is no need to store the number of trimmed/padded endspaces
2415   // in that case.
2416   if (unpack_info && !fpi->m_unpack_info_stores_value) {
2417     // (value_length - trimmed_len) is the number of trimmed space *characters*
2418     // then, padding_bytes is the number of *bytes* added as padding
2419     // then, we add 8, because we don't store negative values.
2420     DBUG_ASSERT(padding_bytes % fpi->space_xfrm_len == 0);
2421     DBUG_ASSERT((value_length - trimmed_len) % fpi->space_mb_len == 0);
2422     const size_t removed_chars =
2423         RDB_TRIMMED_CHARS_OFFSET +
2424         (value_length - trimmed_len) / fpi->space_mb_len -
2425         padding_bytes / fpi->space_xfrm_len;
2426 
2427     if (fpi->m_unpack_info_uses_two_bytes) {
2428       unpack_info->write_uint16(removed_chars);
2429     } else {
2430       DBUG_ASSERT(removed_chars < 0x100);
2431       unpack_info->write_uint8(removed_chars);
2432     }
2433   }
2434 
2435   *dst += encoded_size;
2436 }
2437 
2438 /*
2439   Calculate the number of used bytes in the chunk and whether this is the
2440   last chunk in the input.  This is based on the old legacy format - see
2441   pack_legacy_variable_format.
2442  */
calc_unpack_legacy_variable_format(uchar flag,bool * done)2443 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2444   uint pad = 255 - flag;
2445   uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2446   if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2447     return (uint)-1;
2448   }
2449 
2450   *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2451   return used_bytes;
2452 }
2453 
2454 /*
2455   Calculate the number of used bytes in the chunk and whether this is the
2456   last chunk in the input.  This is based on the new format - see
2457   pack_variable_format.
2458  */
calc_unpack_variable_format(uchar flag,bool * done)2459 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2460   // Check for invalid flag values
2461   if (flag > RDB_ESCAPE_LENGTH) {
2462     return (uint)-1;
2463   }
2464 
2465   // Values from 1 to N-1 indicate this is the last chunk and that is how
2466   // many bytes were used
2467   if (flag < RDB_ESCAPE_LENGTH) {
2468     *done = true;
2469     return flag;
2470   }
2471 
2472   // A value of N means we used N-1 bytes and had more to go
2473   *done = false;
2474   return RDB_ESCAPE_LENGTH - 1;
2475 }
2476 
2477 /*
2478   Unpack data that has charset information.  Each two bytes of the input is
2479   treated as a wide-character and converted to its multibyte equivalent in
2480   the output.
2481  */
unpack_charset(const CHARSET_INFO * cset,const uchar * src,uint src_len,uchar * dst,uint dst_len,uint * used_bytes)2482 static int unpack_charset(
2483     const CHARSET_INFO *cset,  // character set information
2484     const uchar *src,          // source data to unpack
2485     uint src_len,              // length of source data
2486     uchar *dst,                // destination of unpacked data
2487     uint dst_len,              // length of destination data
2488     uint *used_bytes)          // output number of bytes used
2489 {
2490   if (src_len & 1) {
2491     /*
2492       UTF-8 characters are encoded into two-byte entities. There is no way
2493       we can have an odd number of bytes after encoding.
2494     */
2495     return UNPACK_FAILURE;
2496   }
2497 
2498   uchar *dst_end = dst + dst_len;
2499   uint used = 0;
2500 
2501   for (uint ii = 0; ii < src_len; ii += 2) {
2502     my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2503     int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end);
2504     DBUG_ASSERT(res > 0 && res <= 3);
2505     if (res < 0) {
2506       return UNPACK_FAILURE;
2507     }
2508 
2509     used += res;
2510   }
2511 
2512   *used_bytes = used;
2513   return UNPACK_SUCCESS;
2514 }
2515 
2516 /*
2517   Function of type rdb_index_field_unpack_t
2518 */
2519 
unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2520 int Rdb_key_def::unpack_binary_or_utf8_varchar(
2521     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2522     Rdb_string_reader *const reader,
2523     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2524   const uchar *ptr;
2525   size_t len = 0;
2526   bool finished = false;
2527   uchar *d0 = dst;
2528   Field_varstring *const field_var = (Field_varstring *)field;
2529   dst += field_var->length_bytes;
2530   // How much we can unpack
2531   size_t dst_len = field_var->pack_length() - field_var->length_bytes;
2532 
2533   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2534 
2535   /* Decode the length-emitted encoding here */
2536   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2537     uint used_bytes;
2538 
2539     /* See pack_with_varchar_encoding. */
2540     if (use_legacy_format) {
2541       used_bytes = calc_unpack_legacy_variable_format(
2542           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2543     } else {
2544       used_bytes =
2545           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2546     }
2547 
2548     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2549       return UNPACK_FAILURE;  // Corruption in the data
2550     }
2551 
2552     /*
2553       Now, we need to decode used_bytes of data and append them to the value.
2554     */
2555     if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2556       int err = unpack_charset(fpi->m_varchar_charset, ptr, used_bytes, dst,
2557                                dst_len, &used_bytes);
2558       if (err != UNPACK_SUCCESS) {
2559         return err;
2560       }
2561     } else {
2562       memcpy(dst, ptr, used_bytes);
2563     }
2564 
2565     dst += used_bytes;
2566     dst_len -= used_bytes;
2567     len += used_bytes;
2568 
2569     if (finished) {
2570       break;
2571     }
2572   }
2573 
2574   if (!finished) {
2575     return UNPACK_FAILURE;
2576   }
2577 
2578   /* Save the length */
2579   if (field_var->length_bytes == 1) {
2580     d0[0] = (uchar)len;
2581   } else {
2582     DBUG_ASSERT(field_var->length_bytes == 2);
2583     int2store(d0, len);
2584   }
2585   return UNPACK_SUCCESS;
2586 }
2587 
2588 /*
2589   @seealso
2590     pack_with_varchar_space_pad - packing function
2591     unpack_simple_varchar_space_pad - unpacking function for 'simple'
2592     charsets.
2593     skip_variable_space_pad - skip function
2594 */
unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2595 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
2596     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2597     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2598   const uchar *ptr;
2599   size_t len = 0;
2600   bool finished = false;
2601   Field_varstring *const field_var = static_cast<Field_varstring *>(field);
2602   uchar *d0 = dst;
2603   uchar *dst_end = dst + field_var->pack_length();
2604   dst += field_var->length_bytes;
2605 
2606   uint space_padding_bytes = 0;
2607   uint extra_spaces;
2608   if ((fpi->m_unpack_info_uses_two_bytes
2609            ? unp_reader->read_uint16(&extra_spaces)
2610            : unp_reader->read_uint8(&extra_spaces))) {
2611     return UNPACK_FAILURE;
2612   }
2613 
2614   if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
2615     space_padding_bytes =
2616         -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
2617     extra_spaces = 0;
2618   } else {
2619     extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
2620   }
2621 
2622   space_padding_bytes *= fpi->space_xfrm_len;
2623 
2624   /* Decode the length-emitted encoding here */
2625   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2626     const char last_byte = ptr[fpi->m_segment_size - 1];
2627     size_t used_bytes;
2628     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES)  // this is the last segment
2629     {
2630       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2631         return UNPACK_FAILURE;  // Cannot happen, corrupted data
2632       }
2633       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2634       finished = true;
2635     } else {
2636       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2637           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2638         return UNPACK_FAILURE;  // Invalid value
2639       }
2640       used_bytes = fpi->m_segment_size - 1;
2641     }
2642 
2643     // Now, need to decode used_bytes of data and append them to the value.
2644     if (fpi->m_varchar_charset->number == COLLATION_UTF8_BIN) {
2645       if (used_bytes & 1) {
2646         /*
2647           UTF-8 characters are encoded into two-byte entities. There is no way
2648           we can have an odd number of bytes after encoding.
2649         */
2650         return UNPACK_FAILURE;
2651       }
2652 
2653       const uchar *src = ptr;
2654       const uchar *const src_end = ptr + used_bytes;
2655       while (src < src_end) {
2656         my_wc_t wc = (src[0] << 8) | src[1];
2657         src += 2;
2658         const CHARSET_INFO *cset = fpi->m_varchar_charset;
2659         int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2660         DBUG_ASSERT(res <= 3);
2661         if (res <= 0) return UNPACK_FAILURE;
2662         dst += res;
2663         len += res;
2664       }
2665     } else {
2666       if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
2667       memcpy(dst, ptr, used_bytes);
2668       dst += used_bytes;
2669       len += used_bytes;
2670     }
2671 
2672     if (finished) {
2673       if (extra_spaces) {
2674         // Both binary and UTF-8 charset store space as ' ',
2675         // so the following is ok:
2676         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2677         memset(dst, fpi->m_varchar_charset->pad_char, extra_spaces);
2678         len += extra_spaces;
2679       }
2680       break;
2681     }
2682   }
2683 
2684   if (!finished) return UNPACK_FAILURE;
2685 
2686   /* Save the length */
2687   if (field_var->length_bytes == 1) {
2688     d0[0] = (uchar)len;
2689   } else {
2690     DBUG_ASSERT(field_var->length_bytes == 2);
2691     int2store(d0, len);
2692   }
2693   return UNPACK_SUCCESS;
2694 }
2695 
2696 /////////////////////////////////////////////////////////////////////////
2697 
2698 /*
2699   Function of type rdb_make_unpack_info_t
2700 */
2701 
make_unpack_unknown(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2702 void Rdb_key_def::make_unpack_unknown(
2703     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2704     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2705   pack_ctx->writer->write(field->ptr, field->pack_length());
2706 }
2707 
2708 /*
2709   This point of this function is only to indicate that unpack_info is
2710   available.
2711 
2712   The actual unpack_info data is produced by the function that packs the key,
2713   that is, pack_with_varchar_space_pad.
2714 */
2715 
dummy_make_unpack_info(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * field MY_ATTRIBUTE ((__unused__)),Rdb_pack_field_context * pack_ctx MY_ATTRIBUTE ((__unused__)))2716 void Rdb_key_def::dummy_make_unpack_info(
2717     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
2718     const Field *field MY_ATTRIBUTE((__unused__)),
2719     Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
2720   // Do nothing
2721 }
2722 
2723 /*
2724   Function of type rdb_index_field_unpack_t
2725 */
2726 
unpack_unknown(Rdb_field_packing * const fpi,Field * const field,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2727 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi,
2728                                 Field *const field, uchar *const dst,
2729                                 Rdb_string_reader *const reader,
2730                                 Rdb_string_reader *const unp_reader) {
2731   const uchar *ptr;
2732   const uint len = fpi->m_unpack_data_len;
2733   // We don't use anything from the key, so skip over it.
2734   if (skip_max_length(fpi, field, reader)) {
2735     return UNPACK_FAILURE;
2736   }
2737 
2738   DBUG_ASSERT_IMP(len > 0, unp_reader != nullptr);
2739 
2740   if ((ptr = (const uchar *)unp_reader->read(len))) {
2741     memcpy(dst, ptr, len);
2742     return UNPACK_SUCCESS;
2743   }
2744   return UNPACK_FAILURE;
2745 }
2746 
2747 /*
2748   Function of type rdb_make_unpack_info_t
2749 */
2750 
make_unpack_unknown_varchar(const Rdb_collation_codec * const codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)2751 void Rdb_key_def::make_unpack_unknown_varchar(
2752     const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
2753     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
2754   const auto f = static_cast<const Field_varstring *>(field);
2755   uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2756   len += f->length_bytes;
2757   pack_ctx->writer->write(field->ptr, len);
2758 }
2759 
2760 /*
2761   Function of type rdb_index_field_unpack_t
2762 
2763   @detail
2764   Unpack a key part in an "unknown" collation from its
2765   (mem_comparable_form, unpack_info) form.
2766 
2767   "Unknown" means we have no clue about how mem_comparable_form is made from
2768   the original string, so we keep the whole original string in the unpack_info.
2769 
2770   @seealso
2771     make_unpack_unknown, unpack_unknown
2772 */
2773 
unpack_unknown_varchar(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2774 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
2775                                         Field *const field, uchar *dst,
2776                                         Rdb_string_reader *const reader,
2777                                         Rdb_string_reader *const unp_reader) {
2778   const uchar *ptr;
2779   uchar *const d0 = dst;
2780   const auto f = static_cast<Field_varstring *>(field);
2781   dst += f->length_bytes;
2782   const uint len_bytes = f->length_bytes;
2783   // We don't use anything from the key, so skip over it.
2784   if ((fpi->m_skip_func)(fpi, field, reader)) {
2785     return UNPACK_FAILURE;
2786   }
2787 
2788   DBUG_ASSERT(len_bytes > 0);
2789   DBUG_ASSERT(unp_reader != nullptr);
2790 
2791   if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
2792     memcpy(d0, ptr, len_bytes);
2793     const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
2794     if ((ptr = (const uchar *)unp_reader->read(len))) {
2795       memcpy(dst, ptr, len);
2796       return UNPACK_SUCCESS;
2797     }
2798   }
2799   return UNPACK_FAILURE;
2800 }
2801 
2802 /*
2803   Write unpack_data for a "simple" collation
2804 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)2805 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
2806                                     const Rdb_collation_codec *const codec,
2807                                     const uchar *const src,
2808                                     const size_t src_len) {
2809   for (uint i = 0; i < src_len; i++) {
2810     writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
2811   }
2812 }
2813 
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len,uchar * const dst)2814 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
2815                                    const Rdb_collation_codec *const codec,
2816                                    const uchar *const src, const size_t src_len,
2817                                    uchar *const dst) {
2818   for (uint i = 0; i < src_len; i++) {
2819     if (codec->m_dec_size[src[i]] > 0) {
2820       uint *ret;
2821       DBUG_ASSERT(reader != nullptr);
2822 
2823       if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
2824         return UNPACK_FAILURE;
2825       }
2826       dst[i] = codec->m_dec_idx[*ret][src[i]];
2827     } else {
2828       dst[i] = codec->m_dec_idx[0][src[i]];
2829     }
2830   }
2831 
2832   return UNPACK_SUCCESS;
2833 }
2834 
2835 /*
2836   Function of type rdb_make_unpack_info_t
2837 
2838   @detail
2839     Make unpack_data for VARCHAR(n) in a "simple" charset.
2840 */
2841 
make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2842 void Rdb_key_def::make_unpack_simple_varchar(
2843     const Rdb_collation_codec *const codec, const Field *const field,
2844     Rdb_pack_field_context *const pack_ctx) {
2845   const auto f = static_cast<const Field_varstring *>(field);
2846   uchar *const src = f->ptr + f->length_bytes;
2847   const size_t src_len =
2848       f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
2849   Rdb_bit_writer bit_writer(pack_ctx->writer);
2850   // The std::min compares characters with bytes, but for simple collations,
2851   // mbmaxlen = 1.
2852   rdb_write_unpack_simple(&bit_writer, codec, src,
2853                           std::min((size_t)f->char_length(), src_len));
2854 }
2855 
2856 /*
2857   Function of type rdb_index_field_unpack_t
2858 
2859   @seealso
2860     pack_with_varchar_space_pad - packing function
2861     unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
2862 */
2863 
unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2864 int Rdb_key_def::unpack_simple_varchar_space_pad(
2865     Rdb_field_packing *const fpi, Field *const field, uchar *dst,
2866     Rdb_string_reader *const reader, Rdb_string_reader *const unp_reader) {
2867   const uchar *ptr;
2868   size_t len = 0;
2869   bool finished = false;
2870   uchar *d0 = dst;
2871   const Field_varstring *const field_var =
2872       static_cast<Field_varstring *>(field);
2873   // For simple collations, char_length is also number of bytes.
2874   DBUG_ASSERT((size_t)fpi->m_max_image_len >= field_var->char_length());
2875   uchar *dst_end = dst + field_var->pack_length();
2876   dst += field_var->length_bytes;
2877   Rdb_bit_reader bit_reader(unp_reader);
2878 
2879   uint space_padding_bytes = 0;
2880   uint extra_spaces;
2881   DBUG_ASSERT(unp_reader != nullptr);
2882 
2883   if ((fpi->m_unpack_info_uses_two_bytes
2884            ? unp_reader->read_uint16(&extra_spaces)
2885            : unp_reader->read_uint8(&extra_spaces))) {
2886     return UNPACK_FAILURE;
2887   }
2888 
2889   if (extra_spaces <= 8) {
2890     space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
2891     extra_spaces = 0;
2892   } else {
2893     extra_spaces -= 8;
2894   }
2895 
2896   space_padding_bytes *= fpi->space_xfrm_len;
2897 
2898   /* Decode the length-emitted encoding here */
2899   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2900     const char last_byte =
2901         ptr[fpi->m_segment_size - 1];  // number of padding bytes
2902     size_t used_bytes;
2903     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
2904       // this is the last one
2905       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
2906         return UNPACK_FAILURE;  // Cannot happen, corrupted data
2907       }
2908       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
2909       finished = true;
2910     } else {
2911       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
2912           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
2913         return UNPACK_FAILURE;
2914       }
2915       used_bytes = fpi->m_segment_size - 1;
2916     }
2917 
2918     if (dst + used_bytes > dst_end) {
2919       // The value on disk is longer than the field definition allows?
2920       return UNPACK_FAILURE;
2921     }
2922 
2923     uint ret;
2924     if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
2925                                       used_bytes, dst)) != UNPACK_SUCCESS) {
2926       return ret;
2927     }
2928 
2929     dst += used_bytes;
2930     len += used_bytes;
2931 
2932     if (finished) {
2933       if (extra_spaces) {
2934         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
2935         // pad_char has a 1-byte form in all charsets that
2936         // are handled by rdb_init_collation_mapping.
2937         memset(dst, field_var->charset()->pad_char, extra_spaces);
2938         len += extra_spaces;
2939       }
2940       break;
2941     }
2942   }
2943 
2944   if (!finished) return UNPACK_FAILURE;
2945 
2946   /* Save the length */
2947   if (field_var->length_bytes == 1) {
2948     d0[0] = (uchar)len;
2949   } else {
2950     DBUG_ASSERT(field_var->length_bytes == 2);
2951     int2store(d0, len);
2952   }
2953   return UNPACK_SUCCESS;
2954 }
2955 
2956 /*
2957   Function of type rdb_make_unpack_info_t
2958 
2959   @detail
2960     Make unpack_data for CHAR(n) value in a "simple" charset.
2961     It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
2962 
2963   @seealso
2964     The VARCHAR variant is in make_unpack_simple_varchar
2965 */
2966 
make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)2967 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
2968                                      const Field *const field,
2969                                      Rdb_pack_field_context *const pack_ctx) {
2970   const uchar *const src = field->ptr;
2971   Rdb_bit_writer bit_writer(pack_ctx->writer);
2972   rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
2973 }
2974 
2975 /*
2976   Function of type rdb_index_field_unpack_t
2977 */
2978 
unpack_simple(Rdb_field_packing * const fpi,Field * const field MY_ATTRIBUTE ((__unused__)),uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)2979 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi,
2980                                Field *const field MY_ATTRIBUTE((__unused__)),
2981                                uchar *const dst,
2982                                Rdb_string_reader *const reader,
2983                                Rdb_string_reader *const unp_reader) {
2984   const uchar *ptr;
2985   const uint len = fpi->m_max_image_len;
2986   Rdb_bit_reader bit_reader(unp_reader);
2987 
2988   if (!(ptr = (const uchar *)reader->read(len))) {
2989     return UNPACK_FAILURE;
2990   }
2991 
2992   return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
2993                                 fpi->m_charset_codec, ptr, len, dst);
2994 }
2995 
2996 // See Rdb_charset_space_info::spaces_xfrm
2997 const int RDB_SPACE_XFRM_SIZE = 32;
2998 
2999 // A class holding information about how space character is represented in a
3000 // charset.
3001 class Rdb_charset_space_info {
3002  public:
3003   Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3004   Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3005   Rdb_charset_space_info() = default;
3006 
3007   // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3008   std::vector<uchar> spaces_xfrm;
3009 
3010   // length(strxfrm(' '))
3011   size_t space_xfrm_len;
3012 
3013   // length of the space character itself
3014   // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3015   // (length=2)
3016   size_t space_mb_len;
3017 };
3018 
3019 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
3020     rdb_mem_comparable_space;
3021 
3022 /*
3023   @brief
3024   For a given charset, get
3025    - strxfrm('    '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
3026    - length of strxfrm(charset, ' ')
3027    - length of the space character in the charset
3028 
3029   @param cs  IN    Charset to get the space for
3030   @param ptr OUT   A few space characters
3031   @param len OUT   Return length of the space (in bytes)
3032 
3033   @detail
3034     It is tempting to pre-generate mem-comparable form of space character for
3035     every charset on server startup.
3036     One can't do that: some charsets are not initialized until somebody
3037     attempts to use them (e.g. create or open a table that has a field that
3038     uses the charset).
3039 */
3040 
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)3041 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3042                                          const std::vector<uchar> **xfrm,
3043                                          size_t *const xfrm_len,
3044                                          size_t *const mb_len) {
3045   DBUG_ASSERT(cs->number < MY_ALL_CHARSETS_SIZE);
3046   if (!rdb_mem_comparable_space[cs->number].get()) {
3047     RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3048     if (!rdb_mem_comparable_space[cs->number].get()) {
3049       // Upper bound of how many bytes can be occupied by multi-byte form of a
3050       // character in any charset.
3051       const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3052       DBUG_ASSERT(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3053 
3054       // multi-byte form of the ' ' (space) character
3055       uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3056 
3057       const size_t space_mb_len = cs->cset->wc_mb(
3058           cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3059 
3060       // mem-comparable image of the space character
3061       std::array<uchar, 20> space;
3062 
3063       const size_t space_len = cs->coll->strnxfrm(
3064           cs, space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3065       Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3066       info->space_xfrm_len = space_len;
3067       info->space_mb_len = space_mb_len;
3068       while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3069         info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3070                                  space.data() + space_len);
3071       }
3072       rdb_mem_comparable_space[cs->number].reset(info);
3073     }
3074     RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3075   }
3076 
3077   *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3078   *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3079   *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3080 }
3081 
// Mutex that rdb_get_mem_comparable_space() holds while it creates a missing
// entry of rdb_mem_comparable_space.
mysql_mutex_t rdb_mem_cmp_space_mutex;

// Collation codecs indexed by CHARSET_INFO::number. Entries are filled in by
// rdb_init_collation_mapping().
std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
    rdb_collation_data;
// Mutex that rdb_init_collation_mapping() holds while it builds a codec and
// stores it in rdb_collation_data.
mysql_mutex_t rdb_collation_data_mutex;
3087 
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)3088 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3089   return cs->strxfrm_multiply==1 && cs->mbmaxlen == 1 &&
3090          !(cs->state & (MY_CS_BINSORT | MY_CS_NOPAD));
3091 }
3092 
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)3093 static const Rdb_collation_codec *rdb_init_collation_mapping(
3094     const my_core::CHARSET_INFO *const cs) {
3095   DBUG_ASSERT(cs && cs->state & MY_CS_AVAILABLE);
3096   const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3097 
3098   if (codec == nullptr && rdb_is_collation_supported(cs)) {
3099     RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3100 
3101     codec = rdb_collation_data[cs->number];
3102     if (codec == nullptr) {
3103       Rdb_collation_codec *cur = nullptr;
3104 
3105       // Compute reverse mapping for simple collations.
3106       if (rdb_is_collation_supported(cs)) {
3107         cur = new Rdb_collation_codec;
3108         std::map<uchar, std::vector<uchar>> rev_map;
3109         size_t max_conflict_size = 0;
3110         for (int src = 0; src < 256; src++) {
3111           uchar dst = cs->sort_order[src];
3112           rev_map[dst].push_back(src);
3113           max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3114         }
3115         cur->m_dec_idx.resize(max_conflict_size);
3116 
3117         for (auto const &p : rev_map) {
3118           uchar dst = p.first;
3119           for (uint idx = 0; idx < p.second.size(); idx++) {
3120             uchar src = p.second[idx];
3121             uchar bits =
3122                 my_bit_log2(my_round_up_to_next_power(p.second.size()));
3123             cur->m_enc_idx[src] = idx;
3124             cur->m_enc_size[src] = bits;
3125             cur->m_dec_size[dst] = bits;
3126             cur->m_dec_idx[idx][dst] = src;
3127           }
3128         }
3129 
3130         cur->m_make_unpack_info_func = {Rdb_key_def::make_unpack_simple_varchar,
3131                                         Rdb_key_def::make_unpack_simple};
3132         cur->m_unpack_func = {Rdb_key_def::unpack_simple_varchar_space_pad,
3133                               Rdb_key_def::unpack_simple};
3134       } else {
3135         // Out of luck for now.
3136       }
3137 
3138       if (cur != nullptr) {
3139         codec = cur;
3140         cur->m_cs = cs;
3141         rdb_collation_data[cs->number] = cur;
3142       }
3143     }
3144 
3145     RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3146   }
3147 
3148   return codec;
3149 }
3150 
get_segment_size_from_collation(const CHARSET_INFO * const cs)3151 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3152   int ret;
3153   if (cs->number == COLLATION_UTF8MB4_BIN || cs->number == COLLATION_UTF16_BIN ||
3154       cs->number == COLLATION_UTF16LE_BIN || cs->number == COLLATION_UTF32_BIN) {
3155     /*
3156       In these collations, a character produces one weight, which is 3 bytes.
3157       Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3158       get 3*3+1=10
3159     */
3160     ret = 10;
3161   } else {
3162     /*
3163       All other collations. There are two classes:
3164       - Unicode-based, except for collations mentioned in the if-condition.
3165         For these all weights are 2 bytes long, a character may produce 0..8
3166         weights.
3167         in any case, 8 bytes of payload in the segment guarantee that the last
3168         space character won't span across segments.
3169 
3170       - Collations not based on unicode. These have length(strxfrm(' '))=1,
3171         there nothing to worry about.
3172 
3173       In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3174     */
3175     ret = 9;
3176   }
3177   DBUG_ASSERT(ret < RDB_SPACE_XFRM_SIZE);
3178   return ret;
3179 }
3180 
3181 /*
3182   @brief
3183     Setup packing of index field into its mem-comparable form
3184 
3185   @detail
3186     - It is possible produce mem-comparable form for any datatype.
3187     - Some datatypes also allow to unpack the original value from its
3188       mem-comparable form.
3189       = Some of these require extra information to be stored in "unpack_info".
3190         unpack_info is not a part of mem-comparable form, it is only used to
3191         restore the original value
3192 
3193   @param
3194     field  IN  field to be packed/un-packed
3195 
3196   @return
3197     TRUE  -  Field can be read with index-only reads
3198     FALSE -  Otherwise
3199 */
3200 
setup(const Rdb_key_def * const key_descr,const Field * const field,const uint keynr_arg,const uint key_part_arg,const uint16 key_length)3201 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
3202                               const Field *const field, const uint keynr_arg,
3203                               const uint key_part_arg,
3204                               const uint16 key_length) {
3205   int res = false;
3206   enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
3207 
3208   m_keynr = keynr_arg;
3209   m_key_part = key_part_arg;
3210 
3211   m_maybe_null = field ? field->real_maybe_null() : false;
3212   m_unpack_func = nullptr;
3213   m_make_unpack_info_func = nullptr;
3214   m_unpack_data_len = 0;
3215   space_xfrm = nullptr;  // safety
3216   // whether to use legacy format for varchar
3217   m_use_legacy_varbinary_format = false;
3218   // ha_rocksdb::index_flags() will pass key_descr == null to
3219   // see whether field(column) can be read-only reads through return value,
3220   // but the legacy vs. new varchar format doesn't affect return value.
3221   // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
3222   if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3223     m_use_legacy_varbinary_format = true;
3224   }
3225   /* Calculate image length. By default, is is pack_length() */
3226   m_max_image_len =
3227       field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
3228   m_skip_func = Rdb_key_def::skip_max_length;
3229   m_pack_func = Rdb_key_def::pack_with_make_sort_key;
3230 
3231   m_covered = false;
3232 
3233   switch (type) {
3234     case MYSQL_TYPE_LONGLONG:
3235     case MYSQL_TYPE_LONG:
3236     case MYSQL_TYPE_INT24:
3237     case MYSQL_TYPE_SHORT:
3238     case MYSQL_TYPE_TINY:
3239       m_unpack_func = Rdb_key_def::unpack_integer;
3240       m_covered = true;
3241       return true;
3242 
3243     case MYSQL_TYPE_DOUBLE:
3244       m_unpack_func = Rdb_key_def::unpack_double;
3245       m_covered = true;
3246       return true;
3247 
3248     case MYSQL_TYPE_FLOAT:
3249       m_unpack_func = Rdb_key_def::unpack_float;
3250       m_covered = true;
3251       return true;
3252 
3253     case MYSQL_TYPE_NEWDECIMAL:
3254     /*
3255       Decimal is packed with Field_new_decimal::make_sort_key, which just
3256       does memcpy.
3257       Unpacking decimal values was supported only after fix for issue#253,
3258       because of that ha_rocksdb::get_storage_type() handles decimal values
3259       in a special way.
3260     */
3261     case MYSQL_TYPE_DATETIME2:
3262     case MYSQL_TYPE_TIMESTAMP2:
3263     /* These are packed with Field_temporal_with_date_and_timef::make_sort_key
3264      */
3265     case MYSQL_TYPE_TIME2: /* TIME is packed with Field_timef::make_sort_key */
3266     case MYSQL_TYPE_YEAR:  /* YEAR is packed with  Field_tiny::make_sort_key */
3267       /* Everything that comes here is packed with just a memcpy(). */
3268       m_unpack_func = Rdb_key_def::unpack_binary_str;
3269       m_covered = true;
3270       return true;
3271 
3272     case MYSQL_TYPE_NEWDATE:
3273       /*
3274         This is packed by Field_newdate::make_sort_key. It assumes the data is
3275         3 bytes, and packing is done by swapping the byte order (for both big-
3276         and little-endian)
3277       */
3278       m_unpack_func = Rdb_key_def::unpack_newdate;
3279       m_covered = true;
3280       return true;
3281     case MYSQL_TYPE_TINY_BLOB:
3282     case MYSQL_TYPE_MEDIUM_BLOB:
3283     case MYSQL_TYPE_LONG_BLOB:
3284     case MYSQL_TYPE_BLOB: {
3285       if (key_descr) {
3286         // The my_charset_bin collation is special in that it will consider
3287         // shorter strings sorting as less than longer strings.
3288         //
3289         // See Field_blob::make_sort_key for details.
3290         m_max_image_len =
3291           key_length + (field->charset()->number == COLLATION_BINARY
3292                               ? reinterpret_cast<const Field_blob *>(field)
3293                                     ->pack_length_no_ptr()
3294                               : 0);
3295         // Return false because indexes on text/blob will always require
3296         // a prefix. With a prefix, the optimizer will not be able to do an
3297         // index-only scan since there may be content occuring after the prefix
3298         // length.
3299         return false;
3300       }
3301       break;
3302     }
3303     default:
3304       break;
3305   }
3306 
3307   m_unpack_info_stores_value = false;
3308   /* Handle [VAR](CHAR|BINARY) */
3309 
3310   if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3311     /*
3312       For CHAR-based columns, check how strxfrm image will take.
3313       field->field_length = field->char_length() * cs->mbmaxlen.
3314     */
3315     const CHARSET_INFO *cs = field->charset();
3316     m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
3317   }
3318   const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
3319   const CHARSET_INFO *cs = field->charset();
3320   // max_image_len before chunking is taken into account
3321   const int max_image_len_before_chunks = m_max_image_len;
3322 
3323   if (is_varchar) {
3324     // The default for varchar is variable-length, without space-padding for
3325     // comparisons
3326     m_varchar_charset = cs;
3327     m_skip_func = Rdb_key_def::skip_variable_length;
3328     m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3329     if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3330       m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
3331     } else {
3332       // Calculate the maximum size of the short section plus the
3333       // maximum size of the long section
3334       m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
3335     }
3336 
3337     const auto field_var = static_cast<const Field_varstring *>(field);
3338     m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
3339   }
3340 
3341   if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3342     // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
3343     // information about character-based datatypes are compared.
3344     bool use_unknown_collation = false;
3345     DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
3346                     use_unknown_collation = true;);
3347 
3348     if (cs->number == COLLATION_BINARY) {
3349       // - SQL layer pads BINARY(N) so that it always is N bytes long.
3350       // - For VARBINARY(N), values may have different lengths, so we're using
3351       //   variable-length encoding. This is also the only charset where the
3352       //   values are not space-padded for comparison.
3353       m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
3354                                  : Rdb_key_def::unpack_binary_str;
3355       res = true;
3356     } else if (cs->number == COLLATION_LATIN1_BIN || cs->number == COLLATION_UTF8_BIN) {
3357       // For _bin collations, mem-comparable form of the string is the string
3358       // itself.
3359 
3360       if (is_varchar) {
3361         // VARCHARs - are compared as if they were space-padded - but are
3362         // not actually space-padded (reading the value back produces the
3363         // original value, without the padding)
3364         m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
3365         m_skip_func = Rdb_key_def::skip_variable_space_pad;
3366         m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3367         m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
3368         m_segment_size = get_segment_size_from_collation(cs);
3369         m_max_image_len =
3370             (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3371             m_segment_size;
3372         rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3373                                      &space_mb_len);
3374       } else {
3375         // SQL layer pads CHAR(N) values to their maximum length.
3376         // We just store that and restore it back.
3377         m_unpack_func = (cs->number == COLLATION_LATIN1_BIN)
3378                             ? Rdb_key_def::unpack_binary_str
3379                             : Rdb_key_def::unpack_utf8_str;
3380       }
3381       res = true;
3382     } else {
3383       // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
3384 
3385       res = true;  // index-only scans are possible
3386       m_unpack_data_len = is_varchar ? 0 : field->field_length;
3387       const uint idx = is_varchar ? 0 : 1;
3388       const Rdb_collation_codec *codec = nullptr;
3389 
3390       if (is_varchar) {
3391         // VARCHAR requires space-padding for doing comparisons
3392         //
3393         // The check for cs->levels_for_order is to catch
3394         // latin2_czech_cs and cp1250_czech_cs - multi-level collations
3395         // that Variable-Length Space Padded Encoding can't handle.
3396         // It is not expected to work for any other multi-level collations,
3397         // either.
3398         // Currently we handle these collations as NO_PAD, even if they have
3399         // PAD_SPACE attribute.
3400         if (cs->levels_for_order == 1) {
3401           m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3402           m_skip_func = Rdb_key_def::skip_variable_space_pad;
3403           m_segment_size = get_segment_size_from_collation(cs);
3404           m_max_image_len =
3405               (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3406               m_segment_size;
3407           rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3408                                        &space_mb_len);
3409         } else {
3410           //  NO_LINT_DEBUG
3411           sql_print_warning(
3412               "RocksDB: you're trying to create an index "
3413               "with a multi-level collation %s",
3414               cs->name);
3415           //  NO_LINT_DEBUG
3416           sql_print_warning(
3417               "MyRocks will handle this collation internally "
3418               " as if it had a NO_PAD attribute.");
3419           m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3420           m_skip_func = Rdb_key_def::skip_variable_length;
3421         }
3422       }
3423 
3424       if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
3425         // The collation allows to store extra information in the unpack_info
3426         // which can be used to restore the original value from the
3427         // mem-comparable form.
3428         m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
3429         m_unpack_func = codec->m_unpack_func[idx];
3430         m_charset_codec = codec;
3431       } else if (use_unknown_collation) {
3432         // We have no clue about how this collation produces mem-comparable
3433         // form. Our way of restoring the original value is to keep a copy of
3434         // the original value in unpack_info.
3435         m_unpack_info_stores_value = true;
3436         m_make_unpack_info_func = is_varchar
3437                                       ? Rdb_key_def::make_unpack_unknown_varchar
3438                                       : Rdb_key_def::make_unpack_unknown;
3439         m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
3440                                    : Rdb_key_def::unpack_unknown;
3441       } else {
3442         // Same as above: we don't know how to restore the value from its
3443         // mem-comparable form.
3444         // Here, we just indicate to the SQL layer we can't do it.
3445         DBUG_ASSERT(m_unpack_func == nullptr);
3446         m_unpack_info_stores_value = false;
3447         res = false;  // Indicate that index-only reads are not possible
3448       }
3449     }
3450 
3451     // Make an adjustment: if this column is partially covered, tell the SQL
3452     // layer we can't do index-only scans. Later when we perform an index read,
3453     // we'll check on a record-by-record basis if we can do an index-only scan
3454     // or not.
3455     uint field_length;
3456     if (field->table) {
3457       field_length = field->table->field[field->field_index]->field_length;
3458     } else {
3459       field_length = field->field_length;
3460     }
3461 
3462     if (field_length != key_length) {
3463       res = false;
3464       // If this index doesn't support covered bitmaps, then we won't know
3465       // during a read if the column is actually covered or not. If so, we need
3466       // to assume the column isn't covered and skip it during unpacking.
3467       //
3468       // If key_descr == NULL, then this is a dummy field and we probably don't
3469       // need to perform this step. However, to preserve the behavior before
3470       // this change, we'll only skip this step if we have an index which
3471       // supports covered bitmaps.
3472       if (!key_descr || !key_descr->use_covered_bitmap_format()) {
3473         m_unpack_func = nullptr;
3474         m_make_unpack_info_func = nullptr;
3475         m_unpack_info_stores_value = true;
3476       }
3477     }
3478   }
3479 
3480   m_covered = res;
3481   return res;
3482 }
3483 
get_field_in_table(const TABLE * const tbl) const3484 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
3485   return tbl->key_info[m_keynr].key_part[m_key_part].field;
3486 }
3487 
fill_hidden_pk_val(uchar ** dst,const longlong hidden_pk_id) const3488 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
3489                                            const longlong hidden_pk_id) const {
3490   DBUG_ASSERT(m_max_image_len == 8);
3491 
3492   String to;
3493   rdb_netstr_append_uint64(&to, hidden_pk_id);
3494   memcpy(*dst, to.ptr(), m_max_image_len);
3495 
3496   *dst += m_max_image_len;
3497 }
3498 
3499 ///////////////////////////////////////////////////////////////////////////////////////////
3500 // Rdb_ddl_manager
3501 ///////////////////////////////////////////////////////////////////////////////////////////
3502 
~Rdb_tbl_def()3503 Rdb_tbl_def::~Rdb_tbl_def() {
3504   auto ddl_manager = rdb_get_ddl_manager();
3505   /* Don't free key definitions */
3506   if (m_key_descr_arr) {
3507     for (uint i = 0; i < m_key_count; i++) {
3508       if (ddl_manager && m_key_descr_arr[i]) {
3509         ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
3510       }
3511 
3512       m_key_descr_arr[i] = nullptr;
3513     }
3514 
3515     delete[] m_key_descr_arr;
3516     m_key_descr_arr = nullptr;
3517   }
3518 }
3519 
3520 /*
3521   Put table definition DDL entry. Actual write is done at
3522   Rdb_dict_manager::commit.
3523 
3524   We write
3525     dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
3526 
3527   Where key entries are a tuple of
3528     ( cf_id, index_nr )
3529 */
3530 
put_dict(Rdb_dict_manager * const dict,rocksdb::WriteBatch * const batch,const rocksdb::Slice & key)3531 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
3532                            rocksdb::WriteBatch *const batch,
3533                            const rocksdb::Slice &key) {
3534   StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
3535   indexes.alloc(Rdb_key_def::VERSION_SIZE +
3536                 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
3537   rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
3538 
3539   for (uint i = 0; i < m_key_count; i++) {
3540     const Rdb_key_def &kd = *m_key_descr_arr[i];
3541 
3542     uchar flags =
3543         (kd.m_is_reverse_cf ? Rdb_key_def::REVERSE_CF_FLAG : 0) |
3544         (kd.m_is_per_partition_cf ? Rdb_key_def::PER_PARTITION_CF_FLAG : 0);
3545 
3546     const uint cf_id = kd.get_cf()->GetID();
3547     /*
3548       If cf_id already exists, cf_flags must be the same.
3549       To prevent race condition, reading/modifying/committing CF flags
3550       need to be protected by mutex (dict_manager->lock()).
3551       When RocksDB supports transaction with pessimistic concurrency
3552       control, we can switch to use it and removing mutex.
3553     */
3554     uint existing_cf_flags;
3555     const std::string cf_name = kd.get_cf()->GetName();
3556 
3557     if (dict->get_cf_flags(cf_id, &existing_cf_flags)) {
3558       // For the purposes of comparison we'll clear the partitioning bit. The
3559       // intent here is to make sure that both partitioned and non-partitioned
3560       // tables can refer to the same CF.
3561       existing_cf_flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3562       flags &= ~Rdb_key_def::CF_FLAGS_TO_IGNORE;
3563 
3564       if (existing_cf_flags != flags) {
3565         my_error(ER_CF_DIFFERENT, MYF(0), cf_name.c_str(), flags,
3566                  existing_cf_flags);
3567         return true;
3568       }
3569     } else {
3570       dict->add_cf_flags(batch, cf_id, flags);
3571     }
3572 
3573     rdb_netstr_append_uint32(&indexes, cf_id);
3574 
3575     uint32 index_number = kd.get_index_number();
3576     rdb_netstr_append_uint32(&indexes, index_number);
3577 
3578     struct Rdb_index_info index_info;
3579     index_info.m_gl_index_id = {cf_id, index_number};
3580     index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
3581     index_info.m_index_type = kd.m_index_type;
3582     index_info.m_kv_version = kd.m_kv_format_version;
3583     index_info.m_index_flags = kd.m_index_flags_bitmap;
3584     index_info.m_ttl_duration = kd.m_ttl_duration;
3585 
3586     dict->add_or_update_index_cf_mapping(batch, &index_info);
3587   }
3588 
3589   const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
3590 
3591   dict->put_key(batch, key, svalue);
3592   return false;
3593 }
3594 
get_create_time()3595 time_t Rdb_tbl_def::get_create_time() {
3596   time_t create_time = m_create_time;
3597 
3598   if (create_time == CREATE_TIME_UNKNOWN) {
3599     // Read it from the .frm file. It's not a problem if several threads do this
3600     // concurrently
3601     char path[FN_REFLEN];
3602     snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
3603              m_dbname.c_str(), m_tablename.c_str(), reg_ext);
3604     unpack_filename(path,path);
3605     MY_STAT f_stat;
3606     if (my_stat(path, &f_stat, MYF(0)))
3607       create_time = f_stat.st_ctime;
3608     else
3609       create_time = 0; // will be shown as SQL NULL
3610     m_create_time = create_time;
3611   }
3612   return create_time;
3613 }
3614 
// Length that each index flag takes inside the record.
// Each index in the array maps to the enum INDEX_FLAG: element N is the
// length used for the flag at bit position N
// (Rdb_key_def::calculate_index_flag_offset() indexes this array by bit
// number). Only position 0 is defined, with ROCKSDB_SIZEOF_TTL_RECORD.
static const std::array<uint, 1> index_flag_lengths = {
    {ROCKSDB_SIZEOF_TTL_RECORD}};
3619 
has_index_flag(uint32 index_flags,enum INDEX_FLAG flag)3620 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
3621   return flag & index_flags;
3622 }
3623 
calculate_index_flag_offset(uint32 index_flags,enum INDEX_FLAG flag,uint * const length)3624 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
3625                                                 enum INDEX_FLAG flag,
3626                                                 uint *const length) {
3627   DBUG_ASSERT_IMP(flag != MAX_FLAG,
3628                   Rdb_key_def::has_index_flag(index_flags, flag));
3629 
3630   uint offset = 0;
3631   for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
3632     int mask = 1 << bit;
3633 
3634     /* Exit once we've reached the proper flag */
3635     if (flag & mask) {
3636       if (length != nullptr) {
3637         *length = index_flag_lengths[bit];
3638       }
3639       break;
3640     }
3641 
3642     if (index_flags & mask) {
3643       offset += index_flag_lengths[bit];
3644     }
3645   }
3646 
3647   return offset;
3648 }
3649 
write_index_flag_field(Rdb_string_writer * const buf,const uchar * const val,enum INDEX_FLAG flag) const3650 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
3651                                          const uchar *const val,
3652                                          enum INDEX_FLAG flag) const {
3653   uint len;
3654   uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
3655   DBUG_ASSERT(offset + len <= buf->get_current_pos());
3656   memcpy(buf->ptr() + offset, val, len);
3657 }
3658 
check_if_is_mysql_system_table()3659 void Rdb_tbl_def::check_if_is_mysql_system_table() {
3660   static const char *const system_dbs[] = {
3661       "mysql",
3662       "performance_schema",
3663       "information_schema",
3664   };
3665 
3666   m_is_mysql_system_table = false;
3667   for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
3668     if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
3669       m_is_mysql_system_table = true;
3670       break;
3671     }
3672   }
3673 }
3674 
check_and_set_read_free_rpl_table()3675 void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
3676   m_is_read_free_rpl_table =
3677 #if 0 // MARIAROCKS_NOT_YET : read-free replication is not supported
3678       rdb_read_free_regex_handler.matches(base_tablename());
3679 #else
3680       false;
3681 #endif
3682 }
3683 
set_name(const std::string & name)3684 void Rdb_tbl_def::set_name(const std::string &name) {
3685   int err MY_ATTRIBUTE((__unused__));
3686 
3687   m_dbname_tablename = name;
3688   err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
3689                                        &m_partition);
3690   DBUG_ASSERT(err == 0);
3691 
3692   check_if_is_mysql_system_table();
3693 }
3694 
get_autoincr_gl_index_id()3695 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
3696   for (uint i = 0; i < m_key_count; i++) {
3697     auto &k = m_key_descr_arr[i];
3698     if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
3699         k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
3700       return k->get_gl_index_id();
3701     }
3702   }
3703 
3704   // Every table must have a primary key, even if it's hidden.
3705   abort();
3706   return GL_INDEX_ID();
3707 }
3708 
/*
  Remove the entry for gl_index_id from m_index_num_to_keydef.
  No lock is taken inside this function.
*/
void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
  m_index_num_to_keydef.erase(gl_index_id);
}
3712 
add_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3713 void Rdb_ddl_manager::add_uncommitted_keydefs(
3714     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3715   mysql_rwlock_wrlock(&m_rwlock);
3716   for (const auto &index : indexes) {
3717     m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
3718   }
3719   mysql_rwlock_unlock(&m_rwlock);
3720 }
3721 
remove_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)3722 void Rdb_ddl_manager::remove_uncommitted_keydefs(
3723     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
3724   mysql_rwlock_wrlock(&m_rwlock);
3725   for (const auto &index : indexes) {
3726     m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
3727   }
3728   mysql_rwlock_unlock(&m_rwlock);
3729 }
3730 
namespace  // anonymous namespace = not visible outside this source file
{
/*
  Table scanner that collects the tables it is given through add_table() and
  then checks that collection against the .frm files found on disk.
*/
struct Rdb_validate_tbls : public Rdb_tables_scanner {
  // (table name, is_partition): add_table() sets the flag when the table
  // definition has a non-empty partition name.
  using tbl_info_t = std::pair<std::string, bool>;
  // database name -> tables of that database
  using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;

  // Tables collected by add_table(). check_frm_file() erases an entry when
  // it finds the matching .frm file, and scan_for_frms() drops a database
  // whose set has become empty.
  tbl_list_t m_list;

  // Record tdef in m_list unless its table name contains tmp_file_prefix.
  int add_table(Rdb_tbl_def *tdef) override;

  // Walk the entries of datadir; see the comment at the definition.
  bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);

  // Check every non-temporary .frm file in directory datadir + dbname.
  // Returns false if the directory or one of the .frm files cannot be read.
  bool scan_for_frms(const std::string &datadir, const std::string &dbname,
                     bool *has_errors);

  // Check a single .frm file against m_list. Sets *has_errors when a RocksDB
  // .frm has no entry in m_list; returns false if the file cannot be read.
  bool check_frm_file(const std::string &fullpath, const std::string &dbname,
                      const std::string &tablename, bool *has_errors);
};
}  // anonymous namespace
3750 
3751 /*
3752   Get a list of tables that we expect to have .frm files for.  This will use the
3753   information just read from the RocksDB data dictionary.
3754 */
add_table(Rdb_tbl_def * tdef)3755 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
3756   DBUG_ASSERT(tdef != nullptr);
3757 
3758   /* Add the database/table into the list that are not temp table */
3759   if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos) {
3760     bool is_partition = tdef->base_partition().size() != 0;
3761     m_list[tdef->base_dbname()].insert(
3762         tbl_info_t(tdef->base_tablename(), is_partition));
3763   }
3764 
3765   return HA_EXIT_SUCCESS;
3766 }
3767 
3768 /*
3769   Access the .frm file for this dbname/tablename and see if it is a RocksDB
3770   table (or partition table).
3771 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)3772 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
3773                                        const std::string &dbname,
3774                                        const std::string &tablename,
3775                                        bool *has_errors) {
3776   /* Check this .frm file to see what engine it uses */
3777   String fullfilename(fullpath.c_str(), &my_charset_bin);
3778   fullfilename.append(FN_DIRSEP);
3779   fullfilename.append(tablename.c_str());
3780   fullfilename.append(".frm");
3781 
3782   /*
3783     This function will return the legacy_db_type of the table.  Currently
3784     it does not reference the first parameter (THD* thd), but if it ever
3785     did in the future we would need to make a version that does it without
3786     the connection handle as we don't have one here.
3787   */
3788   char eng_type_buf[NAME_CHAR_LEN+1];
3789   LEX_CSTRING eng_type_str = {eng_type_buf, 0};
3790   bool is_sequence;
3791   enum Table_type type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type_str, &is_sequence);
3792   if (type == TABLE_TYPE_UNKNOWN) {
3793     // NO_LINT_DEBUG
3794     sql_print_warning("RocksDB: Failed to open/read .from file: %s",
3795                       fullfilename.ptr());
3796     return false;
3797   }
3798 
3799   if (type == TABLE_TYPE_NORMAL) {
3800     /* For a RocksDB table do we have a reference in the data dictionary? */
3801     if (!strncmp(eng_type_str.str, "ROCKSDB", eng_type_str.length)) {
3802       /*
3803         Attempt to remove the table entry from the list of tables.  If this
3804         fails then we know we had a .frm file that wasn't registered in RocksDB.
3805       */
3806       tbl_info_t element(tablename, false);
3807       if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
3808         // NO_LINT_DEBUG
3809         sql_print_warning(
3810             "RocksDB: Schema mismatch - "
3811             "A .frm file exists for table %s.%s, "
3812             "but that table is not registered in RocksDB",
3813             dbname.c_str(), tablename.c_str());
3814         *has_errors = true;
3815       }
3816     } else if (!strncmp(eng_type_str.str, "partition", eng_type_str.length)) {
3817       /*
3818         For partition tables, see if it is in the m_list as a partition,
3819         but don't generate an error if it isn't there - we don't know that the
3820         .frm is for RocksDB.
3821       */
3822       if (m_list.count(dbname) > 0) {
3823         m_list[dbname].erase(tbl_info_t(tablename, true));
3824       }
3825     }
3826   }
3827 
3828   return true;
3829 }
3830 
3831 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)3832 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
3833                                       const std::string &dbname,
3834                                       bool *has_errors) {
3835   bool result = true;
3836   std::string fullpath = datadir + dbname;
3837   struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
3838 
3839   /* Access the directory */
3840   if (dir_info == nullptr) {
3841     // NO_LINT_DEBUG
3842     sql_print_warning("RocksDB: Could not open database directory: %s",
3843                       fullpath.c_str());
3844     return false;
3845   }
3846 
3847   /* Scan through the files in the directory */
3848   struct fileinfo *file_info = dir_info->dir_entry;
3849   for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3850     /* Find .frm files that are not temp files (those that contain '#sql') */
3851     const char *ext = strrchr(file_info->name, '.');
3852     if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
3853         strcmp(ext, ".frm") == 0) {
3854       std::string tablename =
3855           std::string(file_info->name, ext - file_info->name);
3856 
3857       /* Check to see if the .frm file is from RocksDB */
3858       if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
3859         result = false;
3860         break;
3861       }
3862     }
3863   }
3864 
3865   /* Remove any databases who have no more tables listed */
3866   if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
3867     m_list.erase(dbname);
3868   }
3869 
3870   /* Release the directory entry */
3871   my_dirend(dir_info);
3872 
3873   return result;
3874 }
3875 
3876 /*
3877   Scan the datadir for all databases (subdirectories) and get a list of .frm
3878   files they contain
3879 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)3880 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
3881                                                  bool *has_errors) {
3882   bool result = true;
3883   struct st_my_dir *dir_info;
3884   struct fileinfo *file_info;
3885 
3886   dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
3887   if (dir_info == nullptr) {
3888     // NO_LINT_DEBUG
3889     sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
3890     return false;
3891   }
3892 
3893   file_info = dir_info->dir_entry;
3894   for (uint ii = 0; ii < dir_info->number_of_files; ii++, file_info++) {
3895     /* Ignore files/dirs starting with '.' */
3896     if (file_info->name[0] == '.') continue;
3897 
3898     /* Ignore all non-directory files */
3899     if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
3900 
3901     /* Scan all the .frm files in the directory */
3902     if (!scan_for_frms(datadir, file_info->name, has_errors)) {
3903       result = false;
3904       break;
3905     }
3906   }
3907 
3908   /* Release the directory info */
3909   my_dirend(dir_info);
3910 
3911   return result;
3912 }
3913 
3914 /*
3915   Validate that all auto increment values in the data dictionary are on a
3916   supported version.
3917 */
validate_auto_incr()3918 bool Rdb_ddl_manager::validate_auto_incr() {
3919   std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
3920 
3921   uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
3922   rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
3923   const rocksdb::Slice auto_incr_entry_slice(
3924       reinterpret_cast<char *>(auto_incr_entry),
3925       Rdb_key_def::INDEX_NUMBER_SIZE);
3926   for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
3927     const rocksdb::Slice key = it->key();
3928     const rocksdb::Slice val = it->value();
3929     GL_INDEX_ID gl_index_id;
3930 
3931     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
3932         memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
3933       break;
3934     }
3935 
3936     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
3937       return false;
3938     }
3939 
3940     if (val.size() <= Rdb_key_def::VERSION_SIZE) {
3941       return false;
3942     }
3943 
3944     // Check if we have orphaned entries for whatever reason by cross
3945     // referencing ddl entries.
3946     auto ptr = reinterpret_cast<const uchar *>(key.data());
3947     ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
3948     rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
3949     if (!m_dict->get_index_info(gl_index_id, nullptr)) {
3950       // NO_LINT_DEBUG
3951       sql_print_warning(
3952           "RocksDB: AUTOINC mismatch - "
3953           "Index number (%u, %u) found in AUTOINC "
3954           "but does not exist as a DDL entry",
3955           gl_index_id.cf_id, gl_index_id.index_id);
3956       return false;
3957     }
3958 
3959     ptr = reinterpret_cast<const uchar *>(val.data());
3960     const int version = rdb_netbuf_read_uint16(&ptr);
3961     if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
3962       // NO_LINT_DEBUG
3963       sql_print_warning(
3964           "RocksDB: AUTOINC mismatch - "
3965           "Index number (%u, %u) found in AUTOINC "
3966           "is on unsupported version %d",
3967           gl_index_id.cf_id, gl_index_id.index_id, version);
3968       return false;
3969     }
3970   }
3971 
3972   if (!it->status().ok()) {
3973     return false;
3974   }
3975 
3976   return true;
3977 }
3978 
3979 /*
3980   Validate that all the tables in the RocksDB database dictionary match the .frm
3981   files in the datadir
3982 */
validate_schemas(void)3983 bool Rdb_ddl_manager::validate_schemas(void) {
3984   bool has_errors = false;
3985   const std::string datadir = std::string(mysql_real_data_home);
3986   Rdb_validate_tbls table_list;
3987 
3988   /* Get the list of tables from the database dictionary */
3989   if (scan_for_tables(&table_list) != 0) {
3990     return false;
3991   }
3992 
3993   /* Compare that to the list of actual .frm files */
3994   if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
3995     return false;
3996   }
3997 
3998   /*
3999     Any tables left in the tables list are ones that are registered in RocksDB
4000     but don't have .frm files.
4001   */
4002   for (const auto &db : table_list.m_list) {
4003     for (const auto &table : db.second) {
4004       // NO_LINT_DEBUG
4005       sql_print_warning(
4006           "RocksDB: Schema mismatch - "
4007           "Table %s.%s is registered in RocksDB "
4008           "but does not have a .frm file",
4009           db.first.c_str(), table.first.c_str());
4010       has_errors = true;
4011     }
4012   }
4013 
4014   return !has_errors;
4015 }
4016 
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t validate_tables)4017 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4018                            Rdb_cf_manager *const cf_manager,
4019                            const uint32_t validate_tables) {
4020   m_dict = dict_arg;
4021   mysql_rwlock_init(0, &m_rwlock);
4022 
4023   /* Read the data dictionary and populate the hash */
4024   uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4025   rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4026   const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4027                                        Rdb_key_def::INDEX_NUMBER_SIZE);
4028 
4029   /* Reading data dictionary should always skip bloom filter */
4030   rocksdb::Iterator *it = m_dict->new_iterator();
4031   int i = 0;
4032 
4033   uint max_index_id_in_dict = 0;
4034   m_dict->get_max_index_id(&max_index_id_in_dict);
4035 
4036   for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4037     const uchar *ptr;
4038     const uchar *ptr_end;
4039     const rocksdb::Slice key = it->key();
4040     const rocksdb::Slice val = it->value();
4041 
4042     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4043         memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4044       break;
4045     }
4046 
4047     if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4048       // NO_LINT_DEBUG
4049       sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4050                       (int)key.size());
4051       return true;
4052     }
4053 
4054     Rdb_tbl_def *const tdef =
4055         new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4056 
4057     // Now, read the DDLs.
4058     const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4059     if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4060       // NO_LINT_DEBUG
4061       sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4062                       tdef->full_tablename().c_str());
4063       return true;
4064     }
4065     tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4066     tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4067 
4068     ptr = reinterpret_cast<const uchar *>(val.data());
4069     const int version = rdb_netbuf_read_uint16(&ptr);
4070     if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4071       // NO_LINT_DEBUG
4072       sql_print_error(
4073           "RocksDB: DDL ENTRY Version was not expected."
4074           "Expected: %d, Actual: %d",
4075           Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4076       return true;
4077     }
4078     ptr_end = ptr + real_val_size;
4079     for (uint keyno = 0; ptr < ptr_end; keyno++) {
4080       GL_INDEX_ID gl_index_id;
4081       rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4082       uint flags = 0;
4083       struct Rdb_index_info index_info;
4084       if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4085         // NO_LINT_DEBUG
4086         sql_print_error(
4087             "RocksDB: Could not get index information "
4088             "for Index Number (%u,%u), table %s",
4089             gl_index_id.cf_id, gl_index_id.index_id,
4090             tdef->full_tablename().c_str());
4091         return true;
4092       }
4093       if (max_index_id_in_dict < gl_index_id.index_id) {
4094         // NO_LINT_DEBUG
4095         sql_print_error(
4096             "RocksDB: Found max index id %u from data dictionary "
4097             "but also found larger index id %u from dictionary. "
4098             "This should never happen and possibly a bug.",
4099             max_index_id_in_dict, gl_index_id.index_id);
4100         return true;
4101       }
4102       if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4103         // NO_LINT_DEBUG
4104         sql_print_error(
4105             "RocksDB: Could not get Column Family Flags "
4106             "for CF Number %d, table %s",
4107             gl_index_id.cf_id, tdef->full_tablename().c_str());
4108         return true;
4109       }
4110 
4111       if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4112         // The per-index cf option is deprecated.  Make sure we don't have the
4113         // flag set in any existing database.   NO_LINT_DEBUG
4114         // NO_LINT_DEBUG
4115         sql_print_error(
4116             "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4117             "number %d, table %s",
4118             gl_index_id.cf_id, tdef->full_tablename().c_str());
4119       }
4120 
4121       rocksdb::ColumnFamilyHandle *const cfh =
4122           cf_manager->get_cf(gl_index_id.cf_id);
4123       DBUG_ASSERT(cfh != nullptr);
4124 
4125       uint32 ttl_rec_offset =
4126           Rdb_key_def::has_index_flag(index_info.m_index_flags,
4127                                       Rdb_key_def::TTL_FLAG)
4128               ? Rdb_key_def::calculate_index_flag_offset(
4129                     index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4130               : UINT_MAX;
4131 
4132       /*
4133         We can't fully initialize Rdb_key_def object here, because full
4134         initialization requires that there is an open TABLE* where we could
4135         look at Field* objects and set max_length and other attributes
4136       */
4137       tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4138           gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4139           index_info.m_index_type, index_info.m_kv_version,
4140           flags & Rdb_key_def::REVERSE_CF_FLAG,
4141           flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4142           m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4143           ttl_rec_offset, index_info.m_ttl_duration);
4144     }
4145     put(tdef);
4146     i++;
4147   }
4148 
4149   /*
4150     If validate_tables is greater than 0 run the validation.  Only fail the
4151     initialzation if the setting is 1.  If the setting is 2 we continue.
4152   */
4153   if (validate_tables > 0) {
4154     std::string msg;
4155     if (!validate_schemas()) {
4156       msg =
4157           "RocksDB: Problems validating data dictionary "
4158           "against .frm files, exiting";
4159     } else if (!validate_auto_incr()) {
4160       msg =
4161           "RocksDB: Problems validating auto increment values in "
4162           "data dictionary, exiting";
4163     }
4164     if (validate_tables == 1 && !msg.empty()) {
4165       // NO_LINT_DEBUG
4166       sql_print_error("%s", msg.c_str());
4167       return true;
4168     }
4169   }
4170 
4171   // index ids used by applications should not conflict with
4172   // data dictionary index ids
4173   if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4174     max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4175   }
4176 
4177   m_sequence.init(max_index_id_in_dict + 1);
4178 
4179   if (!it->status().ok()) {
4180     rdb_log_status_error(it->status(), "Table_store load error");
4181     return true;
4182   }
4183   delete it;
4184   // NO_LINT_DEBUG
4185   sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4186                         i);
4187   return false;
4188 }
4189 
find(const std::string & table_name,const bool lock)4190 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4191                                    const bool lock) {
4192   if (lock) {
4193     mysql_rwlock_rdlock(&m_rwlock);
4194   }
4195 
4196   Rdb_tbl_def *rec = nullptr;
4197   const auto it = m_ddl_map.find(table_name);
4198   if (it != m_ddl_map.end()) {
4199     rec = it->second;
4200   }
4201 
4202   if (lock) {
4203     mysql_rwlock_unlock(&m_rwlock);
4204   }
4205 
4206   return rec;
4207 }
4208 
4209 // this is a safe version of the find() function below.  It acquires a read
4210 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4211 // are finding it.  Copying it into 'ret' increments the count making sure
4212 // that the object will not be discarded until we are finished with it.
safe_find(GL_INDEX_ID gl_index_id)4213 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4214     GL_INDEX_ID gl_index_id) {
4215   std::shared_ptr<const Rdb_key_def> ret(nullptr);
4216 
4217   mysql_rwlock_rdlock(&m_rwlock);
4218 
4219   auto it = m_index_num_to_keydef.find(gl_index_id);
4220   if (it != m_index_num_to_keydef.end()) {
4221     const auto table_def = find(it->second.first, false);
4222     if (table_def && it->second.second < table_def->m_key_count) {
4223       const auto &kd = table_def->m_key_descr_arr[it->second.second];
4224       if (kd->max_storage_fmt_length() != 0) {
4225         ret = kd;
4226       }
4227     }
4228   } else {
4229     auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4230     if (it != m_index_num_to_uncommitted_keydef.end()) {
4231       const auto &kd = it->second;
4232       if (kd->max_storage_fmt_length() != 0) {
4233         ret = kd;
4234       }
4235     }
4236   }
4237 
4238   mysql_rwlock_unlock(&m_rwlock);
4239 
4240   return ret;
4241 }
4242 
4243 // this method assumes at least read-only lock on m_rwlock
find(GL_INDEX_ID gl_index_id)4244 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4245     GL_INDEX_ID gl_index_id) {
4246   auto it = m_index_num_to_keydef.find(gl_index_id);
4247   if (it != m_index_num_to_keydef.end()) {
4248     auto table_def = find(it->second.first, false);
4249     if (table_def) {
4250       if (it->second.second < table_def->m_key_count) {
4251         return table_def->m_key_descr_arr[it->second.second];
4252       }
4253     }
4254   } else {
4255     auto it = m_index_num_to_uncommitted_keydef.find(gl_index_id);
4256     if (it != m_index_num_to_uncommitted_keydef.end()) {
4257       return it->second;
4258     }
4259   }
4260 
4261   static std::shared_ptr<Rdb_key_def> empty = nullptr;
4262 
4263   return empty;
4264 }
4265 
4266 // this method returns the name of the table based on an index id. It acquires
4267 // a read lock on m_rwlock.
safe_get_table_name(const GL_INDEX_ID & gl_index_id)4268 const std::string Rdb_ddl_manager::safe_get_table_name(
4269     const GL_INDEX_ID &gl_index_id) {
4270   std::string ret;
4271   mysql_rwlock_rdlock(&m_rwlock);
4272   auto it = m_index_num_to_keydef.find(gl_index_id);
4273   if (it != m_index_num_to_keydef.end()) {
4274     ret = it->second.first;
4275   }
4276   mysql_rwlock_unlock(&m_rwlock);
4277   return ret;
4278 }
4279 
set_stats(const std::unordered_map<GL_INDEX_ID,Rdb_index_stats> & stats)4280 void Rdb_ddl_manager::set_stats(
4281     const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4282   mysql_rwlock_wrlock(&m_rwlock);
4283   for (auto src : stats) {
4284     const auto &keydef = find(src.second.m_gl_index_id);
4285     if (keydef) {
4286       keydef->m_stats = src.second;
4287       m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4288     }
4289   }
4290   mysql_rwlock_unlock(&m_rwlock);
4291 }
4292 
adjust_stats(const std::vector<Rdb_index_stats> & new_data,const std::vector<Rdb_index_stats> & deleted_data)4293 void Rdb_ddl_manager::adjust_stats(
4294     const std::vector<Rdb_index_stats> &new_data,
4295     const std::vector<Rdb_index_stats> &deleted_data) {
4296   mysql_rwlock_wrlock(&m_rwlock);
4297   int i = 0;
4298   for (const auto &data : {new_data, deleted_data}) {
4299     for (const auto &src : data) {
4300       const auto &keydef = find(src.m_gl_index_id);
4301       if (keydef) {
4302         keydef->m_stats.m_distinct_keys_per_prefix.resize(
4303             keydef->get_key_parts());
4304         keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4305         m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4306       }
4307     }
4308     i++;
4309   }
4310   const bool should_save_stats = !m_stats2store.empty();
4311   mysql_rwlock_unlock(&m_rwlock);
4312   if (should_save_stats) {
4313     // Queue an async persist_stats(false) call to the background thread.
4314     rdb_queue_save_stats_request();
4315   }
4316 }
4317 
persist_stats(const bool sync)4318 void Rdb_ddl_manager::persist_stats(const bool sync) {
4319   mysql_rwlock_wrlock(&m_rwlock);
4320   const auto local_stats2store = std::move(m_stats2store);
4321   m_stats2store.clear();
4322   mysql_rwlock_unlock(&m_rwlock);
4323 
4324   // Persist stats
4325   const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4326   std::vector<Rdb_index_stats> stats;
4327   std::transform(local_stats2store.begin(), local_stats2store.end(),
4328                  std::back_inserter(stats),
4329                  [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4330                    return s.second;
4331                  });
4332   m_dict->add_stats(wb.get(), stats);
4333   m_dict->commit(wb.get(), sync);
4334 }
4335 
4336 /*
4337   Put table definition of `tbl` into the mapping, and also write it to the
4338   on-disk data dictionary.
4339 */
4340 
put_and_write(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch)4341 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
4342                                    rocksdb::WriteBatch *const batch) {
4343   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
4344 
4345   buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4346 
4347   const std::string &dbname_tablename = tbl->full_tablename();
4348   buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4349 
4350   int res;
4351   if ((res = tbl->put_dict(m_dict, batch, buf_writer.to_slice()))) {
4352     return res;
4353   }
4354   if ((res = put(tbl))) {
4355     return res;
4356   }
4357   return HA_EXIT_SUCCESS;
4358 }
4359 
4360 /* Return 0 - ok, other value - error */
4361 /* TODO:
4362   This function modifies m_ddl_map and m_index_num_to_keydef.
4363   However, these changes need to be reversed if dict_manager.commit fails
4364   See the discussion here: https://reviews.facebook.net/D35925#inline-259167
4365   Tracked by https://github.com/facebook/mysql-5.6/issues/33
4366 */
put(Rdb_tbl_def * const tbl,const bool lock)4367 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
4368   Rdb_tbl_def *rec;
4369   const std::string &dbname_tablename = tbl->full_tablename();
4370 
4371   if (lock) mysql_rwlock_wrlock(&m_rwlock);
4372 
4373   // We have to do this find because 'tbl' is not yet in the list.  We need
4374   // to find the one we are replacing ('rec')
4375   rec = find(dbname_tablename, false);
4376   if (rec) {
4377     // Free the old record.
4378     delete rec;
4379     m_ddl_map.erase(dbname_tablename);
4380   }
4381   m_ddl_map.emplace(dbname_tablename, tbl);
4382 
4383   for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
4384     m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
4385         std::make_pair(dbname_tablename, keyno);
4386   }
4387   tbl->check_and_set_read_free_rpl_table();
4388 
4389   if (lock) mysql_rwlock_unlock(&m_rwlock);
4390   return 0;
4391 }
4392 
remove(Rdb_tbl_def * const tbl,rocksdb::WriteBatch * const batch,const bool lock)4393 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
4394                              rocksdb::WriteBatch *const batch,
4395                              const bool lock) {
4396   if (lock) mysql_rwlock_wrlock(&m_rwlock);
4397 
4398   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
4399   key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4400   const std::string &dbname_tablename = tbl->full_tablename();
4401   key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4402 
4403   m_dict->delete_key(batch, key_writer.to_slice());
4404 
4405   const auto it = m_ddl_map.find(dbname_tablename);
4406   if (it != m_ddl_map.end()) {
4407     // Free Rdb_tbl_def
4408     delete it->second;
4409 
4410     m_ddl_map.erase(it);
4411   }
4412 
4413   if (lock) mysql_rwlock_unlock(&m_rwlock);
4414 }
4415 
rename(const std::string & from,const std::string & to,rocksdb::WriteBatch * const batch)4416 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
4417                              rocksdb::WriteBatch *const batch) {
4418   Rdb_tbl_def *rec;
4419   Rdb_tbl_def *new_rec;
4420   bool res = true;
4421   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
4422 
4423   mysql_rwlock_wrlock(&m_rwlock);
4424   if (!(rec = find(from, false))) {
4425     mysql_rwlock_unlock(&m_rwlock);
4426     return true;
4427   }
4428 
4429   new_rec = new Rdb_tbl_def(to);
4430 
4431   new_rec->m_key_count = rec->m_key_count;
4432   new_rec->m_auto_incr_val =
4433       rec->m_auto_incr_val.load(std::memory_order_relaxed);
4434   new_rec->m_key_descr_arr = rec->m_key_descr_arr;
4435 
4436   new_rec->m_hidden_pk_val =
4437       rec->m_hidden_pk_val.load(std::memory_order_relaxed);
4438 
4439   // so that it's not free'd when deleting the old rec
4440   rec->m_key_descr_arr = nullptr;
4441 
4442   // Create a new key
4443   new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4444 
4445   const std::string &dbname_tablename = new_rec->full_tablename();
4446   new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
4447 
4448   // Create a key to add
4449   if (!new_rec->put_dict(m_dict, batch, new_buf_writer.to_slice())) {
4450     remove(rec, batch, false);
4451     put(new_rec, false);
4452     res = false;  // ok
4453   }
4454 
4455   mysql_rwlock_unlock(&m_rwlock);
4456   return res;
4457 }
4458 
cleanup()4459 void Rdb_ddl_manager::cleanup() {
4460   for (const auto &kv : m_ddl_map) {
4461     delete kv.second;
4462   }
4463   m_ddl_map.clear();
4464 
4465   mysql_rwlock_destroy(&m_rwlock);
4466   m_sequence.cleanup();
4467 }
4468 
scan_for_tables(Rdb_tables_scanner * const tables_scanner)4469 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
4470   int ret;
4471   Rdb_tbl_def *rec;
4472 
4473   DBUG_ASSERT(tables_scanner != nullptr);
4474 
4475   mysql_rwlock_rdlock(&m_rwlock);
4476 
4477   ret = 0;
4478 
4479   for (const auto &kv : m_ddl_map) {
4480     rec = kv.second;
4481     ret = tables_scanner->add_table(rec);
4482     if (ret) break;
4483   }
4484 
4485   mysql_rwlock_unlock(&m_rwlock);
4486   return ret;
4487 }
4488 
4489 /*
4490   Rdb_binlog_manager class implementation
4491 */
4492 
init(Rdb_dict_manager * const dict_arg)4493 bool Rdb_binlog_manager::init(Rdb_dict_manager *const dict_arg) {
4494   DBUG_ASSERT(dict_arg != nullptr);
4495   m_dict = dict_arg;
4496 
4497   m_key_writer.reset();
4498   m_key_writer.write_index(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER);
4499   m_key_slice = m_key_writer.to_slice();
4500   return false;
4501 }
4502 
cleanup()4503 void Rdb_binlog_manager::cleanup() {}
4504 
4505 /**
4506   Set binlog name, pos and optionally gtid into WriteBatch.
4507   This function should be called as part of transaction commit,
4508   since binlog info is set only at transaction commit.
4509   Actual write into RocksDB is not done here, so checking if
4510   write succeeded or not is not possible here.
4511   @param binlog_name   Binlog name
4512   @param binlog_pos    Binlog pos
4513   @param batch         WriteBatch
4514 */
update(const char * const binlog_name,const my_off_t binlog_pos,rocksdb::WriteBatchBase * const batch)4515 void Rdb_binlog_manager::update(const char *const binlog_name,
4516                                 const my_off_t binlog_pos,
4517                                 rocksdb::WriteBatchBase *const batch) {
4518   if (binlog_name && binlog_pos) {
4519     // max binlog length (512) + binlog pos (4) + binlog gtid (57) < 1024
4520     const size_t RDB_MAX_BINLOG_INFO_LEN = 1024;
4521     Rdb_buf_writer<RDB_MAX_BINLOG_INFO_LEN> value_writer;
4522 
4523     // store version
4524     value_writer.write_uint16(Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION);
4525 
4526     // store binlog file name length
4527     DBUG_ASSERT(strlen(binlog_name) <= FN_REFLEN);
4528     const uint16_t binlog_name_len = strlen(binlog_name);
4529     value_writer.write_uint16(binlog_name_len);
4530 
4531     // store binlog file name
4532     value_writer.write(binlog_name, binlog_name_len);
4533 
4534     // store binlog pos
4535     value_writer.write_uint32(binlog_pos);
4536 
4537 #ifdef MARIADB_MERGE_2019
4538     // store binlog gtid length.
4539     // If gtid was not set, store 0 instead
4540     const uint16_t binlog_max_gtid_len =
4541         binlog_max_gtid ? strlen(binlog_max_gtid) : 0;
4542     value_writer.write_uint16(binlog_max_gtid_len);
4543 
4544     if (binlog_max_gtid_len > 0) {
4545       // store binlog gtid
4546       value_writer.write(binlog_max_gtid, binlog_max_gtid_len);
4547     }
4548 #endif
4549 
4550     m_dict->put_key(batch, m_key_slice, value_writer.to_slice());
4551   }
4552 }
4553 
4554 /**
4555   Read binlog committed entry stored in RocksDB, then unpack
4556   @param[OUT] binlog_name  Binlog name
4557   @param[OUT] binlog_pos   Binlog pos
4558   @param[OUT] binlog_gtid  Binlog GTID
4559   @return
4560     true is binlog info was found (valid behavior)
4561     false otherwise
4562 */
read(char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4563 bool Rdb_binlog_manager::read(char *const binlog_name,
4564                               my_off_t *const binlog_pos,
4565                               char *const binlog_gtid) const {
4566   bool ret = false;
4567   if (binlog_name) {
4568     std::string value;
4569     rocksdb::Status status = m_dict->get_value(m_key_slice, &value);
4570     if (status.ok()) {
4571       if (!unpack_value((const uchar *)value.c_str(), value.size(), binlog_name, binlog_pos,
4572                         binlog_gtid)) {
4573         ret = true;
4574       }
4575     }
4576   }
4577   return ret;
4578 }
4579 
4580 /**
4581   Unpack value then split into binlog_name, binlog_pos (and binlog_gtid)
4582   @param[IN]  value        Binlog state info fetched from RocksDB
4583   @param[OUT] binlog_name  Binlog name
4584   @param[OUT] binlog_pos   Binlog pos
4585   @param[OUT] binlog_gtid  Binlog GTID
4586   @return     true on error
4587 */
unpack_value(const uchar * const value,size_t value_size_arg,char * const binlog_name,my_off_t * const binlog_pos,char * const binlog_gtid) const4588 bool Rdb_binlog_manager::unpack_value(const uchar *const value,
4589                                       size_t value_size_arg,
4590                                       char *const binlog_name,
4591                                       my_off_t *const binlog_pos,
4592                                       char *const binlog_gtid) const {
4593   uint pack_len = 0;
4594   intmax_t value_size= value_size_arg;
4595 
4596   DBUG_ASSERT(binlog_pos != nullptr);
4597 
4598   if ((value_size -= Rdb_key_def::VERSION_SIZE) < 0)
4599     return true;
4600   // read version
4601   const uint16_t version = rdb_netbuf_to_uint16(value);
4602 
4603   pack_len += Rdb_key_def::VERSION_SIZE;
4604   if (version != Rdb_key_def::BINLOG_INFO_INDEX_NUMBER_VERSION) return true;
4605 
4606   if ((value_size -= sizeof(uint16)) < 0)
4607     return true;
4608 
4609   // read binlog file name length
4610   const uint16_t binlog_name_len = rdb_netbuf_to_uint16(value + pack_len);
4611   pack_len += sizeof(uint16);
4612 
4613   if (binlog_name_len >= (FN_REFLEN+1))
4614     return true;
4615 
4616   if ((value_size -= binlog_name_len) < 0)
4617     return true;
4618 
4619   if (binlog_name_len) {
4620     // read and set binlog name
4621     memcpy(binlog_name, value + pack_len, binlog_name_len);
4622     binlog_name[binlog_name_len] = '\0';
4623     pack_len += binlog_name_len;
4624 
4625     if ((value_size -= sizeof(uint32)) < 0)
4626       return true;
4627     // read and set binlog pos
4628     *binlog_pos = rdb_netbuf_to_uint32(value + pack_len);
4629     pack_len += sizeof(uint32);
4630 
4631     if ((value_size -= sizeof(uint16)) < 0)
4632       return true;
4633     // read gtid length
4634     const uint16_t binlog_gtid_len = rdb_netbuf_to_uint16(value + pack_len);
4635     pack_len += sizeof(uint16);
4636 
4637     if (binlog_gtid_len >= GTID_BUF_LEN)
4638       return true;
4639     if ((value_size -= binlog_gtid_len) < 0)
4640       return true;
4641 
4642     if (binlog_gtid && binlog_gtid_len > 0) {
4643       // read and set gtid
4644       memcpy(binlog_gtid, value + pack_len, binlog_gtid_len);
4645       binlog_gtid[binlog_gtid_len] = '\0';
4646       pack_len += binlog_gtid_len;
4647     }
4648   }
4649   return false;
4650 }
4651 
4652 /**
4653   Inserts a row into mysql.slave_gtid_info table. Doing this inside
4654   storage engine is more efficient than inserting/updating through MySQL.
4655 
4656   @param[IN] id Primary key of the table.
4657   @param[IN] db Database name. This is column 2 of the table.
4658   @param[IN] gtid Gtid in human readable form. This is column 3 of the table.
4659   @param[IN] write_batch Handle to storage engine writer.
4660 */
update_slave_gtid_info(const uint id,const char * const db,const char * const gtid,rocksdb::WriteBatchBase * const write_batch)4661 void Rdb_binlog_manager::update_slave_gtid_info(
4662     const uint id, const char *const db, const char *const gtid,
4663     rocksdb::WriteBatchBase *const write_batch) {
4664   if (id && db && gtid) {
4665     // Make sure that if the slave_gtid_info table exists we have a
4666     // pointer to it via m_slave_gtid_info_tbl.
4667     if (!m_slave_gtid_info_tbl.load()) {
4668       m_slave_gtid_info_tbl.store(
4669           rdb_get_ddl_manager()->find("mysql.slave_gtid_info"));
4670     }
4671     if (!m_slave_gtid_info_tbl.load()) {
4672       // slave_gtid_info table is not present. Simply return.
4673       return;
4674     }
4675     DBUG_ASSERT(m_slave_gtid_info_tbl.load()->m_key_count == 1);
4676 
4677     const std::shared_ptr<const Rdb_key_def> &kd =
4678         m_slave_gtid_info_tbl.load()->m_key_descr_arr[0];
4679     String value;
4680 
4681     // Build key
4682     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE + 4> key_writer;
4683     key_writer.write_index(kd->get_index_number());
4684     key_writer.write_uint32(id);
4685 
4686     // Build value
4687     Rdb_buf_writer<128> value_writer;
4688     DBUG_ASSERT(gtid);
4689     const uint db_len = strlen(db);
4690     const uint gtid_len = strlen(gtid);
4691     // 1 byte used for flags. Empty here.
4692     value_writer.write_byte(0);
4693 
4694     // Write column 1.
4695     DBUG_ASSERT(strlen(db) <= 64);
4696     value_writer.write_byte(db_len);
4697     value_writer.write(db, db_len);
4698 
4699     // Write column 2.
4700     DBUG_ASSERT(gtid_len <= 56);
4701     value_writer.write_byte(gtid_len);
4702     value_writer.write(gtid, gtid_len);
4703 
4704     write_batch->Put(kd->get_cf(), key_writer.to_slice(),
4705                      value_writer.to_slice());
4706   }
4707 }
4708 
init(rocksdb::TransactionDB * const rdb_dict,Rdb_cf_manager * const cf_manager)4709 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
4710                             Rdb_cf_manager *const cf_manager) {
4711   DBUG_ASSERT(rdb_dict != nullptr);
4712   DBUG_ASSERT(cf_manager != nullptr);
4713 
4714   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
4715 
4716   m_db = rdb_dict;
4717 
4718   m_system_cfh = cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME);
4719   rocksdb::ColumnFamilyHandle *default_cfh =
4720       cf_manager->get_cf(DEFAULT_CF_NAME);
4721 
4722   // System CF and default CF should be initialized
4723   if (m_system_cfh == nullptr || default_cfh == nullptr) {
4724     return HA_EXIT_FAILURE;
4725   }
4726 
4727   rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
4728 
4729   m_key_slice_max_index_id =
4730       rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
4731                      Rdb_key_def::INDEX_NUMBER_SIZE);
4732 
4733   resume_drop_indexes();
4734   rollback_ongoing_index_creation();
4735 
4736   // Initialize system CF and default CF flags
4737   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
4738   rocksdb::WriteBatch *const batch = wb.get();
4739 
4740   add_cf_flags(batch, m_system_cfh->GetID(), 0);
4741   add_cf_flags(batch, default_cfh->GetID(), 0);
4742   commit(batch);
4743 
4744   return HA_EXIT_SUCCESS;
4745 }
4746 
begin() const4747 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
4748   return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
4749 }
4750 
put_key(rocksdb::WriteBatchBase * const batch,const rocksdb::Slice & key,const rocksdb::Slice & value) const4751 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
4752                                const rocksdb::Slice &key,
4753                                const rocksdb::Slice &value) const {
4754   batch->Put(m_system_cfh, key, value);
4755 }
4756 
/*
  Point lookup of key in the system column family, issued with
  total_order_seek enabled. The payload is stored in *value and the RocksDB
  status of the read is returned unchanged.
*/
rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
                                            std::string *const value) const {
  rocksdb::ReadOptions read_opts;
  read_opts.total_order_seek = true;

  rocksdb::Status lookup_status =
      m_db->Get(read_opts, m_system_cfh, key, value);
  return lookup_status;
}
4763 
delete_key(rocksdb::WriteBatchBase * batch,const rocksdb::Slice & key) const4764 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
4765                                   const rocksdb::Slice &key) const {
4766   batch->Delete(m_system_cfh, key);
4767 }
4768 
new_iterator() const4769 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
4770   /* Reading data dictionary should always skip bloom filter */
4771   rocksdb::ReadOptions read_options;
4772   read_options.total_order_seek = true;
4773   return m_db->NewIterator(read_options, m_system_cfh);
4774 }
4775 
commit(rocksdb::WriteBatch * const batch,const bool sync) const4776 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
4777                              const bool sync) const {
4778   if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
4779   int res = HA_EXIT_SUCCESS;
4780   rocksdb::WriteOptions options;
4781   options.sync = sync;
4782   rocksdb::TransactionDBWriteOptimizations optimize;
4783   optimize.skip_concurrency_control = true;
4784   rocksdb::Status s = m_db->Write(options, optimize, batch);
4785   res = !s.ok();  // we return true when something failed
4786   if (res) {
4787     rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
4788   }
4789   batch->Clear();
4790   return res;
4791 }
4792 
dump_index_id(uchar * const netbuf,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id)4793 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
4794                                      Rdb_key_def::DATA_DICT_TYPE dict_type,
4795                                      const GL_INDEX_ID &gl_index_id) {
4796   rdb_netbuf_store_uint32(netbuf, dict_type);
4797   rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
4798                           gl_index_id.cf_id);
4799   rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
4800                           gl_index_id.index_id);
4801 }
4802 
delete_with_prefix(rocksdb::WriteBatch * const batch,Rdb_key_def::DATA_DICT_TYPE dict_type,const GL_INDEX_ID & gl_index_id) const4803 void Rdb_dict_manager::delete_with_prefix(
4804     rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
4805     const GL_INDEX_ID &gl_index_id) const {
4806   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4807   dump_index_id(&key_writer, dict_type, gl_index_id);
4808 
4809   delete_key(batch, key_writer.to_slice());
4810 }
4811 
add_or_update_index_cf_mapping(rocksdb::WriteBatch * batch,struct Rdb_index_info * const index_info) const4812 void Rdb_dict_manager::add_or_update_index_cf_mapping(
4813     rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
4814   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4815   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
4816                 index_info->m_gl_index_id);
4817 
4818   Rdb_buf_writer<256> value_writer;
4819 
4820   value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
4821   value_writer.write_byte(index_info->m_index_type);
4822   value_writer.write_uint16(index_info->m_kv_version);
4823   value_writer.write_uint32(index_info->m_index_flags);
4824   value_writer.write_uint64(index_info->m_ttl_duration);
4825 
4826   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4827 }
4828 
add_cf_flags(rocksdb::WriteBatch * const batch,const uint32_t cf_id,const uint32_t cf_flags) const4829 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
4830                                     const uint32_t cf_id,
4831                                     const uint32_t cf_flags) const {
4832   DBUG_ASSERT(batch != nullptr);
4833 
4834   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4835   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4836   key_writer.write_uint32(cf_id);
4837 
4838   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
4839       value_writer;
4840   value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
4841   value_writer.write_uint32(cf_flags);
4842 
4843   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
4844 }
4845 
delete_index_info(rocksdb::WriteBatch * batch,const GL_INDEX_ID & gl_index_id) const4846 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
4847                                          const GL_INDEX_ID &gl_index_id) const {
4848   delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
4849   delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
4850   delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
4851 }
4852 
get_index_info(const GL_INDEX_ID & gl_index_id,struct Rdb_index_info * const index_info) const4853 bool Rdb_dict_manager::get_index_info(
4854     const GL_INDEX_ID &gl_index_id,
4855     struct Rdb_index_info *const index_info) const {
4856   if (index_info) {
4857     index_info->m_gl_index_id = gl_index_id;
4858   }
4859 
4860   bool found = false;
4861   bool error = false;
4862   std::string value;
4863   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
4864   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
4865 
4866   const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
4867   if (status.ok()) {
4868     if (!index_info) {
4869       return true;
4870     }
4871 
4872     const uchar *const val = (const uchar *)value.c_str();
4873     const uchar *ptr = val;
4874     index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
4875     ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
4876 
4877     switch (index_info->m_index_dict_version) {
4878       case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
4879         /* Sanity check to prevent reading bogus TTL record. */
4880         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4881                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4882                                 RDB_SIZEOF_INDEX_FLAGS +
4883                                 ROCKSDB_SIZEOF_TTL_RECORD) {
4884           error = true;
4885           break;
4886         }
4887         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4888         ptr += RDB_SIZEOF_INDEX_TYPE;
4889         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4890         ptr += RDB_SIZEOF_KV_VERSION;
4891         index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
4892         ptr += RDB_SIZEOF_INDEX_FLAGS;
4893         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4894         found = true;
4895         break;
4896 
4897       case Rdb_key_def::INDEX_INFO_VERSION_TTL:
4898         /* Sanity check to prevent reading bogus into TTL record. */
4899         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
4900                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
4901                                 ROCKSDB_SIZEOF_TTL_RECORD) {
4902           error = true;
4903           break;
4904         }
4905         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4906         ptr += RDB_SIZEOF_INDEX_TYPE;
4907         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4908         ptr += RDB_SIZEOF_KV_VERSION;
4909         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
4910         if ((index_info->m_kv_version ==
4911              Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
4912             index_info->m_ttl_duration > 0) {
4913           index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
4914         }
4915         found = true;
4916         break;
4917 
4918       case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
4919       case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
4920         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
4921         ptr += RDB_SIZEOF_INDEX_TYPE;
4922         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
4923         found = true;
4924         break;
4925 
4926       default:
4927         error = true;
4928         break;
4929     }
4930 
4931     switch (index_info->m_index_type) {
4932       case Rdb_key_def::INDEX_TYPE_PRIMARY:
4933       case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
4934         error = index_info->m_kv_version >
4935                 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
4936         break;
4937       }
4938       case Rdb_key_def::INDEX_TYPE_SECONDARY:
4939         error = index_info->m_kv_version >
4940                 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
4941         break;
4942       default:
4943         error = true;
4944         break;
4945     }
4946   }
4947 
4948   if (error) {
4949     // NO_LINT_DEBUG
4950     sql_print_error(
4951         "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
4952         "from data dictionary. This should never happen "
4953         "and it may be a bug.",
4954         index_info->m_index_dict_version, index_info->m_index_type,
4955         index_info->m_kv_version, index_info->m_ttl_duration);
4956     abort();
4957   }
4958 
4959   return found;
4960 }
4961 
get_cf_flags(const uint32_t cf_id,uint32_t * const cf_flags) const4962 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
4963                                     uint32_t *const cf_flags) const {
4964   DBUG_ASSERT(cf_flags != nullptr);
4965 
4966   bool found = false;
4967   std::string value;
4968   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
4969 
4970   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
4971   key_writer.write_uint32(cf_id);
4972 
4973   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
4974 
4975   if (status.ok()) {
4976     const uchar *val = (const uchar *)value.c_str();
4977     DBUG_ASSERT(val);
4978 
4979     const uint16_t version = rdb_netbuf_to_uint16(val);
4980 
4981     if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
4982       *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
4983       found = true;
4984     }
4985   }
4986 
4987   return found;
4988 }
4989 
4990 /*
4991   Returning index ids that were marked as deleted (via DROP TABLE) but
4992   still not removed by drop_index_thread yet, or indexes that are marked as
4993   ongoing creation.
4994  */
get_ongoing_index_operation(std::unordered_set<GL_INDEX_ID> * gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const4995 void Rdb_dict_manager::get_ongoing_index_operation(
4996     std::unordered_set<GL_INDEX_ID> *gl_index_ids,
4997     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
4998   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
4999               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5000 
5001   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5002   index_writer.write_uint32(dd_type);
5003   const rocksdb::Slice index_slice = index_writer.to_slice();
5004 
5005   rocksdb::Iterator *it = new_iterator();
5006   for (it->Seek(index_slice); it->Valid(); it->Next()) {
5007     rocksdb::Slice key = it->key();
5008     const uchar *const ptr = (const uchar *)key.data();
5009 
5010     /*
5011       Ongoing drop/create index operations require key to be of the form:
5012       dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5013 
5014       This may need to be changed in the future if we want to process a new
5015       ddl_type with different format.
5016     */
5017     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5018         rdb_netbuf_to_uint32(ptr) != dd_type) {
5019       break;
5020     }
5021 
5022     // We don't check version right now since currently we always store only
5023     // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5024     // If increasing version number, we need to add version check logic here.
5025     GL_INDEX_ID gl_index_id;
5026     gl_index_id.cf_id =
5027         rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5028     gl_index_id.index_id =
5029         rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5030     gl_index_ids->insert(gl_index_id);
5031   }
5032   delete it;
5033 }
5034 
5035 /*
5036   Returning true if index_id is create/delete ongoing (undergoing creation or
5037   marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
5038   or not.
5039  */
is_index_operation_ongoing(const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5040 bool Rdb_dict_manager::is_index_operation_ongoing(
5041     const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5042   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5043               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5044 
5045   bool found = false;
5046   std::string value;
5047   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5048   dump_index_id(&key_writer, dd_type, gl_index_id);
5049 
5050   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5051   if (status.ok()) {
5052     found = true;
5053   }
5054   return found;
5055 }
5056 
5057 /*
5058   Adding index_id to data dictionary so that the index id is removed
5059   by drop_index_thread, or to track online index creation.
5060  */
start_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5061 void Rdb_dict_manager::start_ongoing_index_operation(
5062     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5063     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5064   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5065               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5066 
5067   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5068   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5069 
5070   dump_index_id(&key_writer, dd_type, gl_index_id);
5071 
5072   // version as needed
5073   if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5074     value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5075   } else {
5076     value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5077   }
5078 
5079   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5080 }
5081 
5082 /*
5083   Removing index_id from data dictionary to confirm drop_index_thread
5084   completed dropping entire key/values of the index_id
5085  */
end_ongoing_index_operation(rocksdb::WriteBatch * const batch,const GL_INDEX_ID & gl_index_id,Rdb_key_def::DATA_DICT_TYPE dd_type) const5086 void Rdb_dict_manager::end_ongoing_index_operation(
5087     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5088     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5089   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5090               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5091 
5092   delete_with_prefix(batch, dd_type, gl_index_id);
5093 }
5094 
5095 /*
5096   Returning true if there is no target index ids to be removed
5097   by drop_index_thread
5098  */
is_drop_index_empty() const5099 bool Rdb_dict_manager::is_drop_index_empty() const {
5100   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5101   get_ongoing_drop_indexes(&gl_index_ids);
5102   return gl_index_ids.empty();
5103 }
5104 
5105 /*
5106   This function is supposed to be called by DROP TABLE. Logging messages
5107   that dropping indexes started, and adding data dictionary so that
5108   all associated indexes to be removed
5109  */
add_drop_table(std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,rocksdb::WriteBatch * const batch) const5110 void Rdb_dict_manager::add_drop_table(
5111     std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5112     rocksdb::WriteBatch *const batch) const {
5113   std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5114   for (uint32 i = 0; i < n_keys; i++) {
5115     dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5116   }
5117 
5118   add_drop_index(dropped_index_ids, batch);
5119 }
5120 
5121 /*
5122   Called during inplace index drop operations. Logging messages
5123   that dropping indexes started, and adding data dictionary so that
5124   all associated indexes to be removed
5125  */
add_drop_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5126 void Rdb_dict_manager::add_drop_index(
5127     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5128     rocksdb::WriteBatch *const batch) const {
5129   for (const auto &gl_index_id : gl_index_ids) {
5130     log_start_drop_index(gl_index_id, "Begin");
5131     start_drop_index(batch, gl_index_id);
5132   }
5133 }
5134 
5135 /*
5136   Called during inplace index creation operations. Logging messages
5137   that adding indexes started, and updates data dictionary with all associated
5138   indexes to be added.
5139  */
add_create_index(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,rocksdb::WriteBatch * const batch) const5140 void Rdb_dict_manager::add_create_index(
5141     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5142     rocksdb::WriteBatch *const batch) const {
5143   for (const auto &gl_index_id : gl_index_ids) {
5144     // NO_LINT_DEBUG
5145     sql_print_verbose_info("RocksDB: Begin index creation (%u,%u)",
5146                            gl_index_id.cf_id, gl_index_id.index_id);
5147     start_create_index(batch, gl_index_id);
5148   }
5149 }
5150 
5151 /*
5152   This function is supposed to be called by drop_index_thread, when it
5153   finished dropping any index, or at the completion of online index creation.
5154  */
finish_indexes_operation(const std::unordered_set<GL_INDEX_ID> & gl_index_ids,Rdb_key_def::DATA_DICT_TYPE dd_type) const5155 void Rdb_dict_manager::finish_indexes_operation(
5156     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5157     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5158   DBUG_ASSERT(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5159               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5160 
5161   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5162   rocksdb::WriteBatch *const batch = wb.get();
5163 
5164   std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5165   get_ongoing_create_indexes(&incomplete_create_indexes);
5166 
5167   for (const auto &gl_index_id : gl_index_ids) {
5168     if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5169       end_ongoing_index_operation(batch, gl_index_id, dd_type);
5170 
5171       /*
5172         Remove the corresponding incomplete create indexes from data
5173         dictionary as well
5174       */
5175       if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5176         if (incomplete_create_indexes.count(gl_index_id)) {
5177           end_ongoing_index_operation(batch, gl_index_id,
5178                                       Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5179         }
5180       }
5181     }
5182 
5183     if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5184       delete_index_info(batch, gl_index_id);
5185     }
5186   }
5187   commit(batch);
5188 }
5189 
5190 /*
5191   This function is supposed to be called when initializing
5192   Rdb_dict_manager (at startup). If there is any index ids that are
5193   drop ongoing, printing out messages for diagnostics purposes.
5194  */
resume_drop_indexes() const5195 void Rdb_dict_manager::resume_drop_indexes() const {
5196   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5197   get_ongoing_drop_indexes(&gl_index_ids);
5198 
5199   uint max_index_id_in_dict = 0;
5200   get_max_index_id(&max_index_id_in_dict);
5201 
5202   for (const auto &gl_index_id : gl_index_ids) {
5203     log_start_drop_index(gl_index_id, "Resume");
5204     if (max_index_id_in_dict < gl_index_id.index_id) {
5205       // NO_LINT_DEBUG
5206       sql_print_error(
5207           "RocksDB: Found max index id %u from data dictionary "
5208           "but also found dropped index id (%u,%u) from drop_index "
5209           "dictionary. This should never happen and is possibly a "
5210           "bug.",
5211           max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5212       abort();
5213     }
5214   }
5215 }
5216 
rollback_ongoing_index_creation() const5217 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5218   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5219   rocksdb::WriteBatch *const batch = wb.get();
5220 
5221   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5222   get_ongoing_create_indexes(&gl_index_ids);
5223 
5224   for (const auto &gl_index_id : gl_index_ids) {
5225     // NO_LINT_DEBUG
5226     sql_print_verbose_info("RocksDB: Removing incomplete create index (%u,%u)",
5227                            gl_index_id.cf_id, gl_index_id.index_id);
5228 
5229     start_drop_index(batch, gl_index_id);
5230   }
5231 
5232   commit(batch);
5233 }
5234 
log_start_drop_table(const std::shared_ptr<Rdb_key_def> * const key_descr,const uint32 n_keys,const char * const log_action) const5235 void Rdb_dict_manager::log_start_drop_table(
5236     const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5237     const char *const log_action) const {
5238   for (uint32 i = 0; i < n_keys; i++) {
5239     log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5240   }
5241 }
5242 
log_start_drop_index(GL_INDEX_ID gl_index_id,const char * log_action) const5243 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5244                                             const char *log_action) const {
5245   struct Rdb_index_info index_info;
5246   if (!get_index_info(gl_index_id, &index_info)) {
5247     /*
5248       If we don't find the index info, it could be that it's because it was a
5249       partially created index that isn't in the data dictionary yet that needs
5250       to be rolled back.
5251     */
5252     std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5253     get_ongoing_create_indexes(&incomplete_create_indexes);
5254 
5255     if (!incomplete_create_indexes.count(gl_index_id)) {
5256       /* If it's not a partially created index, something is very wrong. */
5257       // NO_LINT_DEBUG
5258       sql_print_error(
5259           "RocksDB: Failed to get column family info "
5260           "from index id (%u,%u). MyRocks data dictionary may "
5261           "get corrupted.",
5262           gl_index_id.cf_id, gl_index_id.index_id);
5263       if (rocksdb_ignore_datadic_errors)
5264       {
5265         sql_print_error("RocksDB: rocksdb_ignore_datadic_errors=1, "
5266                         "trying to continue");
5267         return;
5268       }
5269       abort();
5270     }
5271   }
5272 }
5273 
get_max_index_id(uint32_t * const index_id) const5274 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5275   bool found = false;
5276   std::string value;
5277 
5278   const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5279   if (status.ok()) {
5280     const uchar *const val = (const uchar *)value.c_str();
5281     const uint16_t version = rdb_netbuf_to_uint16(val);
5282     if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5283       *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5284       found = true;
5285     }
5286   }
5287   return found;
5288 }
5289 
update_max_index_id(rocksdb::WriteBatch * const batch,const uint32_t index_id) const5290 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5291                                            const uint32_t index_id) const {
5292   DBUG_ASSERT(batch != nullptr);
5293 
5294   uint32_t old_index_id = -1;
5295   if (get_max_index_id(&old_index_id)) {
5296     if (old_index_id > index_id) {
5297       // NO_LINT_DEBUG
5298       sql_print_error(
5299           "RocksDB: Found max index id %u from data dictionary "
5300           "but trying to update to older value %u. This should "
5301           "never happen and possibly a bug.",
5302           old_index_id, index_id);
5303       return true;
5304     }
5305   }
5306 
5307   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5308       value_writer;
5309   value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5310   value_writer.write_uint32(index_id);
5311 
5312   batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5313   return false;
5314 }
5315 
add_stats(rocksdb::WriteBatch * const batch,const std::vector<Rdb_index_stats> & stats) const5316 void Rdb_dict_manager::add_stats(
5317     rocksdb::WriteBatch *const batch,
5318     const std::vector<Rdb_index_stats> &stats) const {
5319   DBUG_ASSERT(batch != nullptr);
5320 
5321   for (const auto &it : stats) {
5322     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5323     dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5324 
5325     // IndexStats::materialize takes complete care of serialization including
5326     // storing the version
5327     const auto value =
5328         Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5329 
5330     batch->Put(m_system_cfh, key_writer.to_slice(), value);
5331   }
5332 }
5333 
get_stats(GL_INDEX_ID gl_index_id) const5334 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5335   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5336   dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5337 
5338   std::string value;
5339   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5340   if (status.ok()) {
5341     std::vector<Rdb_index_stats> v;
5342     // unmaterialize checks if the version matches
5343     if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5344       return v[0];
5345     }
5346   }
5347 
5348   return Rdb_index_stats();
5349 }
5350 
put_auto_incr_val(rocksdb::WriteBatchBase * batch,const GL_INDEX_ID & gl_index_id,ulonglong val,bool overwrite) const5351 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5352     rocksdb::WriteBatchBase *batch, const GL_INDEX_ID &gl_index_id,
5353     ulonglong val, bool overwrite) const {
5354   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5355   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5356 
5357   // Value is constructed by storing the version and the value.
5358   Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5359                  ROCKSDB_SIZEOF_AUTOINC_VALUE>
5360       value_writer;
5361   value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5362   value_writer.write_uint64(val);
5363 
5364   if (overwrite) {
5365     return batch->Put(m_system_cfh, key_writer.to_slice(),
5366                       value_writer.to_slice());
5367   }
5368   return batch->Merge(m_system_cfh, key_writer.to_slice(),
5369                       value_writer.to_slice());
5370 }
5371 
get_auto_incr_val(const GL_INDEX_ID & gl_index_id,ulonglong * new_val) const5372 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5373                                          ulonglong *new_val) const {
5374   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5375   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5376 
5377   std::string value;
5378   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5379 
5380   if (status.ok()) {
5381     const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5382 
5383     if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5384       *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5385       return true;
5386     }
5387   }
5388   return false;
5389 }
5390 
get_and_update_next_number(Rdb_dict_manager * const dict)5391 uint Rdb_seq_generator::get_and_update_next_number(
5392     Rdb_dict_manager *const dict) {
5393   DBUG_ASSERT(dict != nullptr);
5394 
5395   uint res;
5396   RDB_MUTEX_LOCK_CHECK(m_mutex);
5397 
5398   res = m_next_number++;
5399 
5400   const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
5401   rocksdb::WriteBatch *const batch = wb.get();
5402 
5403   DBUG_ASSERT(batch != nullptr);
5404   dict->update_max_index_id(batch, res);
5405   dict->commit(batch);
5406 
5407   RDB_MUTEX_UNLOCK_CHECK(m_mutex);
5408 
5409   return res;
5410 }
5411 
5412 }  // namespace myrocks
5413