1 /*
2    Copyright (c) 2012,2013 Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16 
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation  // gcc: Class implementation
19 #endif
20 
21 /* This C++ file's header file */
22 #include "./rdb_datadic.h"
23 
24 /* C++ standard header files */
25 #include <algorithm>
26 #include <array>
27 #include <limits>
28 #include <map>
29 #include <set>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 /* MySQL header files */
35 #include "field.h"
36 #include "key.h"
37 #include "m_ctype.h"
38 #include "my_bit.h"
39 #include "my_bitmap.h"
40 #include "my_stacktrace.h"
41 #include "sql_table.h"
42 
43 /* MyRocks header files */
44 #include "./ha_rocksdb.h"
45 #include "./ha_rocksdb_proto.h"
46 #include "./rdb_cf_manager.h"
47 #include "./rdb_psi.h"
48 #include "./rdb_utils.h"
49 
50 #include "partitioning/partition_base.h"
51 
52 namespace myrocks {
53 
54 void get_mem_comparable_space(const CHARSET_INFO *cs,
55                               const std::vector<uchar> **xfrm, size_t *xfrm_len,
56                               size_t *mb_len);
57 
58 /*
59   Decode  current key field
60   @param  fpi               IN      data structure contains field metadata
61   @param  field             IN      current field
62   @param  reader            IN      key slice reader
63   @param  unp_reader        IN      unpack information reader
64   @return
65     HA_EXIT_SUCCESS    OK
66     other              HA_ERR error code
67 */
decode_field(Rdb_field_packing * fpi,TABLE * table,uchar * buf,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)68 int Rdb_convert_to_record_key_decoder::decode_field(
69     Rdb_field_packing *fpi, TABLE *table, uchar *buf, Rdb_string_reader *reader,
70     Rdb_string_reader *unpack_reader) {
71   if (fpi->m_field_maybe_null) {
72     const char *nullp;
73     if (!(nullp = reader->read(1))) {
74       return HA_EXIT_FAILURE;
75     }
76 
77     if (likely(*nullp == 1)) {
78       /* Clear the NULL-bit of this field */
79       buf[fpi->m_field_null_offset] &= (uchar) ~(fpi->m_field_null_bit_mask);
80     } else if (*nullp == 0) {
81       /* Set the NULL-bit of this field */
82       buf[fpi->m_field_null_offset] |= fpi->m_field_null_bit_mask;
83 
84       /* Also set the field to its default value */
85       auto default_value = table->s->default_values + fpi->m_field_offset;
86       memcpy(buf + fpi->m_field_offset, default_value,
87              fpi->m_field_pack_length);
88       return HA_EXIT_SUCCESS;
89     } else {
90       return HA_EXIT_FAILURE;
91     }
92   }
93 
94   return (fpi->m_unpack_func)(fpi, buf + fpi->m_field_offset, reader,
95                               unpack_reader);
96 }
97 
98 /*
99   Decode  current key field
100 
101   @param  buf               OUT     the buf starting address
102   @param  offset            OUT     the bytes offset when data is written
103   @param  fpi               IN      data structure contains field metadata
104   @param  table             IN      current table
105   @param  field             IN      current field
106   @param  has_unpack_inf    IN      whether contains unpack inf
107   @param  reader            IN      key slice reader
108   @param  unp_reader        IN      unpack information reader
109   @return
110     HA_EXIT_SUCCESS    OK
111     other              HA_ERR error code
112 */
decode(uchar * const buf,Rdb_field_packing * fpi,TABLE * table,bool has_unpack_info,Rdb_string_reader * reader,Rdb_string_reader * unpack_reader)113 int Rdb_convert_to_record_key_decoder::decode(
114     uchar *const buf, Rdb_field_packing *fpi, TABLE *table,
115     bool has_unpack_info, Rdb_string_reader *reader,
116     Rdb_string_reader *unpack_reader) {
117   assert(buf != nullptr);
118 
119   // If we need unpack info, but there is none, tell the unpack function
120   // this by passing unp_reader as nullptr. If we never read unpack_info
121   // during unpacking anyway, then there won't an error.
122   bool maybe_missing_unpack = !has_unpack_info && fpi->uses_unpack_info();
123 
124   int res = decode_field(fpi, table, buf, reader,
125                          maybe_missing_unpack ? nullptr : unpack_reader);
126 
127   if (res != UNPACK_SUCCESS) {
128     return HA_ERR_ROCKSDB_CORRUPT_DATA;
129   }
130   return HA_EXIT_SUCCESS;
131 }
132 
133 /*
134   Skip current key field
135 
136   @param  fpi          IN    data structure contains field metadata
137   @param  field        IN    current field
138   @param  reader       IN    key slice reader
139   @param  unp_reader   IN    unpack information reader
140   @return
141     HA_EXIT_SUCCESS    OK
142     other              HA_ERR error code
143 */
skip(const Rdb_field_packing * fpi,const Field * field,Rdb_string_reader * reader,Rdb_string_reader * unp_reader)144 int Rdb_convert_to_record_key_decoder::skip(const Rdb_field_packing *fpi,
145                                             const Field *field,
146                                             Rdb_string_reader *reader,
147                                             Rdb_string_reader *unp_reader) {
148   /* It is impossible to unpack the column. Skip it. */
149   if (fpi->m_field_maybe_null) {
150     const char *nullp;
151     if (!(nullp = reader->read(1))) {
152       return HA_ERR_ROCKSDB_CORRUPT_DATA;
153     }
154     if (*nullp == 0) {
155       /* This is a NULL value */
156       return HA_EXIT_SUCCESS;
157     }
158     /* If NULL marker is not '0', it can be only '1'  */
159     if (*nullp != 1) {
160       return HA_ERR_ROCKSDB_CORRUPT_DATA;
161     }
162   }
163   if ((fpi->m_skip_func)(fpi, reader)) {
164     return HA_ERR_ROCKSDB_CORRUPT_DATA;
165   }
166   // If this is a space padded varchar, we need to skip the indicator
167   // bytes for trailing bytes. They're useless since we can't restore the
168   // field anyway.
169   //
170   // There is a special case for prefixed varchars where we do not
171   // generate unpack info, because we know prefixed varchars cannot be
172   // unpacked. In this case, it is not necessary to skip.
173   if (fpi->m_skip_func == &Rdb_key_def::skip_variable_space_pad &&
174       !fpi->m_unpack_info_stores_value) {
175     unp_reader->read(fpi->m_unpack_info_uses_two_bytes ? 2 : 1);
176   }
177   return HA_EXIT_SUCCESS;
178 }
179 
Rdb_key_field_iterator(const Rdb_key_def * key_def,Rdb_field_packing * pack_info,Rdb_string_reader * reader,Rdb_string_reader * unp_reader,TABLE * table,bool has_unpack_info,const MY_BITMAP * covered_bitmap,uchar * const buf)180 Rdb_key_field_iterator::Rdb_key_field_iterator(
181     const Rdb_key_def *key_def, Rdb_field_packing *pack_info,
182     Rdb_string_reader *reader, Rdb_string_reader *unp_reader, TABLE *table,
183     bool has_unpack_info, const MY_BITMAP *covered_bitmap, uchar *const buf) {
184   m_key_def = key_def;
185   m_fpi = pack_info;
186   m_fpi_end = pack_info + key_def->get_key_parts();
187   m_reader = reader;
188   m_unp_reader = unp_reader;
189   m_table = table;
190   m_has_unpack_info = has_unpack_info;
191   m_covered_bitmap = covered_bitmap;
192   m_buf = buf;
193   m_secondary_key =
194       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
195   m_hidden_pk_exists = Rdb_key_def::table_has_hidden_pk(table);
196   m_is_hidden_pk =
197       (key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
198   m_curr_bitmap_pos = 0;
199 }
200 
has_next()201 bool Rdb_key_field_iterator::has_next() { return m_fpi < m_fpi_end; }
202 
203 /**
204  Iterate each field in the key and decode/skip one by one
205 */
next()206 int Rdb_key_field_iterator::next() {
207   int status = HA_EXIT_SUCCESS;
208   while (m_fpi < m_fpi_end) {
209     auto fpi = m_fpi++;
210 
211     /*
212       Hidden pk field is packed at the end of the secondary keys, but the SQL
213       layer does not know about it. Skip retrieving field if hidden pk.
214     */
215     if ((m_secondary_key && m_hidden_pk_exists && fpi + 1 == m_fpi_end) ||
216         m_is_hidden_pk) {
217       assert(fpi->m_unpack_func);
218       if ((fpi->m_skip_func)(fpi, m_reader)) {
219         return HA_ERR_ROCKSDB_CORRUPT_DATA;
220       }
221       return HA_EXIT_SUCCESS;
222     }
223 
224     bool covered_column = true;
225     if (m_covered_bitmap != nullptr &&
226         fpi->m_field_real_type == MYSQL_TYPE_VARCHAR && !fpi->m_covered) {
227       covered_column = m_curr_bitmap_pos < MAX_REF_PARTS &&
228                        bitmap_is_set(m_covered_bitmap, m_curr_bitmap_pos++);
229     }
230 
231     if (fpi->m_unpack_func && covered_column) {
232       /* It is possible to unpack this column. Do it. */
233       status = Rdb_convert_to_record_key_decoder::decode(
234           m_buf, fpi, m_table, m_has_unpack_info, m_reader, m_unp_reader);
235       if (status) {
236         return status;
237       }
238       break;
239     } else {
240       auto field = fpi->get_field_in_table(m_table);
241       status = Rdb_convert_to_record_key_decoder::skip(fpi, field, m_reader,
242                                                        m_unp_reader);
243       if (status) {
244         return status;
245       }
246     }
247   }
248   return HA_EXIT_SUCCESS;
249 }
250 
251 /*
252   Rdb_key_def class implementation
253 */
/*
  Construct a key definition from its dictionary attributes.

  Only the attributes passed in are stored here; the table-dependent parts
  (m_pack_info, m_pk_part_no, m_key_parts, m_maxlength, ...) start out
  empty/zero and are filled in later by setup(). m_maxlength == 0 is what
  setup() uses to recognise an object that has not been set up yet.
*/
Rdb_key_def::Rdb_key_def(
    uint indexnr_arg, uint keyno_arg,
    std::shared_ptr<rocksdb::ColumnFamilyHandle> cf_handle_arg,
    uint16_t index_dict_version_arg, uchar index_type_arg,
    uint16_t kv_format_version_arg, bool is_reverse_cf_arg,
    bool is_per_partition_cf_arg, const char *_name, Rdb_index_stats _stats,
    uint32 index_flags_bitmap, uint32 ttl_rec_offset, uint64 ttl_duration)
    : m_index_number(indexnr_arg),
      m_cf_handle(cf_handle_arg),
      m_index_dict_version(index_dict_version_arg),
      m_index_type(index_type_arg),
      m_kv_format_version(kv_format_version_arg),
      m_is_reverse_cf(is_reverse_cf_arg),
      m_is_per_partition_cf(is_per_partition_cf_arg),
      m_name(_name),
      m_stats(_stats),
      m_index_flags_bitmap(index_flags_bitmap),
      m_ttl_rec_offset(ttl_rec_offset),
      m_ttl_duration(ttl_duration),
      m_ttl_column(""),
      m_pk_part_no(nullptr),
      m_pack_info(nullptr),
      m_keyno(keyno_arg),
      m_key_parts(0),
      m_ttl_pk_key_part_offset(UINT_MAX),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(nullptr),
      m_maxlength(0)  // means 'not initialized'
{
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  /* Keep the index number in its storage (network byte order) form too */
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  /* Key formats up to *_FORMAT_VERSION_UPDATE2 must not carry index flags */
  assert_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  assert_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  assert(m_cf_handle);
}
295 
/*
  Copy constructor.

  Duplicates the key definition `k`. The copy gets its own mutex and its own
  heap copies of the m_pack_info and m_pk_part_no arrays (both are released
  by the destructor, so they must not be shared with `k`).

  m_ttl_field_index is deliberately reset to UINT_MAX instead of being copied
  (unchanged from the previous behaviour).

  NOTE(review): m_pk_key_parts is not copied here; confirm whether a copy of
  an already set-up key definition needs it.

  @param  k  IN  key definition to copy from
*/
Rdb_key_def::Rdb_key_def(const Rdb_key_def &k)
    : m_index_number(k.m_index_number),
      m_cf_handle(k.m_cf_handle),
      /*
        The index identity/format members have to be copied as well: without
        them the copy has an indeterminate index type and the assertions
        below would read uninitialized members.
      */
      m_index_dict_version(k.m_index_dict_version),
      m_index_type(k.m_index_type),
      m_kv_format_version(k.m_kv_format_version),
      m_is_reverse_cf(k.m_is_reverse_cf),
      m_is_per_partition_cf(k.m_is_per_partition_cf),
      m_name(k.m_name),
      m_stats(k.m_stats),
      m_index_flags_bitmap(k.m_index_flags_bitmap),
      m_ttl_rec_offset(k.m_ttl_rec_offset),
      m_ttl_duration(k.m_ttl_duration),
      m_ttl_column(k.m_ttl_column),
      m_pk_part_no(nullptr),  // deep-copied below when k has one
      m_pack_info(nullptr),   // deep-copied below when k has one
      m_keyno(k.m_keyno),
      m_key_parts(k.m_key_parts),
      m_ttl_pk_key_part_offset(k.m_ttl_pk_key_part_offset),
      m_ttl_field_index(UINT_MAX),
      m_prefix_extractor(k.m_prefix_extractor),
      m_maxlength(k.m_maxlength) {
  mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
  rdb_netbuf_store_index(m_index_number_storage_form, m_index_number);
  m_total_index_flags_length =
      calculate_index_flag_offset(m_index_flags_bitmap, MAX_FLAG);
  assert_IMP(m_index_type == INDEX_TYPE_SECONDARY &&
                      m_kv_format_version <= SECONDARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);
  assert_IMP(m_index_type == INDEX_TYPE_PRIMARY &&
                      m_kv_format_version <= PRIMARY_FORMAT_VERSION_UPDATE2,
                  m_total_index_flags_length == 0);

  if (k.m_pack_info) {
    const size_t size = sizeof(Rdb_field_packing) * k.m_key_parts;
#ifdef HAVE_PSI_INTERFACE
    void *buf = my_malloc(rdb_datadic_memory_key, size, MYF(0));
#else
    void *buf = my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(0));
#endif
    /*
      m_pack_info is an array with one entry per key part, so every entry
      has to be copied, not just the first one.

      NOTE(review): the destructor only runs ~Rdb_field_packing() on entry 0;
      that is sufficient as long as Rdb_field_packing is trivially
      destructible - confirm against its declaration.
    */
    Rdb_field_packing *const pack_info = static_cast<Rdb_field_packing *>(buf);
    for (uint i = 0; i < k.m_key_parts; i++) {
      new (pack_info + i) Rdb_field_packing(k.m_pack_info[i]);
    }
    m_pack_info = pack_info;
  }

  if (k.m_pk_part_no) {
    const size_t size = sizeof(uint) * m_key_parts;
#ifdef HAVE_PSI_INTERFACE
    m_pk_part_no =
        static_cast<uint *>(my_malloc(rdb_datadic_memory_key, size, MYF(0)));
#else
    m_pk_part_no =
        static_cast<uint *>(my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(0)));
#endif
    memcpy(m_pk_part_no, k.m_pk_part_no, size);
  }
}
347 
~Rdb_key_def()348 Rdb_key_def::~Rdb_key_def() {
349   mysql_mutex_destroy(&m_mutex);
350 
351   my_free(m_pk_part_no);
352   m_pk_part_no = nullptr;
353 
354   if (m_pack_info) {
355     m_pack_info->~Rdb_field_packing();
356     my_free(m_pack_info);
357   }
358   m_pack_info = nullptr;
359 }
360 
/*
  Fill in the table-dependent part of the key definition.

  The work is done only once per object: m_maxlength == 0 marks an object
  that has not been set up yet, and it is assigned last, while m_mutex is
  still held. Calls that find m_maxlength already set return without doing
  anything.

  What gets set up:
    - m_name, m_pk_key_parts, m_key_parts
    - m_pk_part_no (secondary keys only) and m_pack_info, one entry per key
      part, including the PK columns appended to secondary keys
    - m_ttl_column / m_ttl_field_index / m_ttl_pk_key_part_offset
    - m_stats.m_distinct_keys_per_prefix size, m_prefix_extractor
    - m_maxlength

  @param  tbl      IN  table the key belongs to
  @param  tbl_def  IN  MyRocks table definition
*/
void Rdb_key_def::setup(const TABLE *const tbl,
                        const Rdb_tbl_def *const tbl_def) {
  assert(tbl != nullptr);
  assert(tbl_def != nullptr);

  /*
    Set max_length based on the table.  This can be called concurrently from
    multiple threads, so there is a mutex to protect this code.
  */
  const bool is_hidden_pk = (m_index_type == INDEX_TYPE_HIDDEN_PRIMARY);
  const bool hidden_pk_exists = table_has_hidden_pk(tbl);
  const bool secondary_key = (m_index_type == INDEX_TYPE_SECONDARY);
  if (!m_maxlength) {
    RDB_MUTEX_LOCK_CHECK(m_mutex);
    /* Re-check under the mutex: another thread may have finished the setup */
    if (m_maxlength != 0) {
      RDB_MUTEX_UNLOCK_CHECK(m_mutex);
      return;
    }

    /* A hidden PK has no KEY entry in the table; key_info stays nullptr */
    KEY *key_info = nullptr;
    KEY *pk_info = nullptr;
    if (!is_hidden_pk) {
      key_info = &tbl->key_info[m_keyno];
      if (!hidden_pk_exists) pk_info = &tbl->key_info[tbl->s->primary_key];
      m_name = std::string(key_info->name);
    } else {
      m_name = HIDDEN_PK_NAME;
    }

    /* pk_info is only kept for secondary keys on tables with a real PK */
    if (secondary_key) {
      m_pk_key_parts = hidden_pk_exists ? 1 : pk_info->actual_key_parts;
    } else {
      pk_info = nullptr;
      m_pk_key_parts = 0;
    }

    // "unique" secondary keys support:
    m_key_parts = is_hidden_pk ? 1 : key_info->actual_key_parts;

    if (secondary_key) {
      /*
        In most cases, SQL layer puts PK columns as invisible suffix at the
        end of secondary key. There are cases where this doesn't happen:
        - unique secondary indexes.
        - partitioned tables.

        Internally, we always need PK columns as suffix (and InnoDB does,
        too, if you were wondering).

        The loop below will attempt to put all PK columns at the end of key
        definition.  Columns that are already included in the index (either
        by the user or by "extended keys" feature) are not included for the
        second time.
      */
      m_key_parts += m_pk_key_parts;
    }

    /*
      At this point m_key_parts is an upper bound (PK columns already present
      in the key are dropped by the loop below); both arrays are sized for
      that upper bound.
    */
    if (secondary_key) {
#ifdef HAVE_PSI_INTERFACE
      m_pk_part_no = static_cast<uint *>(my_malloc(
          rdb_datadic_memory_key, sizeof(uint) * m_key_parts, MYF(0)));
#else
      m_pk_part_no = static_cast<uint *>(
          my_malloc(PSI_NOT_INSTRUMENTED, sizeof(uint) * m_key_parts, MYF(0)));
#endif
    } else {
      m_pk_part_no = nullptr;
    }

    const size_t size = sizeof(Rdb_field_packing) * m_key_parts;
#ifdef HAVE_PSI_INTERFACE
    void *buf = my_malloc(rdb_datadic_memory_key, size, MYF(0));
#else
    void *buf = my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(0));
#endif
    /*
      NOTE(review): placement new constructs only the first array entry; the
      remaining entries are written solely through the
      Rdb_field_packing::setup() calls below - confirm that this initializes
      every member.
    */
    m_pack_info = new (buf) Rdb_field_packing;

    /*
      Guaranteed not to error here as checks have been made already during
      table creation.
    */
    Rdb_key_def::extract_ttl_col(tbl, tbl_def, &m_ttl_column,
                                 &m_ttl_field_index, true);

    size_t max_len = INDEX_NUMBER_SIZE;
    int unpack_len = 0;
    int max_part_len = 0;
    /* Becomes true once the key's own parts are done and PK parts follow */
    bool simulating_extkey = false;
    /* Next free slot in m_pack_info / m_pk_part_no */
    uint dst_i = 0;

    uint keyno_to_set = m_keyno;
    uint keypart_to_set = 0;

    if (is_hidden_pk) {
      /* The hidden PK is a single key part without a Field object */
      Field *field = nullptr;
      m_pack_info[dst_i].setup(this, field, keyno_to_set, 0, 0);
      m_pack_info[dst_i].m_unpack_data_offset = unpack_len;
      max_len += m_pack_info[dst_i].m_max_image_len;
      max_part_len = std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);
      dst_i++;
    } else {
      KEY_PART_INFO *key_part = key_info->key_part;

      /* this loop also loops over the 'extended key' tail */
      for (uint src_i = 0; src_i < m_key_parts; src_i++, keypart_to_set++) {
        /* key_part is nullptr only for the appended hidden PK part */
        Field *const field = key_part ? key_part->field : nullptr;

        if (simulating_extkey && !hidden_pk_exists) {
          assert(secondary_key);
          /* Check if this field is already present in the key definition */
          bool found = false;
          for (uint j = 0; j < key_info->actual_key_parts; j++) {
            if (field->field_index ==
                    key_info->key_part[j].field->field_index &&
                key_part->length == key_info->key_part[j].length) {
              found = true;
              break;
            }
          }

          /* Already part of the key: do not add the PK column again */
          if (found) {
            key_part++;
            continue;
          }
        }

        if (field && field->real_maybe_null()) max_len += 1;  // NULL-byte

        m_pack_info[dst_i].setup(this, field, keyno_to_set, keypart_to_set,
                                 key_part ? key_part->length : 0);
        m_pack_info[dst_i].m_unpack_data_offset = unpack_len;

        /*
          Record which PK key part (if any) this key part corresponds to;
          -1 (stored into an unsigned slot) means "not a PK column".
        */
        if (pk_info) {
          m_pk_part_no[dst_i] = -1;
          for (uint j = 0; j < m_pk_key_parts; j++) {
            if (field->field_index == pk_info->key_part[j].field->field_index) {
              m_pk_part_no[dst_i] = j;
              break;
            }
          }
        } else if (secondary_key && hidden_pk_exists) {
          /*
            The hidden pk can never be part of the sk.  So it is always
            appended to the end of the sk.
          */
          m_pk_part_no[dst_i] = -1;
          if (simulating_extkey) m_pk_part_no[dst_i] = 0;
        }

        max_len += m_pack_info[dst_i].m_max_image_len;

        max_part_len =
            std::max(max_part_len, m_pack_info[dst_i].m_max_image_len);

        /*
          Check key part name here, if it matches the TTL column then we store
          the offset of the TTL key part here.

          NOTE(review): field is nullptr for the appended hidden PK part, so
          this relies on m_ttl_column being empty for such tables - confirm.
        */
        if (!m_ttl_column.empty() &&
            my_strcasecmp(system_charset_info, field->field_name,
                          m_ttl_column.c_str()) == 0) {
          assert(field->real_type() == MYSQL_TYPE_LONGLONG);
          assert(field->key_type() == HA_KEYTYPE_ULONGLONG);
          assert(!field->real_maybe_null());
          m_ttl_pk_key_part_offset = dst_i;
        }

        key_part++;
        /*
          For "unique" secondary indexes, pretend they have
          "index extensions"
         */
        if (secondary_key && src_i + 1 == key_info->actual_key_parts) {
          simulating_extkey = true;
          if (!hidden_pk_exists) {
            /* Continue with the PK's key parts */
            keyno_to_set = tbl->s->primary_key;
            key_part = pk_info->key_part;
            /* Wraps around to 0 on the loop's keypart_to_set++ */
            keypart_to_set = (uint)-1;
          } else {
            /* Continue with the single hidden PK part (no KEY_PART_INFO) */
            keyno_to_set = tbl_def->m_key_count - 1;
            key_part = nullptr;
            keypart_to_set = 0;
          }
        }

        dst_i++;
      }
    }

    /* The final number of key parts, after dropping duplicate PK columns */
    m_key_parts = dst_i;

    /* Initialize the memory needed by the stats structure */
    m_stats.m_distinct_keys_per_prefix.resize(get_key_parts());

    /* Cache prefix extractor for bloom filter usage later */
    rocksdb::Options opt = rdb_get_rocksdb_db()->GetOptions(get_cf());
    m_prefix_extractor = opt.prefix_extractor;

    /*
      This should be the last member variable set before releasing the mutex
      so that other threads can't see the object partially set up.
     */
    m_maxlength = max_len;

    RDB_MUTEX_UNLOCK_CHECK(m_mutex);
  }
}
568 
569 /*
570   Determine if the table has TTL enabled by parsing the table comment.
571 
572   @param[IN]  table_arg
573   @param[IN]  tbl_def_arg
574   @param[OUT] ttl_duration        Default TTL value parsed from table comment
575 */
extract_ttl_duration(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,uint64 * ttl_duration)576 uint Rdb_key_def::extract_ttl_duration(const TABLE *const table_arg,
577                                        const Rdb_tbl_def *const tbl_def_arg,
578                                        uint64 *ttl_duration) {
579   assert(table_arg != nullptr);
580   assert(tbl_def_arg != nullptr);
581   assert(ttl_duration != nullptr);
582   std::string table_comment(table_arg->s->comment.str,
583                             table_arg->s->comment.length);
584 
585   bool ttl_duration_per_part_match_found = false;
586   std::string ttl_duration_str = Rdb_key_def::parse_comment_for_qualifier(
587       table_comment, table_arg, tbl_def_arg, &ttl_duration_per_part_match_found,
588       RDB_TTL_DURATION_QUALIFIER);
589 
590   /* If we don't have a ttl duration, nothing to do here. */
591   if (ttl_duration_str.empty()) {
592     return HA_EXIT_SUCCESS;
593   }
594 
595   /*
596     Catch errors where a non-integral value was used as ttl duration, strtoull
597     will return 0.
598   */
599   *ttl_duration = std::strtoull(ttl_duration_str.c_str(), nullptr, 0);
600   if (!*ttl_duration) {
601     my_error(ER_RDB_TTL_DURATION_FORMAT, MYF(0), ttl_duration_str.c_str());
602     return HA_EXIT_FAILURE;
603   }
604 
605   return HA_EXIT_SUCCESS;
606 }
607 
608 /*
609   Determine if the table has TTL enabled by parsing the table comment.
610 
611   @param[IN]  table_arg
612   @param[IN]  tbl_def_arg
613   @param[OUT] ttl_column          TTL column in the table
614   @param[IN]  skip_checks         Skip validation checks (when called in
615                                   setup())
616 */
extract_ttl_col(const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,std::string * ttl_column,uint * ttl_field_index,bool skip_checks)617 uint Rdb_key_def::extract_ttl_col(const TABLE *const table_arg,
618                                   const Rdb_tbl_def *const tbl_def_arg,
619                                   std::string *ttl_column,
620                                   uint *ttl_field_index, bool skip_checks) {
621   std::string table_comment(table_arg->s->comment.str,
622                             table_arg->s->comment.length);
623   /*
624     Check if there is a TTL column specified. Note that this is not required
625     and if omitted, an 8-byte ttl field will be prepended to each record
626     implicitly.
627   */
628   bool ttl_col_per_part_match_found = false;
629   std::string ttl_col_str = Rdb_key_def::parse_comment_for_qualifier(
630       table_comment, table_arg, tbl_def_arg, &ttl_col_per_part_match_found,
631       RDB_TTL_COL_QUALIFIER);
632 
633   if (skip_checks) {
634     for (uint i = 0; i < table_arg->s->fields; i++) {
635       Field *const field = table_arg->field[i];
636       if (my_strcasecmp(system_charset_info, field->field_name,
637                         ttl_col_str.c_str()) == 0) {
638         *ttl_column = ttl_col_str;
639         *ttl_field_index = i;
640       }
641     }
642     return HA_EXIT_SUCCESS;
643   }
644 
645   /* Check if TTL column exists in table */
646   if (!ttl_col_str.empty()) {
647     bool found = false;
648     for (uint i = 0; i < table_arg->s->fields; i++) {
649       Field *const field = table_arg->field[i];
650       if (my_strcasecmp(system_charset_info, field->field_name,
651                         ttl_col_str.c_str()) == 0 &&
652           field->real_type() == MYSQL_TYPE_LONGLONG &&
653           field->key_type() == HA_KEYTYPE_ULONGLONG &&
654           !field->real_maybe_null()) {
655         *ttl_column = ttl_col_str;
656         *ttl_field_index = i;
657         found = true;
658         break;
659       }
660     }
661 
662     if (!found) {
663       my_error(ER_RDB_TTL_COL_FORMAT, MYF(0), ttl_col_str.c_str());
664       return HA_EXIT_FAILURE;
665     }
666   }
667 
668   return HA_EXIT_SUCCESS;
669 }
670 
gen_qualifier_for_table(const char * const qualifier,const std::string & partition_name)671 const std::string Rdb_key_def::gen_qualifier_for_table(
672     const char *const qualifier, const std::string &partition_name) {
673   bool has_partition = !partition_name.empty();
674   std::string qualifier_str = "";
675 
676   if (!strcmp(qualifier, RDB_CF_NAME_QUALIFIER)) {
677     return has_partition ? gen_cf_name_qualifier_for_partition(partition_name)
678                          : qualifier_str + RDB_CF_NAME_QUALIFIER +
679                                RDB_QUALIFIER_VALUE_SEP;
680   } else if (!strcmp(qualifier, RDB_TTL_DURATION_QUALIFIER)) {
681     return has_partition
682                ? gen_ttl_duration_qualifier_for_partition(partition_name)
683                : qualifier_str + RDB_TTL_DURATION_QUALIFIER +
684                      RDB_QUALIFIER_VALUE_SEP;
685   } else if (!strcmp(qualifier, RDB_TTL_COL_QUALIFIER)) {
686     return has_partition ? gen_ttl_col_qualifier_for_partition(partition_name)
687                          : qualifier_str + RDB_TTL_COL_QUALIFIER +
688                                RDB_QUALIFIER_VALUE_SEP;
689   } else {
690     assert(0);
691   }
692 
693   return qualifier_str;
694 }
695 
696 /*
697   Formats the string and returns the column family name assignment part for a
698   specific partition.
699 */
gen_cf_name_qualifier_for_partition(const std::string & prefix)700 const std::string Rdb_key_def::gen_cf_name_qualifier_for_partition(
701     const std::string &prefix) {
702   assert(!prefix.empty());
703 
704   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_CF_NAME_QUALIFIER +
705          RDB_QUALIFIER_VALUE_SEP;
706 }
707 
gen_ttl_duration_qualifier_for_partition(const std::string & prefix)708 const std::string Rdb_key_def::gen_ttl_duration_qualifier_for_partition(
709     const std::string &prefix) {
710   assert(!prefix.empty());
711 
712   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP +
713          RDB_TTL_DURATION_QUALIFIER + RDB_QUALIFIER_VALUE_SEP;
714 }
715 
gen_ttl_col_qualifier_for_partition(const std::string & prefix)716 const std::string Rdb_key_def::gen_ttl_col_qualifier_for_partition(
717     const std::string &prefix) {
718   assert(!prefix.empty());
719 
720   return prefix + RDB_PER_PARTITION_QUALIFIER_NAME_SEP + RDB_TTL_COL_QUALIFIER +
721          RDB_QUALIFIER_VALUE_SEP;
722 }
723 
parse_comment_for_qualifier(const std::string & comment,const TABLE * const table_arg,const Rdb_tbl_def * const tbl_def_arg,bool * per_part_match_found,const char * const qualifier)724 const std::string Rdb_key_def::parse_comment_for_qualifier(
725     const std::string &comment, const TABLE *const table_arg,
726     const Rdb_tbl_def *const tbl_def_arg, bool *per_part_match_found,
727     const char *const qualifier) {
728   assert(table_arg != nullptr);
729   assert(tbl_def_arg != nullptr);
730   assert(per_part_match_found != nullptr);
731   assert(qualifier != nullptr);
732 
733   std::string empty_result;
734 
735   // Flag which marks if partition specific options were found.
736   *per_part_match_found = false;
737 
738   if (comment.empty()) {
739     return empty_result;
740   }
741 
742   // Let's fetch the comment for a index and check if there's a custom key
743   // name specified for a partition we are handling.
744   std::vector<std::string> v =
745       myrocks::parse_into_tokens(comment, RDB_QUALIFIER_SEP);
746 
747   std::string search_str = gen_qualifier_for_table(qualifier);
748 
749   // If table has partitions then we need to check if user has requested
750   // qualifiers on a per partition basis.
751   //
752   // NOTE: this means if you specify a qualifier for a specific partition it
753   // will take precedence the 'table level' qualifier if one exists.
754   std::string search_str_part;
755   if (table_arg->part_info != nullptr) {
756     std::string partition_name = tbl_def_arg->base_partition();
757     assert(!partition_name.empty());
758     search_str_part = gen_qualifier_for_table(qualifier, partition_name);
759   }
760 
761   assert(!search_str.empty());
762 
763   // Basic O(N) search for a matching assignment. At most we expect maybe
764   // ten or so elements here.
765   if (!search_str_part.empty()) {
766     for (const auto &it : v) {
767       if (it.substr(0, search_str_part.length()) == search_str_part) {
768         // We found a prefix match. Try to parse it as an assignment.
769         std::vector<std::string> tokens =
770             myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
771 
772         // We found a custom qualifier, it was in the form we expected it to be.
773         // Return that instead of whatever we initially wanted to return. In
774         // a case below the `foo` part will be returned to the caller.
775         //
776         // p3_cfname=foo
777         //
778         // If no value was specified then we'll return an empty string which
779         // later gets translated into using a default CF.
780         if (tokens.size() == 2) {
781           *per_part_match_found = true;
782           return tokens[1];
783         } else {
784           return empty_result;
785         }
786       }
787     }
788   }
789 
790   // Do this loop again, this time searching for 'table level' qualifiers if we
791   // didn't find any partition level qualifiers above.
792   for (const auto &it : v) {
793     if (it.substr(0, search_str.length()) == search_str) {
794       std::vector<std::string> tokens =
795           myrocks::parse_into_tokens(it, RDB_QUALIFIER_VALUE_SEP);
796       if (tokens.size() == 2) {
797         return tokens[1];
798       } else {
799         return empty_result;
800       }
801     }
802   }
803 
804   // If we didn't find any partitioned/non-partitioned qualifiers, return an
805   // empty string.
806   return empty_result;
807 }
808 
809 /**
810   Read a memcmp key part from a slice using the passed in reader.
811 
812   Returns -1 if field was null, 1 if error, 0 otherwise.
813 */
read_memcmp_key_part(const TABLE * table_arg,Rdb_string_reader * reader,const uint part_num) const814 int Rdb_key_def::read_memcmp_key_part(const TABLE *table_arg,
815                                       Rdb_string_reader *reader,
816                                       const uint part_num) const {
817   /* It is impossible to unpack the column. Skip it. */
818   if (m_pack_info[part_num].m_field_maybe_null) {
819     const char *nullp;
820     if (!(nullp = reader->read(1))) return 1;
821     if (*nullp == 0) {
822       /* This is a NULL value */
823       return -1;
824     } else {
825       /* If NULL marker is not '0', it can be only '1'  */
826       if (*nullp != 1) return 1;
827     }
828   }
829 
830   Rdb_field_packing *fpi = &m_pack_info[part_num];
831   assert(table_arg->s != nullptr);
832 
833   if ((fpi->m_skip_func)(fpi, reader)) {
834     return 1;
835   }
836   return 0;
837 }
838 
839 /**
840   Get a mem-comparable form of Primary Key from mem-comparable form of this key
841 
842   @param
843     pk_descr        Primary Key descriptor
844     key             Index tuple from this key in mem-comparable form
845     pk_buffer  OUT  Put here mem-comparable form of the Primary Key.
846 
847   @note
848     It may or may not be possible to restore primary key columns to their
849     mem-comparable form.  To handle all cases, this function copies mem-
850     comparable forms directly.
851 
852     RocksDB SE supports "Extended keys". This means that PK columns are present
853     at the end of every key.  If the key already includes PK columns, then
854     these columns are not present at the end of the key.
855 
856     Because of the above, we copy each primary key column.
857 
858   @todo
859     If we checked crc32 checksums in this function, we would catch some CRC
860     violations that we currently don't. On the other hand, there is a broader
861     set of queries for which we would check the checksum twice.
862 */
863 
get_primary_key_tuple(const TABLE * const table,const Rdb_key_def & pk_descr,const rocksdb::Slice * const key,uchar * const pk_buffer) const864 uint Rdb_key_def::get_primary_key_tuple(const TABLE *const table,
865                                         const Rdb_key_def &pk_descr,
866                                         const rocksdb::Slice *const key,
867                                         uchar *const pk_buffer) const {
868   assert(table != nullptr);
869   assert(key != nullptr);
870   assert(m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY);
871   assert(pk_buffer);
872 
873   uint size = 0;
874   uchar *buf = pk_buffer;
875   assert(m_pk_key_parts);
876 
877   /* Put the PK number */
878   rdb_netbuf_store_index(buf, pk_descr.m_index_number);
879   buf += INDEX_NUMBER_SIZE;
880   size += INDEX_NUMBER_SIZE;
881 
882   const char *start_offs[MAX_REF_PARTS];
883   const char *end_offs[MAX_REF_PARTS];
884   int pk_key_part;
885   uint i;
886   Rdb_string_reader reader(key);
887 
888   // Skip the index number
889   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
890 
891   for (i = 0; i < m_key_parts; i++) {
892     if ((pk_key_part = m_pk_part_no[i]) != -1) {
893       start_offs[pk_key_part] = reader.get_current_ptr();
894     }
895 
896     if (read_memcmp_key_part(table, &reader, i) > 0) {
897       return RDB_INVALID_KEY_LEN;
898     }
899 
900     if (pk_key_part != -1) {
901       end_offs[pk_key_part] = reader.get_current_ptr();
902     }
903   }
904 
905   for (i = 0; i < m_pk_key_parts; i++) {
906     const uint part_size = end_offs[i] - start_offs[i];
907     memcpy(buf, start_offs[i], end_offs[i] - start_offs[i]);
908     buf += part_size;
909     size += part_size;
910   }
911 
912   return size;
913 }
914 
915 /**
916   Get a mem-comparable form of Secondary Key from mem-comparable form of this
917   key, without the extended primary key tail.
918 
919   @param
920     key                Index tuple from this key in mem-comparable form
921     sk_buffer     OUT  Put here mem-comparable form of the Secondary Key.
922     n_null_fields OUT  Put number of null fields contained within sk entry
923 */
get_memcmp_sk_parts(const TABLE * table,const rocksdb::Slice & key,uchar * sk_buffer,uint * n_null_fields) const924 uint Rdb_key_def::get_memcmp_sk_parts(const TABLE *table,
925                                       const rocksdb::Slice &key,
926                                       uchar *sk_buffer,
927                                       uint *n_null_fields) const {
928   assert(table != nullptr);
929   assert(sk_buffer != nullptr);
930   assert(n_null_fields != nullptr);
931   assert(m_keyno != table->s->primary_key);
932   assert(!table_has_hidden_pk(table));
933 
934   uchar *buf = sk_buffer;
935 
936   int res;
937   Rdb_string_reader reader(&key);
938   const char *start = reader.get_current_ptr();
939 
940   // Skip the index number
941   if ((!reader.read(INDEX_NUMBER_SIZE))) return RDB_INVALID_KEY_LEN;
942 
943   for (uint i = 0; i < table->key_info[m_keyno].user_defined_key_parts; i++) {
944     if ((res = read_memcmp_key_part(table, &reader, i)) > 0) {
945       return RDB_INVALID_KEY_LEN;
946     } else if (res == -1) {
947       (*n_null_fields)++;
948     }
949   }
950 
951   uint sk_memcmp_len = reader.get_current_ptr() - start;
952   memcpy(buf, start, sk_memcmp_len);
953   return sk_memcmp_len;
954 }
955 
956 /**
957   Convert index tuple into storage (i.e. mem-comparable) format
958 
959   @detail
960     Currently this is done by unpacking into table->record[0] and then
961     packing index columns into storage format.
962 
963   @param pack_buffer Temporary area for packing varchar columns. Its
964                      size is at least max_storage_fmt_length() bytes.
965 */
966 
pack_index_tuple(TABLE * const tbl,uchar * const pack_buffer,uchar * const packed_tuple,const uchar * const key_tuple,const key_part_map & keypart_map) const967 uint Rdb_key_def::pack_index_tuple(TABLE *const tbl, uchar *const pack_buffer,
968                                    uchar *const packed_tuple,
969                                    const uchar *const key_tuple,
970                                    const key_part_map &keypart_map) const {
971   assert(tbl != nullptr);
972   assert(pack_buffer != nullptr);
973   assert(packed_tuple != nullptr);
974   assert(key_tuple != nullptr);
975 
976   /* We were given a record in KeyTupleFormat. First, save it to record */
977   const uint key_len = calculate_key_len(tbl, m_keyno, keypart_map);
978   key_restore(tbl->record[0], key_tuple, &tbl->key_info[m_keyno], key_len);
979 
980   uint n_used_parts = my_count_bits(keypart_map);
981   if (keypart_map == HA_WHOLE_KEY) n_used_parts = 0;  // Full key is used
982 
983   /* Then, convert the record into a mem-comparable form */
984   return pack_record(tbl, pack_buffer, tbl->record[0], packed_tuple, nullptr,
985                      false, 0, n_used_parts);
986 }
987 
988 /**
989   @brief
990     Check if "unpack info" data includes checksum.
991 
992   @detail
993     This is used only by CHECK TABLE to count the number of rows that have
994     checksums.
995 */
996 
unpack_info_has_checksum(const rocksdb::Slice & unpack_info)997 bool Rdb_key_def::unpack_info_has_checksum(const rocksdb::Slice &unpack_info) {
998   size_t size = unpack_info.size();
999   if (size == 0) {
1000     return false;
1001   }
1002   const uchar *ptr = (const uchar *)unpack_info.data();
1003 
1004   // Skip unpack info if present.
1005   if (is_unpack_data_tag(ptr[0]) && size >= get_unpack_header_size(ptr[0])) {
1006     const uint16 skip_len = rdb_netbuf_to_uint16(ptr + 1);
1007     SHIP_ASSERT(size >= skip_len);
1008 
1009     size -= skip_len;
1010     ptr += skip_len;
1011   }
1012 
1013   return (size == RDB_CHECKSUM_CHUNK_SIZE && ptr[0] == RDB_CHECKSUM_DATA_TAG);
1014 }
1015 
1016 /*
1017   @return Number of bytes that were changed
1018 */
successor(uchar * const packed_tuple,const uint len)1019 int Rdb_key_def::successor(uchar *const packed_tuple, const uint len) {
1020   assert(packed_tuple != nullptr);
1021 
1022   int changed = 0;
1023   uchar *p = packed_tuple + len - 1;
1024   for (; p > packed_tuple; p--) {
1025     changed++;
1026     if (*p != uchar(0xFF)) {
1027       *p = *p + 1;
1028       break;
1029     }
1030     *p = '\0';
1031   }
1032   return changed;
1033 }
1034 
1035 /*
1036   @return Number of bytes that were changed
1037 */
predecessor(uchar * const packed_tuple,const uint len)1038 int Rdb_key_def::predecessor(uchar *const packed_tuple, const uint len) {
1039   assert(packed_tuple != nullptr);
1040 
1041   int changed = 0;
1042   uchar *p = packed_tuple + len - 1;
1043   for (; p > packed_tuple; p--) {
1044     changed++;
1045     if (*p != uchar(0x00)) {
1046       *p = *p - 1;
1047       break;
1048     }
1049     *p = 0xFF;
1050   }
1051   return changed;
1052 }
1053 
/* Header length, in bytes, keyed by the tag that introduces unpack data. */
static const std::map<char, size_t> UNPACK_HEADER_SIZES = {
    {RDB_UNPACK_DATA_TAG, RDB_UNPACK_HEADER_SIZE},
    {RDB_UNPACK_COVERED_DATA_TAG, RDB_UNPACK_COVERED_HEADER_SIZE}};

/*
  Look up the header length that belongs to an unpack data tag.

  @param tag  Tag byte; asserted to satisfy is_unpack_data_tag()

  @return The length in bytes of the header specified by the given tag
*/
size_t Rdb_key_def::get_unpack_header_size(char tag) {
  assert(is_unpack_data_tag(tag));
  // std::map::at() throws std::out_of_range for a tag missing from the table.
  return UNPACK_HEADER_SIZES.at(tag);
}
1065 
1066 /*
1067   Get a bitmap indicating which varchar columns must be covered for this
1068   lookup to be covered. If the bitmap is a subset of the covered bitmap, then
1069   the lookup is covered. If it can already be determined that the lookup is
1070   not covered, map->bitmap will be set to null.
1071  */
get_lookup_bitmap(const TABLE * table,MY_BITMAP * map) const1072 void Rdb_key_def::get_lookup_bitmap(const TABLE *table, MY_BITMAP *map) const {
1073   assert(map->bitmap == nullptr);
1074   bitmap_init(map, nullptr, MAX_REF_PARTS, false);
1075   uint curr_bitmap_pos = 0;
1076 
1077   // Indicates which columns in the read set might be covered.
1078   MY_BITMAP maybe_covered_bitmap;
1079   bitmap_init(&maybe_covered_bitmap, nullptr, table->read_set->n_bits, false);
1080 
1081   for (uint i = 0; i < m_key_parts; i++) {
1082     if (table_has_hidden_pk(table) && i + 1 == m_key_parts) {
1083       continue;
1084     }
1085 
1086     Field *const field = m_pack_info[i].get_field_in_table(table);
1087 
1088     // Columns which are always covered are not stored in the covered bitmap so
1089     // we can ignore them here too.
1090     if (m_pack_info[i].m_covered &&
1091         bitmap_is_set(table->read_set, field->field_index)) {
1092       bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1093       continue;
1094     }
1095 
1096     switch (field->real_type()) {
1097       // This type may be covered depending on the record. If it was requested,
1098       // we require the covered bitmap to have this bit set.
1099       case MYSQL_TYPE_VARCHAR:
1100         if (curr_bitmap_pos < MAX_REF_PARTS) {
1101           if (bitmap_is_set(table->read_set, field->field_index)) {
1102             bitmap_set_bit(map, curr_bitmap_pos);
1103             bitmap_set_bit(&maybe_covered_bitmap, field->field_index);
1104           }
1105           curr_bitmap_pos++;
1106         } else {
1107           bitmap_free(&maybe_covered_bitmap);
1108           bitmap_free(map);
1109           return;
1110         }
1111         break;
1112       // This column is a type which is never covered. If it was requested, we
1113       // know this lookup will never be covered.
1114       default:
1115         if (bitmap_is_set(table->read_set, field->field_index)) {
1116           bitmap_free(&maybe_covered_bitmap);
1117           bitmap_free(map);
1118           return;
1119         }
1120         break;
1121     }
1122   }
1123 
1124   // If there are columns which are not covered in the read set, the lookup
1125   // can't be covered.
1126   if (!bitmap_cmp(table->read_set, &maybe_covered_bitmap)) {
1127     bitmap_free(map);
1128   }
1129   bitmap_free(&maybe_covered_bitmap);
1130 }
1131 
1132 /*
1133   Return true if for this secondary index
1134   - All of the requested columns are in the index
1135   - All values for columns that are prefix-only indexes are shorter or equal
1136     in length to the prefix
1137  */
covers_lookup(const rocksdb::Slice * const unpack_info,const MY_BITMAP * const lookup_bitmap) const1138 bool Rdb_key_def::covers_lookup(const rocksdb::Slice *const unpack_info,
1139                                 const MY_BITMAP *const lookup_bitmap) const {
1140   assert(lookup_bitmap != nullptr);
1141   if (!use_covered_bitmap_format() || lookup_bitmap->bitmap == nullptr) {
1142     return false;
1143   }
1144 
1145   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
1146 
1147   // Check if this unpack_info has a covered_bitmap
1148   const char *unpack_header = unp_reader.get_current_ptr();
1149   const bool has_covered_unpack_info =
1150       unp_reader.remaining_bytes() &&
1151       unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG;
1152   if (!has_covered_unpack_info ||
1153       !unp_reader.read(RDB_UNPACK_COVERED_HEADER_SIZE)) {
1154     return false;
1155   }
1156 
1157   MY_BITMAP covered_bitmap;
1158   my_bitmap_map covered_bits;
1159   bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
1160   covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
1161                                       sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
1162                                       RDB_UNPACK_COVERED_DATA_LEN_SIZE);
1163 
1164   return bitmap_is_subset(lookup_bitmap, &covered_bitmap);
1165 }
1166 
1167 /* Indicates that all key parts can be unpacked to cover a secondary lookup */
can_cover_lookup() const1168 bool Rdb_key_def::can_cover_lookup() const {
1169   for (uint i = 0; i < m_key_parts; i++) {
1170     if (!m_pack_info[i].m_covered) return false;
1171   }
1172   return true;
1173 }
1174 
pack_field(Field * const field,Rdb_field_packing * pack_info,uchar * tuple,uchar * const packed_tuple,uchar * const pack_buffer,Rdb_string_writer * const unpack_info,uint * const n_null_fields) const1175 uchar *Rdb_key_def::pack_field(Field *const field, Rdb_field_packing *pack_info,
1176                                uchar *tuple, uchar *const packed_tuple,
1177                                uchar *const pack_buffer,
1178                                Rdb_string_writer *const unpack_info,
1179                                uint *const n_null_fields) const {
1180   if (field->real_maybe_null()) {
1181     assert(is_storage_available(tuple - packed_tuple, 1));
1182     if (field->is_real_null()) {
1183       /* NULL value. store '\0' so that it sorts before non-NULL values */
1184       *tuple++ = 0;
1185       /* That's it, don't store anything else */
1186       if (n_null_fields) (*n_null_fields)++;
1187       return tuple;
1188     } else {
1189       /* Not a NULL value. Store '1' */
1190       *tuple++ = 1;
1191     }
1192   }
1193 
1194   const bool create_unpack_info =
1195       (unpack_info &&  // we were requested to generate unpack_info
1196        pack_info->uses_unpack_info());  // and this keypart uses it
1197   Rdb_pack_field_context pack_ctx(unpack_info);
1198 
1199   // Set the offset for methods which do not take an offset as an argument
1200   assert(
1201       is_storage_available(tuple - packed_tuple, pack_info->m_max_image_len));
1202 
1203   (pack_info->m_pack_func)(pack_info, field, pack_buffer, &tuple, &pack_ctx);
1204 
1205   /* Make "unpack info" to be stored in the value */
1206   if (create_unpack_info) {
1207     (pack_info->m_make_unpack_info_func)(pack_info->m_charset_codec, field,
1208                                          &pack_ctx);
1209   }
1210 
1211   return tuple;
1212 }
1213 
/**
  Get index columns from the record and pack them into mem-comparable form.

  @param
    tbl                   Table we're working on
    pack_buffer      IN   Temporary area for packing varchars. The size is
                          at least max_storage_fmt_length() bytes.
    record           IN   Record buffer with fields in table->record format
    packed_tuple     OUT  Key in the mem-comparable form
    unpack_info      OUT  Unpack data, may be nullptr
    should_store_row_debug_checksums
                     IN   If true (asserted to happen for secondary keys
                          only), a checksum chunk is appended to unpack_info:
                          a tag, the CRC32 of the key and the CRC32 of the
                          value written so far.
    hidden_pk_id     IN   Value for the hidden PK key part of a table with a
                          hidden PK. 0 means it was not passed in.
    n_key_parts           Number of keyparts to process. 0 (or MAX_REF_PARTS)
                          means all of them.
    n_null_fields    OUT  Number of key fields with NULL value, may be
                          nullptr. It is reset to 0 before packing.
    ttl_bytes        IN   Previous ttl bytes from old record for update case or
                          current ttl bytes from just packed primary key/value
  @detail
    Some callers do not need the unpack information, they can pass
    unpack_info=nullptr.

    While a field is being packed its pointers are temporarily redirected
    from tbl->record[0] to `record`; they are restored before the next key
    part is processed.

  @return
    Length of the packed tuple
*/

uint Rdb_key_def::pack_record(const TABLE *const tbl, uchar *const pack_buffer,
                              const uchar *const record,
                              uchar *const packed_tuple,
                              Rdb_string_writer *const unpack_info,
                              const bool should_store_row_debug_checksums,
                              const longlong hidden_pk_id, uint n_key_parts,
                              uint *const n_null_fields,
                              const char *const ttl_bytes) const {
  assert(tbl != nullptr);
  assert(pack_buffer != nullptr);
  assert(record != nullptr);
  assert(packed_tuple != nullptr);
  // Checksums for PKs are made when record is packed.
  // We should never attempt to make checksum just from PK values
  assert_IMP(should_store_row_debug_checksums,
                  (m_index_type == INDEX_TYPE_SECONDARY));

  uchar *tuple = packed_tuple;
  // Positions inside unpack_info that are patched once packing is done.
  // size_t(-1) marks "not set".
  size_t unpack_start_pos = size_t(-1);
  size_t unpack_len_pos = size_t(-1);
  size_t covered_bitmap_pos = size_t(-1);
  const bool hidden_pk_exists = table_has_hidden_pk(tbl);

  // Every packed key starts with the index number.
  rdb_netbuf_store_index(tuple, m_index_number);
  tuple += INDEX_NUMBER_SIZE;

  // If n_key_parts is 0, it means all columns.
  // The following includes the 'extended key' tail.
  // The 'extended key' includes primary key. This is done to 'uniqify'
  // non-unique indexes
  const bool use_all_columns = n_key_parts == 0 || n_key_parts == MAX_REF_PARTS;

  // If hidden pk exists, but hidden pk wasn't passed in, we can't pack the
  // hidden key part.  So we skip it (it's always 1 part).
  if (hidden_pk_exists && !hidden_pk_id && use_all_columns) {
    n_key_parts = m_key_parts - 1;
  } else if (use_all_columns) {
    n_key_parts = m_key_parts;
  }

  if (n_null_fields) *n_null_fields = 0;

  // Check if we need a covered bitmap. If it is certain that all key parts are
  // covering, we don't need one.
  bool store_covered_bitmap = false;
  if (unpack_info && use_covered_bitmap_format()) {
    for (uint i = 0; i < n_key_parts; i++) {
      if (!m_pack_info[i].m_covered) {
        store_covered_bitmap = true;
        break;
      }
    }
  }

  // The tag tells readers which header layout follows (with or without the
  // covered bitmap).
  const char tag =
      store_covered_bitmap ? RDB_UNPACK_COVERED_DATA_TAG : RDB_UNPACK_DATA_TAG;

  if (unpack_info) {
    unpack_info->clear();

    if (m_index_type == INDEX_TYPE_SECONDARY &&
        m_total_index_flags_length > 0) {
      // Reserve space for index flag fields
      unpack_info->allocate(m_total_index_flags_length);

      // Insert TTL timestamp
      if (has_ttl() && ttl_bytes) {
        write_index_flag_field(unpack_info,
                               reinterpret_cast<const uchar *>(ttl_bytes),
                               Rdb_key_def::TTL_FLAG);
      }
    }

    // Header: tag byte, then a 2-byte length placeholder.
    unpack_start_pos = unpack_info->get_current_pos();
    unpack_info->write_uint8(tag);
    unpack_len_pos = unpack_info->get_current_pos();
    // we don't know the total length yet, so write a zero
    unpack_info->write_uint16(0);

    if (store_covered_bitmap) {
      // Reserve two bytes for the covered bitmap. This will store, for key
      // parts which are not always covering, whether or not it is covering
      // for this record.
      covered_bitmap_pos = unpack_info->get_current_pos();
      unpack_info->write_uint16(0);
    }
  }

  // covered_bits is the storage handed to covered_bitmap, so bits set through
  // the bitmap API below are visible in covered_bits.
  MY_BITMAP covered_bitmap;
  my_bitmap_map covered_bits;
  uint curr_bitmap_pos = 0;
  bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);

  for (uint i = 0; i < n_key_parts; i++) {
    // Fill hidden pk id into the last key part for secondary keys for tables
    // with no pk
    if (hidden_pk_exists && hidden_pk_id && i + 1 == n_key_parts) {
      m_pack_info[i].fill_hidden_pk_val(&tuple, hidden_pk_id);
      break;
    }

    Field *const field = m_pack_info[i].get_field_in_table(tbl);
    assert(field != nullptr);

    // Remember where the field lives relative to record[0] so that it can be
    // pointed at the same offsets inside `record` and restored afterwards.
    uint field_offset = field->ptr - tbl->record[0];
    uint null_offset = field->null_offset(tbl->record[0]);
    bool maybe_null = field->real_maybe_null();

    field->move_field(
        const_cast<uchar *>(record) + field_offset,
        maybe_null ? const_cast<uchar *>(record) + null_offset : nullptr,
        field->null_bit);
    // WARNING! Don't return without restoring field->ptr and field->null_ptr

    tuple = pack_field(field, &m_pack_info[i], tuple, packed_tuple, pack_buffer,
                       unpack_info, n_null_fields);

    // If this key part is a prefix of a VARCHAR field, check if it's covered.
    if (store_covered_bitmap && field->real_type() == MYSQL_TYPE_VARCHAR &&
        !m_pack_info[i].m_covered && curr_bitmap_pos < MAX_REF_PARTS) {
      size_t data_length = field->data_length();
      uint16 key_length;
      // The indexed length is taken from this key's definition, or from the
      // PK definition when the key part maps to a PK part.
      if (m_pk_part_no[i] == (uint)-1) {
        key_length = tbl->key_info[get_keyno()].key_part[i].length;
      } else {
        key_length =
            tbl->key_info[tbl->s->primary_key].key_part[m_pk_part_no[i]].length;
      }

      // The value is covered for this record if it can be unpacked at all
      // and it is not longer than the indexed length.
      if (m_pack_info[i].m_unpack_func != nullptr &&
          data_length <= key_length) {
        bitmap_set_bit(&covered_bitmap, curr_bitmap_pos);
      }
      curr_bitmap_pos++;
    }

    // Restore field->ptr and field->null_ptr
    field->move_field(tbl->record[0] + field_offset,
                      maybe_null ? tbl->record[0] + null_offset : nullptr,
                      field->null_bit);
  }

  if (unpack_info) {
    // Length of the unpack data including its header.
    const size_t len = unpack_info->get_current_pos() - unpack_start_pos;
    assert(len <= std::numeric_limits<uint16_t>::max());

    // Don't store the unpack_info if it has only the header (that is, there's
    // no meaningful content).
    // Primary Keys are special: for them, store the unpack_info even if it's
    // empty (provided m_maybe_unpack_info==true, see
    // ha_rocksdb::convert_record_to_storage_format)
    //
    // NOTE(review): for a secondary index whose unpack data is kept, the
    // length placeholder at unpack_len_pos is never filled in below and stays
    // zero, while unpack_info_has_checksum() reads that field to find the
    // checksum chunk. Confirm whether this is intentional.
    if (m_index_type == Rdb_key_def::INDEX_TYPE_SECONDARY) {
      if (len == get_unpack_header_size(tag) && !covered_bits) {
        unpack_info->truncate(unpack_start_pos);
      } else if (store_covered_bitmap) {
        unpack_info->write_uint16_at(covered_bitmap_pos, covered_bits);
      }
    } else {
      unpack_info->write_uint16_at(unpack_len_pos, len);
    }

    //
    // Secondary keys have key and value checksums in the value part
    // Primary key is a special case (the value part has non-indexed columns),
    // so the checksums are computed and stored by
    // ha_rocksdb::convert_record_to_storage_format
    //
    if (should_store_row_debug_checksums) {
      const ha_checksum key_crc32 =
          my_checksum(0, packed_tuple, tuple - packed_tuple);
      const ha_checksum val_crc32 =
          my_checksum(0, unpack_info->ptr(), unpack_info->get_current_pos());

      unpack_info->write_uint8(RDB_CHECKSUM_DATA_TAG);
      unpack_info->write_uint32(key_crc32);
      unpack_info->write_uint32(val_crc32);
    }
  }

  assert(is_storage_available(tuple - packed_tuple, 0));

  return tuple - packed_tuple;
}
1420 
1421 /**
1422   Pack the hidden primary key into mem-comparable form.
1423 
1424   @param
1425     tbl                   Table we're working on
1426     hidden_pk_id     IN   New value to be packed into key
1427     packed_tuple     OUT  Key in the mem-comparable form
1428 
1429   @return
1430     Length of the packed tuple
1431 */
1432 
pack_hidden_pk(const longlong hidden_pk_id,uchar * const packed_tuple) const1433 uint Rdb_key_def::pack_hidden_pk(const longlong hidden_pk_id,
1434                                  uchar *const packed_tuple) const {
1435   assert(packed_tuple != nullptr);
1436 
1437   uchar *tuple = packed_tuple;
1438   rdb_netbuf_store_index(tuple, m_index_number);
1439   tuple += INDEX_NUMBER_SIZE;
1440   assert(m_key_parts == 1);
1441   assert(is_storage_available(tuple - packed_tuple,
1442                                    m_pack_info[0].m_max_image_len));
1443 
1444   m_pack_info[0].fill_hidden_pk_val(&tuple, hidden_pk_id);
1445 
1446   assert(is_storage_available(tuple - packed_tuple, 0));
1447   return tuple - packed_tuple;
1448 }
1449 
  /**
    Function of type rdb_index_field_pack_t

    The following code (Rdb_key_def::pack_* and dependent functions) is pulled
    directly from ./sql/field.cc from all of the various
    Field_*::make_sort_key() functions.  The results of these functions within
    the server code were never intended to be persisted and as such the
    encoding and comparison can change over time without any notice.  To
    protect us from such an event as well as to ensure binary upgrade
    compatibility, we have copied that code here so that it is entirely within
    our control.
  */
1461 
/* Number of bits of a double that do not belong to the mantissa */
#if !defined(DBL_EXP_DIG)
#define DBL_EXP_DIG (sizeof(double) * 8 - DBL_MANT_DIG)
#endif

/**
  Store a double in a form that sorts correctly under memcmp().

  Zero is written as 0x80 followed by zero bytes. Any other value is first
  laid out most significant byte first; if its sign bit is set all bytes are
  inverted, otherwise the top bit is set and the exponent is moved one up.

  @param nr       Value to convert
  @param to  OUT  Destination, sizeof(double) bytes are written
*/
static void change_double_for_sort(double nr, uchar *to) {
  if (nr == 0.0) { /* Change to zero string */
    to[0] = (uchar)128;
    memset(to + 1, 0, sizeof(nr) - 1);
    return;
  }

#ifdef WORDS_BIGENDIAN
  memcpy(to, &nr, sizeof(nr));
#else
  {
    const uchar *src = reinterpret_cast<const uchar *>(&nr);
#if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
    /* Each 4-byte half is reversed on its own */
    for (size_t i = 0; i < 4; ++i) {
      to[i] = src[3 - i];
      to[4 + i] = src[7 - i];
    }
#else
    /* Plain byte reversal of all 8 bytes */
    for (size_t i = 0; i < 8; ++i) {
      to[i] = src[7 - i];
    }
#endif
  }
#endif

  if (to[0] & 128) {
    /* Negative: make complement */
    for (size_t i = 0; i < sizeof(nr); ++i) {
      to[i] = to[i] ^ (uchar)255;
    }
    return;
  }

  /* Positive: set high and move exponent one up */
  ushort exp_part = (((ushort)to[0] << 8) | (ushort)to[1] | (ushort)32768);
  exp_part += (ushort)1 << (16 - 1 - DBL_EXP_DIG);
  to[0] = (uchar)(exp_part >> 8);
  to[1] = (uchar)exp_part;
}
1511 
/**
   Copies an integer value to a format comparable with memcmp(). The
   format is characterized by the following:

   - The sign bit goes first and is unset for negative values.
   - The representation is big endian.

   The function template can be instantiated to copy from little or
   big endian values.

   @tparam Is_big_endian True if the source integer is big endian.

   @param to          Where to write the integer.
   @param to_length   Size in bytes of the destination buffer.
   @param from        Where to read the integer.
   @param from_length Size in bytes of the source integer
   @param is_unsigned True if the source integer is an unsigned value.
*/
template <bool Is_big_endian>
void copy_integer(uchar *to, size_t to_length, const uchar *from,
                  size_t from_length, bool is_unsigned) {
  // Signed values get their sign bit reversed, unsigned ones are kept as is.
  const uchar sign_flip = is_unsigned ? 0 : 128;

  if (Is_big_endian) {
    // Already in the right byte order: only the leading byte needs care.
    to[0] = static_cast<uchar>(from[0] ^ sign_flip);
    memcpy(to + 1, from + 1, to_length - 1);
    return;
  }

  // Little endian source: the most significant byte is the last one, so the
  // bytes are emitted back to front.
  to[0] = static_cast<uchar>(from[from_length - 1] ^ sign_flip);
  for (size_t pos = 1; pos < to_length; ++pos) {
    to[pos] = from[from_length - 1 - pos];
  }
}
1549 
pack_tiny(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1550 void Rdb_key_def::pack_tiny(
1551     Rdb_field_packing *const fpi, Field *const field,
1552     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1553     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1554   assert(fpi != nullptr);
1555   assert(field != nullptr);
1556   assert(dst != nullptr);
1557   assert(*dst != nullptr);
1558   assert(field->real_type() == MYSQL_TYPE_TINY);
1559 
1560   const size_t length = fpi->m_max_image_len;
1561   const uchar *ptr = field->ptr;
1562   const bool unsigned_flag =
1563       dynamic_cast<Field_num *const>(field)->unsigned_flag;
1564   uchar *to = *dst;
1565 
1566   assert(length >= 1);
1567   if (unsigned_flag)
1568     *to = *ptr;
1569   else
1570     to[0] = (char)(ptr[0] ^ (uchar)128); /* Reverse signbit */
1571 
1572   *dst += length;
1573 }
1574 
pack_short(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1575 void Rdb_key_def::pack_short(
1576     Rdb_field_packing *const fpi, Field *const field,
1577     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1578     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1579   assert(fpi != nullptr);
1580   assert(field != nullptr);
1581   assert(dst != nullptr);
1582   assert(*dst != nullptr);
1583   assert(field->real_type() == MYSQL_TYPE_SHORT);
1584 
1585   const size_t length = fpi->m_max_image_len;
1586   const uchar *ptr = field->ptr;
1587   const bool unsigned_flag =
1588       dynamic_cast<Field_num *const>(field)->unsigned_flag;
1589   uchar *to = *dst;
1590 
1591   assert(length >= 2);
1592 #ifdef WORDS_BIGENDIAN
1593   if (!field->table->s->db_low_byte_first) {
1594     if (unsigned_flag)
1595       to[0] = ptr[0];
1596     else
1597       to[0] = (char)(ptr[0] ^ 128); /* Revers signbit */
1598     to[1] = ptr[1];
1599   } else
1600 #endif
1601   {
1602     if (unsigned_flag)
1603       to[0] = ptr[1];
1604     else
1605       to[0] = (char)(ptr[1] ^ 128); /* Revers signbit */
1606     to[1] = ptr[0];
1607   }
1608 
1609   *dst += length;
1610 }
1611 
pack_medium(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1612 void Rdb_key_def::pack_medium(
1613     Rdb_field_packing *const fpi, Field *const field,
1614     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1615     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1616   assert(fpi != nullptr);
1617   assert(field != nullptr);
1618   assert(dst != nullptr);
1619   assert(*dst != nullptr);
1620   assert(field->real_type() == MYSQL_TYPE_INT24);
1621 
1622   const size_t length = fpi->m_max_image_len;
1623   const uchar *ptr = field->ptr;
1624   const bool unsigned_flag =
1625       dynamic_cast<Field_num *const>(field)->unsigned_flag;
1626   uchar *to = *dst;
1627 
1628   assert(length >= 3);
1629   if (unsigned_flag)
1630     to[0] = ptr[2];
1631   else
1632     to[0] = (uchar)(ptr[2] ^ 128); /* Revers signbit */
1633   to[1] = ptr[1];
1634   to[2] = ptr[0];
1635 
1636   *dst += length;
1637 }
1638 
pack_long(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1639 void Rdb_key_def::pack_long(
1640     Rdb_field_packing *const fpi, Field *const field,
1641     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1642     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1643   assert(fpi != nullptr);
1644   assert(field != nullptr);
1645   assert(dst != nullptr);
1646   assert(*dst != nullptr);
1647   assert(field->real_type() == MYSQL_TYPE_LONG);
1648 
1649   const size_t length = fpi->m_max_image_len;
1650   const uchar *ptr = field->ptr;
1651   const bool unsigned_flag =
1652       dynamic_cast<Field_num *const>(field)->unsigned_flag;
1653   uchar *to = *dst;
1654 
1655   assert(length >= 4);
1656 #ifdef WORDS_BIGENDIAN
1657   if (!field->table->s->db_low_byte_first) {
1658     if (unsigned_flag)
1659       to[0] = ptr[0];
1660     else
1661       dst[0] = (char)(ptr[0] ^ 128); /* Revers signbit */
1662     to[1] = ptr[1];
1663     to[2] = ptr[2];
1664     to[3] = ptr[3];
1665   } else
1666 #endif
1667   {
1668     if (unsigned_flag)
1669       to[0] = ptr[3];
1670     else
1671       to[0] = (char)(ptr[3] ^ 128); /* Revers signbit */
1672     to[1] = ptr[2];
1673     to[2] = ptr[1];
1674     to[3] = ptr[0];
1675   }
1676 
1677   *dst += length;
1678 }
1679 
pack_longlong(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1680 void Rdb_key_def::pack_longlong(
1681     Rdb_field_packing *const fpi, Field *const field,
1682     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1683     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1684   assert(fpi != nullptr);
1685   assert(field != nullptr);
1686   assert(dst != nullptr);
1687   assert(*dst != nullptr);
1688   assert(field->real_type() == MYSQL_TYPE_LONGLONG);
1689 
1690   static const int PACK_LENGTH = 8;
1691   const size_t length = fpi->m_max_image_len;
1692   const uchar *ptr = field->ptr;
1693   const bool unsigned_flag =
1694       dynamic_cast<Field_num *const>(field)->unsigned_flag;
1695   uchar *to = *dst;
1696 
1697   const size_t from_length = PACK_LENGTH;
1698   const size_t to_length = from_length > length ? from_length : length;
1699 #ifdef WORDS_BIGENDIAN
1700   if (field->table == NULL || !field->table->s->db_low_byte_first)
1701     copy_integer<true>(to, to_length, ptr, from_length, unsigned_flag);
1702   else
1703 #endif
1704     copy_integer<false>(to, to_length, ptr, from_length, unsigned_flag);
1705 
1706   *dst += length;
1707 }
1708 
pack_double(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1709 void Rdb_key_def::pack_double(
1710     Rdb_field_packing *const fpi, Field *const field,
1711     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1712     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1713   assert(fpi != nullptr);
1714   assert(field != nullptr);
1715   assert(dst != nullptr);
1716   assert(*dst != nullptr);
1717   assert(field->real_type() == MYSQL_TYPE_DOUBLE);
1718 
1719   const size_t length = fpi->m_max_image_len;
1720   const uchar *ptr = field->ptr;
1721   uchar *to = *dst;
1722 
1723   double nr;
1724 #ifdef WORDS_BIGENDIAN
1725   if (field->table->s->db_low_byte_first) {
1726     float8get(&nr, ptr);
1727   } else
1728 #endif
1729     doubleget(&nr, ptr);
1730   if (length < 8) {
1731     uchar buff[8];
1732     change_double_for_sort(nr, buff);
1733     memcpy(to, buff, length);
1734   } else
1735     change_double_for_sort(nr, to);
1736 
1737   *dst += length;
1738 }
1739 
/* Fallback: float bits that are not mantissa digits (used by pack_float) */
#ifndef FLT_EXP_DIG
#define FLT_EXP_DIG (sizeof(float) * 8 - FLT_MANT_DIG)
#endif
1743 
pack_float(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1744 void Rdb_key_def::pack_float(
1745     Rdb_field_packing *const fpi, Field *const field,
1746     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1747     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1748   assert(fpi != nullptr);
1749   assert(field != nullptr);
1750   assert(dst != nullptr);
1751   assert(*dst != nullptr);
1752   assert(field->real_type() == MYSQL_TYPE_FLOAT);
1753 
1754   const size_t length = fpi->m_max_image_len;
1755   const uchar *ptr = field->ptr;
1756   uchar *to = *dst;
1757 
1758   assert(length == sizeof(float));
1759   float nr;
1760 
1761 #ifdef WORDS_BIGENDIAN
1762   if (field->table->s->db_low_byte_first) {
1763     float4get(&nr, ptr);
1764   } else
1765 #endif
1766     memcpy(&nr, ptr, length < sizeof(float) ? length : sizeof(float));
1767 
1768   uchar *tmp = to;
1769   if (nr == (float)0.0) { /* Change to zero string */
1770     tmp[0] = (uchar)128;
1771     memset(tmp + 1, 0, length < sizeof(nr) - 1 ? length : sizeof(nr) - 1);
1772   } else {
1773 #ifdef WORDS_BIGENDIAN
1774     memcpy(tmp, &nr, sizeof(nr));
1775 #else
1776     tmp[0] = ptr[3];
1777     tmp[1] = ptr[2];
1778     tmp[2] = ptr[1];
1779     tmp[3] = ptr[0];
1780 #endif
1781     if (tmp[0] & 128) /* Negative */
1782     {                 /* make complement */
1783       uint i;
1784       for (i = 0; i < sizeof(nr); i++) tmp[i] = (uchar)(tmp[i] ^ (uchar)255);
1785     } else {
1786       ushort exp_part =
1787           (((ushort)tmp[0] << 8) | (ushort)tmp[1] | (ushort)32768);
1788       exp_part += (ushort)1 << (16 - 1 - FLT_EXP_DIG);
1789       tmp[0] = (uchar)(exp_part >> 8);
1790       tmp[1] = (uchar)exp_part;
1791     }
1792   }
1793 
1794   *dst += length;
1795 }
1796 
pack_new_decimal(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1797 void Rdb_key_def::pack_new_decimal(
1798     Rdb_field_packing *const fpi, Field *const field,
1799     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1800     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1801   assert(fpi != nullptr);
1802   assert(field != nullptr);
1803   assert(dst != nullptr);
1804   assert(*dst != nullptr);
1805   assert(field->real_type() == MYSQL_TYPE_NEWDECIMAL);
1806 
1807   const size_t length = fpi->m_max_image_len;
1808   const uchar *ptr = field->ptr;
1809   uchar *to = *dst;
1810   Field_new_decimal *const fnd = dynamic_cast<Field_new_decimal *>(field);
1811 
1812   memcpy(to, ptr, length < fnd->bin_size ? length : fnd->bin_size);
1813 
1814   *dst += length;
1815 }
1816 
pack_datetime2(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1817 void Rdb_key_def::pack_datetime2(
1818     Rdb_field_packing *const fpi, Field *const field,
1819     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1820     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1821   assert(fpi != nullptr);
1822   assert(field != nullptr);
1823   assert(dst != nullptr);
1824   assert(*dst != nullptr);
1825   assert(field->real_type() == MYSQL_TYPE_DATETIME2);
1826 
1827   const size_t length = fpi->m_max_image_len;
1828   const uchar *ptr = field->ptr;
1829   uchar *to = *dst;
1830 
1831   memcpy(to, ptr, length);
1832 
1833   *dst += length;
1834 }
1835 
pack_timestamp2(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1836 void Rdb_key_def::pack_timestamp2(
1837     Rdb_field_packing *const fpi, Field *const field,
1838     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1839     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1840   assert(fpi != nullptr);
1841   assert(field != nullptr);
1842   assert(dst != nullptr);
1843   assert(*dst != nullptr);
1844   assert(field->real_type() == MYSQL_TYPE_TIMESTAMP2);
1845 
1846   const size_t length = fpi->m_max_image_len;
1847   const uchar *ptr = field->ptr;
1848   uchar *to = *dst;
1849 
1850   memcpy(to, ptr, length);
1851 
1852   *dst += length;
1853 }
1854 
pack_time2(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1855 void Rdb_key_def::pack_time2(
1856     Rdb_field_packing *const fpi, Field *const field,
1857     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1858     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1859   assert(fpi != nullptr);
1860   assert(field != nullptr);
1861   assert(dst != nullptr);
1862   assert(*dst != nullptr);
1863   assert(field->real_type() == MYSQL_TYPE_TIME2);
1864 
1865   const size_t length = fpi->m_max_image_len;
1866   const uchar *ptr = field->ptr;
1867   uchar *to = *dst;
1868 
1869   memcpy(to, ptr, length);
1870 
1871   *dst += length;
1872 }
1873 
pack_year(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1874 void Rdb_key_def::pack_year(
1875     Rdb_field_packing *const fpi, Field *const field,
1876     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1877     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1878   assert(fpi != nullptr);
1879   assert(field != nullptr);
1880   assert(dst != nullptr);
1881   assert(*dst != nullptr);
1882   assert(field->real_type() == MYSQL_TYPE_YEAR);
1883 
1884   const size_t length = fpi->m_max_image_len;
1885   const uchar *ptr = field->ptr;
1886   const bool unsigned_flag =
1887       dynamic_cast<Field_num *const>(field)->unsigned_flag;
1888   uchar *to = *dst;
1889 
1890   assert(length >= 1);
1891   if (unsigned_flag)
1892     *to = *ptr;
1893   else
1894     to[0] = (char)(ptr[0] ^ (uchar)128); /* Reverse signbit */
1895 
1896   *dst += length;
1897 }
1898 
pack_newdate(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1899 void Rdb_key_def::pack_newdate(
1900     Rdb_field_packing *const fpi, Field *const field,
1901     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1902     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1903   assert(fpi != nullptr);
1904   assert(field != nullptr);
1905   assert(dst != nullptr);
1906   assert(*dst != nullptr);
1907   assert(field->real_type() == MYSQL_TYPE_NEWDATE);
1908 
1909   const size_t length = fpi->m_max_image_len;
1910   const uchar *ptr = field->ptr;
1911   uchar *to = *dst;
1912 
1913   assert(length >= 3);
1914   to[0] = ptr[2];
1915   to[1] = ptr[1];
1916   to[2] = ptr[0];
1917 
1918   *dst += length;
1919 }
1920 
pack_blob(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1921 void Rdb_key_def::pack_blob(
1922     Rdb_field_packing *const fpi, Field *const field,
1923     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1924     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1925   assert(fpi != nullptr);
1926   assert(field != nullptr);
1927   assert(dst != nullptr);
1928   assert(*dst != nullptr);
1929   assert(field->real_type() == MYSQL_TYPE_TINY_BLOB ||
1930               field->real_type() == MYSQL_TYPE_MEDIUM_BLOB ||
1931               field->real_type() == MYSQL_TYPE_LONG_BLOB ||
1932               field->real_type() == MYSQL_TYPE_BLOB ||
1933               field->real_type() == MYSQL_TYPE_JSON);
1934 
1935   size_t length = fpi->m_max_image_len;
1936   const uchar *ptr = field->ptr;
1937   uchar *to = *dst;
1938   Field_blob *const field_blob = dynamic_cast<Field_blob *const>(field);
1939   const CHARSET_INFO *field_charset = field_blob->charset();
1940 
1941   uchar *blob;
1942   size_t blob_length = field_blob->get_length();
1943 
1944   if (!blob_length && field_charset->pad_char == 0) {
1945     memset(to, 0, length);
1946   } else {
1947     if (field_charset == &my_charset_bin) {
1948       uchar *pos;
1949 
1950       /*
1951         Store length of blob last in blob to shorter blobs before longer blobs
1952       */
1953       length -= field_blob->pack_length_no_ptr();
1954       pos = to + length;
1955       uint key_length = blob_length < length ? blob_length : length;
1956 
1957       switch (field_blob->pack_length_no_ptr()) {
1958         case 1:
1959           *pos = (char)key_length;
1960           break;
1961         case 2:
1962           mi_int2store(pos, key_length);
1963           break;
1964         case 3:
1965           mi_int3store(pos, key_length);
1966           break;
1967         case 4:
1968           mi_int4store(pos, key_length);
1969           break;
1970       }
1971     }
1972     memcpy(&blob, ptr + field_blob->pack_length_no_ptr(), sizeof(char *));
1973 
1974     blob_length = field_charset->coll->strnxfrm(
1975         field_charset, to, length, length, blob, blob_length,
1976         MY_STRXFRM_PAD_WITH_SPACE | MY_STRXFRM_PAD_TO_MAXLEN);
1977     assert(blob_length == length);
1978   }
1979 
1980   *dst += fpi->m_max_image_len;
1981 }
1982 
1983 /**
1984   This is the end of the code copied from Field_*::make_sort_key()
1985 */
1986 
pack_with_make_sort_key(Rdb_field_packing * const fpi,Field * const field,uchar * const buf MY_ATTRIBUTE ((__unused__)),uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))1987 void Rdb_key_def::pack_with_make_sort_key(
1988     Rdb_field_packing *const fpi, Field *const field,
1989     uchar *const buf MY_ATTRIBUTE((__unused__)), uchar **dst,
1990     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
1991   assert(fpi != nullptr);
1992   assert(field != nullptr);
1993   assert(dst != nullptr);
1994   assert(*dst != nullptr);
1995 
1996   const int max_len = fpi->m_max_image_len;
1997   field->make_sort_key(*dst, max_len);
1998   *dst += max_len;
1999 }
2000 
2001 /*
2002   Compares two keys without unpacking
2003 
2004   @detail
2005   @return
2006     0 - Ok. column_index is the index of the first column which is different.
2007           -1 if two kes are equal
2008     1 - Data format error.
2009 */
compare_keys(const rocksdb::Slice * key1,const rocksdb::Slice * key2,std::size_t * const column_index) const2010 int Rdb_key_def::compare_keys(const rocksdb::Slice *key1,
2011                               const rocksdb::Slice *key2,
2012                               std::size_t *const column_index) const {
2013   assert(key1 != nullptr);
2014   assert(key2 != nullptr);
2015   assert(column_index != nullptr);
2016 
2017   // the caller should check the return value and
2018   // not rely on column_index being valid
2019   *column_index = 0xbadf00d;
2020 
2021   Rdb_string_reader reader1(key1);
2022   Rdb_string_reader reader2(key2);
2023 
2024   // Skip the index number
2025   auto indexp1 = reader1.read(INDEX_NUMBER_SIZE);
2026   if (!indexp1) return HA_EXIT_FAILURE;
2027 
2028   auto indexp2 = reader2.read(INDEX_NUMBER_SIZE);
2029   if (!indexp2) return HA_EXIT_FAILURE;
2030 
2031   // shouldn't compare with other index
2032   assert(memcmp(indexp1, indexp2, INDEX_NUMBER_SIZE) == 0);
2033 
2034   for (uint i = 0; i < m_key_parts; i++) {
2035     const Rdb_field_packing *const fpi = &m_pack_info[i];
2036     if (fpi->m_field_maybe_null) {
2037       const auto nullp1 = reader1.read(1);
2038       const auto nullp2 = reader2.read(1);
2039 
2040       if (nullp1 == nullptr || nullp2 == nullptr) {
2041         return HA_EXIT_FAILURE;
2042       }
2043 
2044       if (*nullp1 != *nullp2) {
2045         *column_index = i;
2046         return HA_EXIT_SUCCESS;
2047       }
2048 
2049       if (*nullp1 == 0) {
2050         /* This is a NULL value */
2051         continue;
2052       }
2053     }
2054 
2055     const auto before_skip1 = reader1.get_current_ptr();
2056     const auto before_skip2 = reader2.get_current_ptr();
2057     assert(fpi->m_skip_func);
2058     if ((fpi->m_skip_func)(fpi, &reader1)) {
2059       return HA_EXIT_FAILURE;
2060     }
2061     if ((fpi->m_skip_func)(fpi, &reader2)) {
2062       return HA_EXIT_FAILURE;
2063     }
2064     const auto size1 = reader1.get_current_ptr() - before_skip1;
2065     const auto size2 = reader2.get_current_ptr() - before_skip2;
2066     if (size1 != size2) {
2067       *column_index = i;
2068       return HA_EXIT_SUCCESS;
2069     }
2070 
2071     if (memcmp(before_skip1, before_skip2, size1) != 0) {
2072       *column_index = i;
2073       return HA_EXIT_SUCCESS;
2074     }
2075   }
2076 
2077   *column_index = m_key_parts;
2078   return HA_EXIT_SUCCESS;
2079 }
2080 
2081 /*
2082   @brief
2083     Given a zero-padded key, determine its real key length
2084 
2085   @detail
2086     Fixed-size skip functions just read.
2087 */
2088 
key_length(const TABLE * const table,const rocksdb::Slice & key) const2089 size_t Rdb_key_def::key_length(const TABLE *const table,
2090                                const rocksdb::Slice &key) const {
2091   assert(table != nullptr);
2092 
2093   Rdb_string_reader reader(&key);
2094 
2095   if ((!reader.read(INDEX_NUMBER_SIZE))) {
2096     return size_t(-1);
2097   }
2098   for (uint i = 0; i < m_key_parts; i++) {
2099     const Rdb_field_packing *fpi = &m_pack_info[i];
2100     if ((fpi->m_skip_func)(fpi, &reader)) {
2101       return size_t(-1);
2102     }
2103   }
2104   return key.size() - reader.remaining_bytes();
2105 }
2106 
2107 /*
2108   Take mem-comparable form and unpack_info and unpack it to Table->record
2109 
2110   @detail
2111     not all indexes support this
2112 
2113   @return
2114     HA_EXIT_SUCCESS    OK
2115     other              HA_ERR error code
2116 */
2117 
unpack_record(TABLE * const table,uchar * const buf,const rocksdb::Slice * const packed_key,const rocksdb::Slice * const unpack_info,const bool verify_row_debug_checksums) const2118 int Rdb_key_def::unpack_record(TABLE *const table, uchar *const buf,
2119                                const rocksdb::Slice *const packed_key,
2120                                const rocksdb::Slice *const unpack_info,
2121                                const bool verify_row_debug_checksums) const {
2122   Rdb_string_reader reader(packed_key);
2123   Rdb_string_reader unp_reader = Rdb_string_reader::read_or_empty(unpack_info);
2124 
2125   // There is no checksuming data after unpack_info for primary keys, because
2126   // the layout there is different. The checksum is verified in
2127   // ha_rocksdb::convert_record_from_storage_format instead.
2128   assert_IMP(!(m_index_type == INDEX_TYPE_SECONDARY),
2129                   !verify_row_debug_checksums);
2130 
2131   // Skip the index number
2132   if ((unlikely(!reader.read(INDEX_NUMBER_SIZE)))) {
2133     return HA_ERR_ROCKSDB_CORRUPT_DATA;
2134   }
2135 
2136   // For secondary keys, we expect the value field to contain index flags,
2137   // unpack data, and checksum data in that order. One or all can be missing,
2138   // but they cannot be reordered.
2139   if (unp_reader.remaining_bytes()) {
2140     if (m_index_type == INDEX_TYPE_SECONDARY &&
2141         m_total_index_flags_length > 0 &&
2142         !unp_reader.read(m_total_index_flags_length)) {
2143       return HA_ERR_ROCKSDB_CORRUPT_DATA;
2144     }
2145   }
2146 
2147   const char *unpack_header = unp_reader.get_current_ptr();
2148   const bool has_unpack_info =
2149       unp_reader.remaining_bytes() && is_unpack_data_tag(unpack_header[0]);
2150   if (has_unpack_info) {
2151     if (!unp_reader.read(get_unpack_header_size(unpack_header[0]))) {
2152       return HA_ERR_ROCKSDB_CORRUPT_DATA;
2153     }
2154   }
2155 
2156   // Read the covered bitmap
2157   MY_BITMAP covered_bitmap;
2158   my_bitmap_map covered_bits;
2159   const bool has_covered_bitmap =
2160       has_unpack_info && (unpack_header[0] == RDB_UNPACK_COVERED_DATA_TAG);
2161   if (has_covered_bitmap) {
2162     bitmap_init(&covered_bitmap, &covered_bits, MAX_REF_PARTS, false);
2163     covered_bits = rdb_netbuf_to_uint16((const uchar *)unpack_header +
2164                                         sizeof(RDB_UNPACK_COVERED_DATA_TAG) +
2165                                         RDB_UNPACK_COVERED_DATA_LEN_SIZE);
2166   }
2167 
2168   int err = HA_EXIT_SUCCESS;
2169 
2170   Rdb_key_field_iterator iter(
2171       this, m_pack_info, &reader, &unp_reader, table, has_unpack_info,
2172       has_covered_bitmap ? &covered_bitmap : nullptr, buf);
2173   while (iter.has_next()) {
2174     err = iter.next();
2175     if (unlikely(err)) {
2176       return err;
2177     }
2178   }
2179 
2180   /*
2181     Check checksum values if present
2182   */
2183   const char *ptr;
2184   if (unlikely((ptr = unp_reader.read(1)) && *ptr == RDB_CHECKSUM_DATA_TAG)) {
2185     if (verify_row_debug_checksums) {
2186       uint32_t stored_key_chksum = rdb_netbuf_to_uint32(
2187           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
2188       const uint32_t stored_val_chksum = rdb_netbuf_to_uint32(
2189           (const uchar *)unp_reader.read(RDB_CHECKSUM_SIZE));
2190 
2191       const ha_checksum computed_key_chksum =
2192           my_checksum(0, (const uchar *)packed_key->data(), packed_key->size());
2193       const ha_checksum computed_val_chksum =
2194           my_checksum(0, (const uchar *)unpack_info->data(),
2195                       unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
2196 
2197       DBUG_EXECUTE_IF("myrocks_simulate_bad_key_checksum1",
2198                       stored_key_chksum++;);
2199 
2200       if (stored_key_chksum != computed_key_chksum) {
2201         report_checksum_mismatch(true, packed_key->data(), packed_key->size());
2202         return HA_ERR_ROCKSDB_CORRUPT_DATA;
2203       }
2204 
2205       if (stored_val_chksum != computed_val_chksum) {
2206         report_checksum_mismatch(false, unpack_info->data(),
2207                                  unpack_info->size() - RDB_CHECKSUM_CHUNK_SIZE);
2208         return HA_ERR_ROCKSDB_CORRUPT_DATA;
2209       }
2210     } else {
2211       /* The checksums are present but we are not checking checksums */
2212     }
2213   }
2214 
2215   if (unlikely(reader.remaining_bytes())) return HA_ERR_ROCKSDB_CORRUPT_DATA;
2216 
2217   return HA_EXIT_SUCCESS;
2218 }
2219 
table_has_hidden_pk(const TABLE * const table)2220 bool Rdb_key_def::table_has_hidden_pk(const TABLE *const table) {
2221   return table->s->primary_key == MAX_INDEXES;
2222 }
2223 
report_checksum_mismatch(const bool is_key,const char * const data,const size_t data_size) const2224 void Rdb_key_def::report_checksum_mismatch(const bool is_key,
2225                                            const char *const data,
2226                                            const size_t data_size) const {
2227   // NO_LINT_DEBUG
2228   sql_print_error("Checksum mismatch in %s of key-value pair for index 0x%x",
2229                   is_key ? "key" : "value", get_index_number());
2230 
2231   const std::string buf = rdb_hexdump(data, data_size, RDB_MAX_HEXDUMP_LEN);
2232   // NO_LINT_DEBUG
2233   sql_print_error("Data with incorrect checksum (%" PRIu64 " bytes): %s",
2234                   (uint64_t)data_size, buf.c_str());
2235 
2236   my_error(ER_INTERNAL_ERROR, MYF(0), "Record checksum mismatch");
2237 }
2238 
index_format_min_check(const int pk_min,const int sk_min) const2239 bool Rdb_key_def::index_format_min_check(const int pk_min,
2240                                          const int sk_min) const {
2241   switch (m_index_type) {
2242     case INDEX_TYPE_PRIMARY:
2243     case INDEX_TYPE_HIDDEN_PRIMARY:
2244       return (m_kv_format_version >= pk_min);
2245     case INDEX_TYPE_SECONDARY:
2246       return (m_kv_format_version >= sk_min);
2247     default:
2248       assert(0);
2249       return false;
2250   }
2251 }
2252 
2253 ///////////////////////////////////////////////////////////////////////////////////////////
2254 // Rdb_field_packing
2255 ///////////////////////////////////////////////////////////////////////////////////////////
2256 
2257 /*
2258   Function of type rdb_index_field_skip_t
2259 */
2260 
skip_max_length(const Rdb_field_packing * const fpi,Rdb_string_reader * const reader)2261 int Rdb_key_def::skip_max_length(const Rdb_field_packing *const fpi,
2262                                  Rdb_string_reader *const reader) {
2263   if (!reader->read(fpi->m_max_image_len)) return HA_EXIT_FAILURE;
2264   return HA_EXIT_SUCCESS;
2265 }
2266 
/*
  (RDB_ESCAPE_LENGTH-1) must be an even number so that pieces of lines are not
  split in the middle of an UTF-8 character. See the implementation of
  unpack_binary_or_utf8_varchar.
*/

#define RDB_ESCAPE_LENGTH 9
#define RDB_LEGACY_ESCAPE_LENGTH RDB_ESCAPE_LENGTH
static_assert((RDB_ESCAPE_LENGTH - 1) % 2 == 0,
              "RDB_ESCAPE_LENGTH-1 must be even.");

/*
  Encoded size of `len` data bytes: every chunk of up to
  (RDB_ESCAPE_LENGTH - 1) data bytes occupies RDB_ESCAPE_LENGTH bytes.
  The argument and the whole expansion are parenthesized so the macros can be
  used with arbitrary expressions and inside larger expressions.
*/
#define RDB_ENCODED_SIZE(len)                                      \
  ((((len) + (RDB_ESCAPE_LENGTH - 2)) / (RDB_ESCAPE_LENGTH - 1)) * \
   RDB_ESCAPE_LENGTH)

/*
  Legacy variant: always accounts for one extra chunk when `len` is a multiple
  of (RDB_LEGACY_ESCAPE_LENGTH - 1), including len == 0.
*/
#define RDB_LEGACY_ENCODED_SIZE(len)              \
  ((((len) + (RDB_LEGACY_ESCAPE_LENGTH - 1)) /    \
    (RDB_LEGACY_ESCAPE_LENGTH - 1)) *             \
   RDB_LEGACY_ESCAPE_LENGTH)
2285 
2286 /*
2287   Function of type rdb_index_field_skip_t
2288 */
2289 
skip_variable_length(const Rdb_field_packing * const fpi,Rdb_string_reader * const reader)2290 int Rdb_key_def::skip_variable_length(const Rdb_field_packing *const fpi,
2291                                       Rdb_string_reader *const reader) {
2292   const uchar *ptr;
2293   bool finished = false;
2294 
2295   /* How much data can be there */
2296   size_t dst_len = fpi->m_field_pack_length - fpi->m_varchar_length_bytes;
2297 
2298   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
2299 
2300   /* Decode the length-emitted encoding here */
2301   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
2302     uint used_bytes;
2303 
2304     /* See pack_with_varchar_encoding. */
2305     if (use_legacy_format) {
2306       used_bytes = calc_unpack_legacy_variable_format(
2307           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2308     } else {
2309       used_bytes =
2310           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
2311     }
2312 
2313     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
2314       return HA_EXIT_FAILURE;  // Corruption in the data
2315     }
2316 
2317     if (finished) {
2318       break;
2319     }
2320 
2321     dst_len -= used_bytes;
2322   }
2323 
2324   if (!finished) {
2325     return HA_EXIT_FAILURE;
2326   }
2327 
2328   return HA_EXIT_SUCCESS;
2329 }
2330 
/*
  Marker values found in the last byte of each segment of the Variable-Length
  Space-Padded encoding (see skip_variable_space_pad):
    - EQUAL_TO_SPACES marks the last segment of a value;
    - LESS_THAN_SPACES / GREATER_THAN_SPACES mark a segment that is followed
      by another one.
*/
const int VARCHAR_CMP_LESS_THAN_SPACES = 1;
const int VARCHAR_CMP_EQUAL_TO_SPACES = 2;
const int VARCHAR_CMP_GREATER_THAN_SPACES = 3;
2334 
2335 /*
2336   Skip a keypart that uses Variable-Length Space-Padded encoding
2337 */
2338 
skip_variable_space_pad(const Rdb_field_packing * const fpi,Rdb_string_reader * const reader)2339 int Rdb_key_def::skip_variable_space_pad(const Rdb_field_packing *const fpi,
2340                                          Rdb_string_reader *const reader) {
2341   const uchar *ptr;
2342   bool finished = false;
2343 
2344   /* How much data can be there */
2345   size_t dst_len = fpi->m_field_pack_length - fpi->m_varchar_length_bytes;
2346 
2347   /* Decode the length-emitted encoding here */
2348   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
2349     // See pack_with_varchar_space_pad
2350     const uchar c = ptr[fpi->m_segment_size - 1];
2351     if (c == VARCHAR_CMP_EQUAL_TO_SPACES) {
2352       // This is the last segment
2353       finished = true;
2354       break;
2355     } else if (c == VARCHAR_CMP_LESS_THAN_SPACES ||
2356                c == VARCHAR_CMP_GREATER_THAN_SPACES) {
2357       // This is not the last segment
2358       if ((fpi->m_segment_size - 1) > dst_len) {
2359         // The segment is full of data but the table field can't hold that
2360         // much! This must be data corruption.
2361         return HA_EXIT_FAILURE;
2362       }
2363       dst_len -= (fpi->m_segment_size - 1);
2364     } else {
2365       // Encountered a value that's none of the VARCHAR_CMP* constants
2366       // It's data corruption.
2367       return HA_EXIT_FAILURE;
2368     }
2369   }
2370   return finished ? HA_EXIT_SUCCESS : HA_EXIT_FAILURE;
2371 }
2372 
2373 /*
2374   Function of type rdb_index_field_unpack_t
2375 */
2376 template <int length>
unpack_integer(Rdb_field_packing * const fpi,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2377 int Rdb_key_def::unpack_integer(Rdb_field_packing *const fpi, uchar *const to,
2378                                 Rdb_string_reader *const reader,
2379                                 Rdb_string_reader *const unp_reader
2380                                     MY_ATTRIBUTE((__unused__))) {
2381   assert(length == fpi->m_max_image_len);
2382 
2383   const uchar *from;
2384   if (!(from = (const uchar *)reader->read(length))) {
2385     return UNPACK_FAILURE; /* Mem-comparable image doesn't have enough bytes */
2386   }
2387 
2388 #ifdef WORDS_BIGENDIAN
2389   {
2390     if (fpi->m_field_unsigned_flag) {
2391       to[0] = from[0];
2392     } else {
2393       to[0] = static_cast<char>(from[0] ^ 128);  // Reverse the sign bit.
2394     }
2395     /* Parameterized length should enable loop unrolling */
2396     for (int i = 1; i < length; i++) to[i] = from[i];
2397   }
2398 #else
2399   {
2400     const int sign_byte = from[0];
2401     if (fpi->m_field_unsigned_flag) {
2402       to[length - 1] = sign_byte;
2403     } else {
2404       to[length - 1] =
2405           static_cast<char>(sign_byte ^ 128);  // Reverse the sign bit.
2406     }
2407 
2408     /* Parameterized length should enable loop unrolling */
2409     for (int i = 0, j = length - 1; i < length - 1; ++i, --j) to[i] = from[j];
2410   }
2411 #endif
2412   return UNPACK_SUCCESS;
2413 }
2414 
2415 #if !defined(WORDS_BIGENDIAN)
rdb_swap_double_bytes(uchar * const dst,const uchar * const src)2416 static void rdb_swap_double_bytes(uchar *const dst, const uchar *const src) {
2417 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
2418   // A few systems store the most-significant _word_ first on little-endian
2419   dst[0] = src[3];
2420   dst[1] = src[2];
2421   dst[2] = src[1];
2422   dst[3] = src[0];
2423   dst[4] = src[7];
2424   dst[5] = src[6];
2425   dst[6] = src[5];
2426   dst[7] = src[4];
2427 #else
2428   dst[0] = src[7];
2429   dst[1] = src[6];
2430   dst[2] = src[5];
2431   dst[3] = src[4];
2432   dst[4] = src[3];
2433   dst[5] = src[2];
2434   dst[6] = src[1];
2435   dst[7] = src[0];
2436 #endif
2437 }
2438 
rdb_swap_float_bytes(uchar * const dst,const uchar * const src)2439 static void rdb_swap_float_bytes(uchar *const dst, const uchar *const src) {
2440   dst[0] = src[3];
2441   dst[1] = src[2];
2442   dst[2] = src[1];
2443   dst[3] = src[0];
2444 }
2445 #else
2446 #define rdb_swap_double_bytes nullptr
2447 #define rdb_swap_float_bytes nullptr
2448 #endif
2449 
unpack_floating_point(uchar * const dst,Rdb_string_reader * const reader,const size_t size,const int exp_digit,const uchar * const zero_pattern,const uchar * const zero_val,void (* swap_func)(uchar *,const uchar *))2450 int Rdb_key_def::unpack_floating_point(
2451     uchar *const dst, Rdb_string_reader *const reader, const size_t size,
2452     const int exp_digit, const uchar *const zero_pattern,
2453     const uchar *const zero_val, void (*swap_func)(uchar *, const uchar *)) {
2454   const uchar *const from = (const uchar *)reader->read(size);
2455   if (from == nullptr) {
2456     /* Mem-comparable image doesn't have enough bytes */
2457     return UNPACK_FAILURE;
2458   }
2459 
2460   /* Check to see if the value is zero */
2461   if (memcmp(from, zero_pattern, size) == 0) {
2462     memcpy(dst, zero_val, size);
2463     return UNPACK_SUCCESS;
2464   }
2465 
2466 #if defined(WORDS_BIGENDIAN)
2467   // On big-endian, output can go directly into result
2468   uchar *const tmp = dst;
2469 #else
2470   // Otherwise use a temporary buffer to make byte-swapping easier later
2471   uchar tmp[8];
2472 #endif
2473 
2474   memcpy(tmp, from, size);
2475 
2476   if (tmp[0] & 0x80) {
2477     // If the high bit is set the original value was positive so
2478     // remove the high bit and subtract one from the exponent.
2479     ushort exp_part = ((ushort)tmp[0] << 8) | (ushort)tmp[1];
2480     exp_part &= 0x7FFF;                             // clear high bit;
2481     exp_part -= (ushort)1 << (16 - 1 - exp_digit);  // subtract from exponent
2482     tmp[0] = (uchar)(exp_part >> 8);
2483     tmp[1] = (uchar)exp_part;
2484   } else {
2485     // Otherwise the original value was negative and all bytes have been
2486     // negated.
2487     for (size_t ii = 0; ii < size; ii++) tmp[ii] ^= 0xFF;
2488   }
2489 
2490 #if !defined(WORDS_BIGENDIAN)
2491   // On little-endian, swap the bytes around
2492   swap_func(dst, tmp);
2493 #else
2494   assert(swap_func == nullptr);
2495 #endif
2496 
2497   return UNPACK_SUCCESS;
2498 }
2499 
2500 /*
2501   Function of type rdb_index_field_unpack_t
2502 
2503   Unpack a double by doing the reverse action of change_double_for_sort
2504   (sql/filesort.cc).  Note that this only works on IEEE values.
2505   Note also that this code assumes that NaN and +/-Infinity are never
2506   allowed in the database.
2507 */
unpack_double(Rdb_field_packing * const fpi MY_ATTRIBUTE ((__unused__)),uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2508 int Rdb_key_def::unpack_double(
2509     Rdb_field_packing *const fpi MY_ATTRIBUTE((__unused__)),
2510     uchar *const field_ptr, Rdb_string_reader *const reader,
2511     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2512   static double zero_val = 0.0;
2513   static const uchar zero_pattern[8] = {128, 0, 0, 0, 0, 0, 0, 0};
2514 
2515   return unpack_floating_point(field_ptr, reader, sizeof(double), DBL_EXP_DIG,
2516                                zero_pattern, (const uchar *)&zero_val,
2517                                rdb_swap_double_bytes);
2518 }
2519 
2520 /*
2521   Function of type rdb_index_field_unpack_t
2522 
2523   Unpack a float by doing the reverse action of Field_float::make_sort_key
2524   (sql/field.cc).  Note that this only works on IEEE values.
2525   Note also that this code assumes that NaN and +/-Infinity are never
2526   allowed in the database.
2527 */
unpack_float(Rdb_field_packing * const fpi,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2528 int Rdb_key_def::unpack_float(
2529     Rdb_field_packing *const fpi, uchar *const field_ptr,
2530     Rdb_string_reader *const reader,
2531     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2532   static float zero_val = 0.0;
2533   static const uchar zero_pattern[4] = {128, 0, 0, 0};
2534 
2535   return unpack_floating_point(field_ptr, reader, sizeof(float), FLT_EXP_DIG,
2536                                zero_pattern, (const uchar *)&zero_val,
2537                                rdb_swap_float_bytes);
2538 }
2539 
2540 /*
2541   Function of type rdb_index_field_unpack_t used to
2542   Unpack by doing the reverse action to Field_newdate::make_sort_key.
2543 */
2544 
unpack_newdate(Rdb_field_packing * const fpi,uchar * const field_ptr,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2545 int Rdb_key_def::unpack_newdate(
2546     Rdb_field_packing *const fpi, uchar *const field_ptr,
2547     Rdb_string_reader *const reader,
2548     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2549   const char *from;
2550   assert(fpi->m_max_image_len == 3);
2551 
2552   if (!(from = reader->read(3))) {
2553     /* Mem-comparable image doesn't have enough bytes */
2554     return UNPACK_FAILURE;
2555   }
2556 
2557   field_ptr[0] = from[2];
2558   field_ptr[1] = from[1];
2559   field_ptr[2] = from[0];
2560   return UNPACK_SUCCESS;
2561 }
2562 
2563 /*
2564   Function of type rdb_index_field_unpack_t, used to
2565   Unpack the string by copying it over.
2566   This is for BINARY(n) where the value occupies the whole length.
2567 */
2568 
unpack_binary_str(Rdb_field_packing * const fpi,uchar * const to,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2569 int Rdb_key_def::unpack_binary_str(
2570     Rdb_field_packing *const fpi, uchar *const to,
2571     Rdb_string_reader *const reader,
2572     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2573   const char *from;
2574   if (!(from = reader->read(fpi->m_max_image_len))) {
2575     /* Mem-comparable image doesn't have enough bytes */
2576     return UNPACK_FAILURE;
2577   }
2578 
2579   memcpy(to, from, fpi->m_max_image_len);
2580   return UNPACK_SUCCESS;
2581 }
2582 
2583 /*
2584   Function of type rdb_index_field_unpack_t.
2585   For UTF-8, we need to convert 2-byte wide-character entities back into
2586   UTF8 sequences.
2587 */
2588 
unpack_utf8_str(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))2589 int Rdb_key_def::unpack_utf8_str(
2590     Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
2591     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
2592   my_core::CHARSET_INFO *const cset =
2593       (my_core::CHARSET_INFO *)fpi->m_field_charset;
2594   const uchar *src;
2595   if (!(src = (const uchar *)reader->read(fpi->m_max_image_len))) {
2596     /* Mem-comparable image doesn't have enough bytes */
2597     return UNPACK_FAILURE;
2598   }
2599 
2600   const uchar *const src_end = src + fpi->m_max_image_len;
2601   uchar *const dst_end = dst + fpi->m_field_pack_length;
2602 
2603   while (src < src_end) {
2604     my_wc_t wc = (src[0] << 8) | src[1];
2605     src += 2;
2606     int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
2607     assert(res > 0);
2608     assert(res <= 3);
2609     if (res < 0) return UNPACK_FAILURE;
2610     dst += res;
2611   }
2612 
2613   cset->cset->fill(cset, reinterpret_cast<char *>(dst), dst_end - dst,
2614                    cset->pad_char);
2615   return UNPACK_SUCCESS;
2616 }
2617 
2618 /*
2619   This is the original algorithm to encode a variable binary field.  It
2620   sets a flag byte every Nth byte.  The flag value is (255 - #pad) where
2621   #pad is the number of padding bytes that were needed (0 if all N-1
2622   bytes were used).
2623 
2624   If N=8 and the field is:
2625   * 3 bytes (1, 2, 3) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 251
2626   * 4 bytes (1, 2, 3, 0) this is encoded as: 1, 2, 3, 0, 0, 0, 0, 252
2627   And the 4 byte string compares as greater than the 3 byte string
2628 
2629   Unfortunately the algorithm has a flaw.  If the input is exactly a
2630   multiple of N-1, an extra N bytes are written.  Since we usually use
2631   N=9, an 8 byte input will generate 18 bytes of output instead of the
2632   9 bytes of output that is optimal.
2633 
2634   See pack_variable_format for the newer algorithm.
2635 */
pack_legacy_variable_format(const uchar * src,size_t src_len,uchar ** dst)2636 void Rdb_key_def::pack_legacy_variable_format(
2637     const uchar *src,  // The data to encode
2638     size_t src_len,    // The length of the data to encode
2639     uchar **dst)       // The location to encode the data
2640 {
2641   size_t copy_len;
2642   size_t padding_bytes;
2643   uchar *ptr = *dst;
2644 
2645   do {
2646     copy_len = std::min((size_t)RDB_LEGACY_ESCAPE_LENGTH - 1, src_len);
2647     padding_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - copy_len;
2648     memcpy(ptr, src, copy_len);
2649     ptr += copy_len;
2650     src += copy_len;
2651     // pad with zeros if necessary
2652     if (padding_bytes > 0) {
2653       memset(ptr, 0, padding_bytes);
2654       ptr += padding_bytes;
2655     }
2656 
2657     *(ptr++) = 255 - padding_bytes;
2658 
2659     src_len -= copy_len;
2660   } while (padding_bytes == 0);
2661 
2662   *dst = ptr;
2663 }
2664 
2665 /*
2666   This is the new algorithm.  Similarly to the legacy format the input
2667   is split up into N-1 bytes and a flag byte is used as the Nth byte
2668   in the output.
2669 
2670   - If the previous segment needed any padding the flag is set to the
2671     number of bytes used (0..N-2).  0 is possible in the first segment
2672     if the input is 0 bytes long.
2673   - If no padding was used and there is no more data left in the input
2674     the flag is set to N-1
2675   - If no padding was used and there is still data left in the input the
2676     flag is set to N.
2677 
  For N=9, the following input values encode to the specified
  output (where 'X' indicates a byte of the original input):
2680   - 0 bytes  is encoded as 0 0 0 0 0 0 0 0 0
2681   - 1 byte   is encoded as X 0 0 0 0 0 0 0 1
2682   - 2 bytes  is encoded as X X 0 0 0 0 0 0 2
2683   - 7 bytes  is encoded as X X X X X X X 0 7
2684   - 8 bytes  is encoded as X X X X X X X X 8
2685   - 9 bytes  is encoded as X X X X X X X X 9 X 0 0 0 0 0 0 0 1
2686   - 10 bytes is encoded as X X X X X X X X 9 X X 0 0 0 0 0 0 2
2687 */
pack_variable_format(const uchar * src,size_t src_len,uchar ** dst)2688 void Rdb_key_def::pack_variable_format(
2689     const uchar *src,  // The data to encode
2690     size_t src_len,    // The length of the data to encode
2691     uchar **dst)       // The location to encode the data
2692 {
2693   uchar *ptr = *dst;
2694 
2695   for (;;) {
2696     // Figure out how many bytes to copy, copy them and adjust pointers
2697     const size_t copy_len = std::min((size_t)RDB_ESCAPE_LENGTH - 1, src_len);
2698     memcpy(ptr, src, copy_len);
2699     ptr += copy_len;
2700     src += copy_len;
2701     src_len -= copy_len;
2702 
2703     // Are we at the end of the input?
2704     if (src_len == 0) {
2705       // pad with zeros if necessary;
2706       const size_t padding_bytes = RDB_ESCAPE_LENGTH - 1 - copy_len;
2707       if (padding_bytes > 0) {
2708         memset(ptr, 0, padding_bytes);
2709         ptr += padding_bytes;
2710       }
2711 
2712       // Put the flag byte (0 - N-1) in the output
2713       *(ptr++) = (uchar)copy_len;
2714       break;
2715     }
2716 
2717     // We have more data - put the flag byte (N) in and continue
2718     *(ptr++) = RDB_ESCAPE_LENGTH;
2719   }
2720 
2721   *dst = ptr;
2722 }
2723 
2724 /*
2725   Function of type rdb_index_field_pack_t
2726 */
2727 
pack_with_varchar_encoding(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx MY_ATTRIBUTE ((__unused__)))2728 void Rdb_key_def::pack_with_varchar_encoding(
2729     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2730     Rdb_pack_field_context *const pack_ctx MY_ATTRIBUTE((__unused__))) {
2731   const CHARSET_INFO *const charset = field->charset();
2732   Field_varstring *const field_var = (Field_varstring *)field;
2733 
2734   const size_t value_length = (field_var->length_bytes == 1)
2735                                   ? (uint)*field->ptr
2736                                   : uint2korr(field->ptr);
2737   size_t xfrm_len = charset->coll->strnxfrm(
2738       charset, buf, fpi->m_max_image_len, field_var->char_length(),
2739       field_var->ptr + field_var->length_bytes, value_length, 0);
2740 
2741   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2742   if (fpi->m_use_legacy_varbinary_format) {
2743     pack_legacy_variable_format(buf, xfrm_len, dst);
2744   } else {
2745     pack_variable_format(buf, xfrm_len, dst);
2746   }
2747 }
2748 
2749 /*
2750   Compare the string in [buf..buf_end) with a string that is an infinite
2751   sequence of strings in space_xfrm
2752 */
2753 
rdb_compare_string_with_spaces(const uchar * buf,const uchar * const buf_end,const std::vector<uchar> * const space_xfrm)2754 static int rdb_compare_string_with_spaces(
2755     const uchar *buf, const uchar *const buf_end,
2756     const std::vector<uchar> *const space_xfrm) {
2757   int cmp = 0;
2758   while (buf < buf_end) {
2759     size_t bytes = std::min((size_t)(buf_end - buf), space_xfrm->size());
2760     if ((cmp = memcmp(buf, space_xfrm->data(), bytes)) != 0) break;
2761     buf += bytes;
2762   }
2763   return cmp;
2764 }
2765 
// Bias added to the trimmed/padded space count that
// pack_with_varchar_space_pad writes to unpack_info, so that the stored
// number is never negative.
static constexpr int RDB_TRIMMED_CHARS_OFFSET = 8;
2767 /*
2768   Pack the data with Variable-Length Space-Padded Encoding.
2769 
2770   The encoding is there to meet two goals:
2771 
2772   Goal#1. Comparison. The SQL standard says
2773 
    "If the collation for the comparison has the PAD SPACE characteristic,
    for the purposes of the comparison, the shorter value is effectively
    extended to the length of the longer by concatenation of <space>s on the
    right."
2778 
2779   At the moment, all MySQL collations except one have the PAD SPACE
2780   characteristic.  The exception is the "binary" collation that is used by
2781   [VAR]BINARY columns. (Note that binary collations for specific charsets,
2782   like utf8_bin or latin1_bin are not the same as "binary" collation, they have
2783   the PAD SPACE characteristic).
2784 
2785   Goal#2 is to preserve the number of trailing spaces in the original value.
2786 
2787   This is achieved by using the following encoding:
2788   The key part:
2789   - Stores mem-comparable image of the column
2790   - It is stored in chunks of fpi->m_segment_size bytes (*)
2791     = If the remainder of the chunk is not occupied, it is padded with mem-
2792       comparable image of the space character (cs->pad_char to be precise).
2793   - The last byte of the chunk shows how the rest of column's mem-comparable
2794     image would compare to mem-comparable image of the column extended with
2795     spaces. There are three possible values.
2796      - VARCHAR_CMP_LESS_THAN_SPACES,
2797      - VARCHAR_CMP_EQUAL_TO_SPACES
2798      - VARCHAR_CMP_GREATER_THAN_SPACES
2799 
2800   VARCHAR_CMP_EQUAL_TO_SPACES means that this chunk is the last one (the rest
2801   is spaces, or something that sorts as spaces, so there is no reason to store
2802   it).
2803 
2804   Example: if fpi->m_segment_size=5, and the collation is latin1_bin:
2805 
2806    'abcd\0'   => [ 'abcd' <VARCHAR_CMP_LESS> ]['\0    ' <VARCHAR_CMP_EQUAL> ]
2807    'abcd'     => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2808    'abcd   '  => [ 'abcd' <VARCHAR_CMP_EQUAL>]
2809    'abcdZZZZ' => [ 'abcd' <VARCHAR_CMP_GREATER>][ 'ZZZZ' <VARCHAR_CMP_EQUAL>]
2810 
2811   As mentioned above, the last chunk is padded with mem-comparable images of
2812   cs->pad_char. It can be 1-byte long (latin1), 2 (utf8_bin), 3 (utf8mb4), etc.
2813 
2814   fpi->m_segment_size depends on the used collation. It is chosen to be such
2815   that no mem-comparable image of space will ever stretch across the segments
2816   (see get_segment_size_from_collation).
2817 
2818   == The value part (aka unpack_info) ==
2819   The value part stores the number of space characters that one needs to add
2820   when unpacking the string.
2821   - If the number is positive, it means add this many spaces at the end
2822   - If the number is negative, it means padding has added extra spaces which
2823     must be removed.
2824 
2825   Storage considerations
2826   - depending on column's max size, the number may occupy 1 or 2 bytes
2827   - the number of spaces that need to be removed is not more than
2828     RDB_TRIMMED_CHARS_OFFSET=8, so we offset the number by that value and
2829     then store it as unsigned.
2830 
2831   @seealso
2832     unpack_binary_or_utf8_varchar_space_pad
2833     unpack_simple_varchar_space_pad
2834     dummy_make_unpack_info
2835     skip_variable_space_pad
2836 */
2837 
pack_with_varchar_space_pad(Rdb_field_packing * const fpi,Field * const field,uchar * buf,uchar ** dst,Rdb_pack_field_context * const pack_ctx)2838 void Rdb_key_def::pack_with_varchar_space_pad(
2839     Rdb_field_packing *const fpi, Field *const field, uchar *buf, uchar **dst,
2840     Rdb_pack_field_context *const pack_ctx) {
2841   Rdb_string_writer *const unpack_info = pack_ctx->writer;
2842   const CHARSET_INFO *const charset = field->charset();
2843   const auto field_var = static_cast<Field_varstring *>(field);
2844 
2845   const size_t value_length = (field_var->length_bytes == 1)
2846                                   ? (uint)*field->ptr
2847                                   : uint2korr(field->ptr);
2848 
2849   const size_t trimmed_len = charset->cset->lengthsp(
2850       charset, (const char *)field_var->ptr + field_var->length_bytes,
2851       value_length);
2852   const size_t xfrm_len = charset->coll->strnxfrm(
2853       charset, buf, fpi->m_max_image_len, field_var->char_length(),
2854       field_var->ptr + field_var->length_bytes, trimmed_len, 0);
2855 
2856   /* Got a mem-comparable image in 'buf'. Now, produce varlength encoding */
2857   uchar *const buf_end = buf + xfrm_len;
2858 
2859   size_t encoded_size = 0;
2860   uchar *ptr = *dst;
2861   size_t padding_bytes;
2862   while (true) {
2863     const size_t copy_len =
2864         std::min<size_t>(fpi->m_segment_size - 1, buf_end - buf);
2865     padding_bytes = fpi->m_segment_size - 1 - copy_len;
2866     memcpy(ptr, buf, copy_len);
2867     ptr += copy_len;
2868     buf += copy_len;
2869 
2870     if (padding_bytes) {
2871       memcpy(ptr, fpi->space_xfrm->data(), padding_bytes);
2872       ptr += padding_bytes;
2873       *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;  // last segment
2874     } else {
2875       // Compare the string suffix with a hypothetical infinite string of
2876       // spaces. It could be that the first difference is beyond the end of
2877       // current chunk.
2878       const int cmp =
2879           rdb_compare_string_with_spaces(buf, buf_end, fpi->space_xfrm);
2880 
2881       if (cmp < 0) {
2882         *ptr = VARCHAR_CMP_LESS_THAN_SPACES;
2883       } else if (cmp > 0) {
2884         *ptr = VARCHAR_CMP_GREATER_THAN_SPACES;
2885       } else {
2886         // It turns out all the rest are spaces.
2887         *ptr = VARCHAR_CMP_EQUAL_TO_SPACES;
2888       }
2889     }
2890     encoded_size += fpi->m_segment_size;
2891 
2892     if (*(ptr++) == VARCHAR_CMP_EQUAL_TO_SPACES) break;
2893   }
2894 
2895   // m_unpack_info_stores_value means unpack_info stores the whole original
2896   // value. There is no need to store the number of trimmed/padded endspaces
2897   // in that case.
2898   if (unpack_info && !fpi->m_unpack_info_stores_value) {
2899     // (value_length - trimmed_len) is the number of trimmed space *characters*
2900     // then, padding_bytes is the number of *bytes* added as padding
2901     // then, we add 8, because we don't store negative values.
2902     assert(padding_bytes % fpi->space_xfrm_len == 0);
2903     assert((value_length - trimmed_len) % fpi->space_mb_len == 0);
2904     const size_t removed_chars =
2905         RDB_TRIMMED_CHARS_OFFSET +
2906         (value_length - trimmed_len) / fpi->space_mb_len -
2907         padding_bytes / fpi->space_xfrm_len;
2908 
2909     if (fpi->m_unpack_info_uses_two_bytes) {
2910       unpack_info->write_uint16(removed_chars);
2911     } else {
2912       assert(removed_chars < 0x100);
2913       unpack_info->write_uint8(removed_chars);
2914     }
2915   }
2916 
2917   *dst += encoded_size;
2918 }
2919 
2920 /*
2921   Calculate the number of used bytes in the chunk and whether this is the
2922   last chunk in the input.  This is based on the old legacy format - see
2923   pack_legacy_variable_format.
2924  */
calc_unpack_legacy_variable_format(uchar flag,bool * done)2925 uint Rdb_key_def::calc_unpack_legacy_variable_format(uchar flag, bool *done) {
2926   uint pad = 255 - flag;
2927   uint used_bytes = RDB_LEGACY_ESCAPE_LENGTH - 1 - pad;
2928   if (used_bytes > RDB_LEGACY_ESCAPE_LENGTH - 1) {
2929     return (uint)-1;
2930   }
2931 
2932   *done = used_bytes < RDB_LEGACY_ESCAPE_LENGTH - 1;
2933   return used_bytes;
2934 }
2935 
2936 /*
2937   Calculate the number of used bytes in the chunk and whether this is the
2938   last chunk in the input.  This is based on the new format - see
2939   pack_variable_format.
2940  */
calc_unpack_variable_format(uchar flag,bool * done)2941 uint Rdb_key_def::calc_unpack_variable_format(uchar flag, bool *done) {
2942   // Check for invalid flag values
2943   if (flag > RDB_ESCAPE_LENGTH) {
2944     return (uint)-1;
2945   }
2946 
2947   // Values from 1 to N-1 indicate this is the last chunk and that is how
2948   // many bytes were used
2949   if (flag < RDB_ESCAPE_LENGTH) {
2950     *done = true;
2951     return flag;
2952   }
2953 
2954   // A value of N means we used N-1 bytes and had more to go
2955   *done = false;
2956   return RDB_ESCAPE_LENGTH - 1;
2957 }
2958 
2959 /*
2960   Unpack data that has charset information.  Each two bytes of the input is
2961   treated as a wide-character and converted to its multibyte equivalent in
2962   the output.
2963  */
unpack_charset(const CHARSET_INFO * cset,const uchar * src,uint src_len,uchar * dst,uint dst_len,uint * used_bytes)2964 static int unpack_charset(
2965     const CHARSET_INFO *cset,  // character set information
2966     const uchar *src,          // source data to unpack
2967     uint src_len,              // length of source data
2968     uchar *dst,                // destination of unpacked data
2969     uint dst_len,              // length of destination data
2970     uint *used_bytes)          // output number of bytes used
2971 {
2972   if (src_len & 1) {
2973     /*
2974       UTF-8 characters are encoded into two-byte entities. There is no way
2975       we can have an odd number of bytes after encoding.
2976     */
2977     return UNPACK_FAILURE;
2978   }
2979 
2980   uchar *dst_end = dst + dst_len;
2981   uint used = 0;
2982 
2983   for (uint ii = 0; ii < src_len; ii += 2) {
2984     my_wc_t wc = (src[ii] << 8) | src[ii + 1];
2985     int res = cset->cset->wc_mb(cset, wc, dst + used, dst_end);
2986     assert(res > 0);
2987     assert(res <= 3);
2988     if (res < 0) {
2989       return UNPACK_FAILURE;
2990     }
2991 
2992     used += res;
2993   }
2994 
2995   *used_bytes = used;
2996   return UNPACK_SUCCESS;
2997 }
2998 
2999 /*
3000   Function of type rdb_index_field_unpack_t
3001 */
3002 
unpack_binary_or_utf8_varchar(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader MY_ATTRIBUTE ((__unused__)))3003 int Rdb_key_def::unpack_binary_or_utf8_varchar(
3004     Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
3005     Rdb_string_reader *const unp_reader MY_ATTRIBUTE((__unused__))) {
3006   const uchar *ptr;
3007   size_t len = 0;
3008   bool finished = false;
3009   uchar *d0 = dst;
3010   dst += fpi->m_varchar_length_bytes;
3011   // How much we can unpack
3012   size_t dst_len = fpi->m_field_pack_length - fpi->m_varchar_length_bytes;
3013 
3014   bool use_legacy_format = fpi->m_use_legacy_varbinary_format;
3015 
3016   /* Decode the length-emitted encoding here */
3017   while ((ptr = (const uchar *)reader->read(RDB_ESCAPE_LENGTH))) {
3018     uint used_bytes;
3019 
3020     /* See pack_with_varchar_encoding. */
3021     if (use_legacy_format) {
3022       used_bytes = calc_unpack_legacy_variable_format(
3023           ptr[RDB_ESCAPE_LENGTH - 1], &finished);
3024     } else {
3025       used_bytes =
3026           calc_unpack_variable_format(ptr[RDB_ESCAPE_LENGTH - 1], &finished);
3027     }
3028 
3029     if (used_bytes == (uint)-1 || dst_len < used_bytes) {
3030       return UNPACK_FAILURE;  // Corruption in the data
3031     }
3032 
3033     /*
3034       Now, we need to decode used_bytes of data and append them to the value.
3035     */
3036     if (fpi->m_field_charset == &my_charset_utf8_bin) {
3037       int err = unpack_charset(fpi->m_field_charset, ptr, used_bytes, dst,
3038                                dst_len, &used_bytes);
3039       if (err != UNPACK_SUCCESS) {
3040         return err;
3041       }
3042     } else {
3043       memcpy(dst, ptr, used_bytes);
3044     }
3045 
3046     dst += used_bytes;
3047     dst_len -= used_bytes;
3048     len += used_bytes;
3049 
3050     if (finished) {
3051       break;
3052     }
3053   }
3054 
3055   if (!finished) {
3056     return UNPACK_FAILURE;
3057   }
3058 
3059   /* Save the length */
3060   if (fpi->m_varchar_length_bytes == 1) {
3061     d0[0] = len;
3062   } else {
3063     assert(fpi->m_varchar_length_bytes == 2);
3064     int2store(d0, len);
3065   }
3066   return UNPACK_SUCCESS;
3067 }
3068 
3069 /*
3070   @seealso
3071     pack_with_varchar_space_pad - packing function
3072     unpack_simple_varchar_space_pad - unpacking function for 'simple'
3073     charsets.
3074     skip_variable_space_pad - skip function
3075 */
unpack_binary_or_utf8_varchar_space_pad(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3076 int Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad(
3077     Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
3078     Rdb_string_reader *const unp_reader) {
3079   const uchar *ptr;
3080   size_t len = 0;
3081   bool finished = false;
3082   uchar *d0 = dst;
3083   uchar *dst_end = dst + fpi->m_field_pack_length;
3084   dst += fpi->m_varchar_length_bytes;
3085 
3086   uint space_padding_bytes = 0;
3087   uint extra_spaces;
3088   if ((fpi->m_unpack_info_uses_two_bytes
3089            ? unp_reader->read_uint16(&extra_spaces)
3090            : unp_reader->read_uint8(&extra_spaces))) {
3091     return UNPACK_FAILURE;
3092   }
3093 
3094   if (extra_spaces <= RDB_TRIMMED_CHARS_OFFSET) {
3095     space_padding_bytes =
3096         -(static_cast<int>(extra_spaces) - RDB_TRIMMED_CHARS_OFFSET);
3097     extra_spaces = 0;
3098   } else {
3099     extra_spaces -= RDB_TRIMMED_CHARS_OFFSET;
3100   }
3101 
3102   space_padding_bytes *= fpi->space_xfrm_len;
3103 
3104   /* Decode the length-emitted encoding here */
3105   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
3106     const char last_byte = ptr[fpi->m_segment_size - 1];
3107     size_t used_bytes;
3108     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES)  // this is the last segment
3109     {
3110       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
3111         return UNPACK_FAILURE;  // Cannot happen, corrupted data
3112       }
3113       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
3114       finished = true;
3115     } else {
3116       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
3117           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
3118         return UNPACK_FAILURE;  // Invalid value
3119       }
3120       used_bytes = fpi->m_segment_size - 1;
3121     }
3122 
3123     // Now, need to decode used_bytes of data and append them to the value.
3124     if (fpi->m_field_charset == &my_charset_utf8_bin) {
3125       if (used_bytes & 1) {
3126         /*
3127           UTF-8 characters are encoded into two-byte entities. There is no way
3128           we can have an odd number of bytes after encoding.
3129         */
3130         return UNPACK_FAILURE;
3131       }
3132 
3133       const uchar *src = ptr;
3134       const uchar *const src_end = ptr + used_bytes;
3135       while (src < src_end) {
3136         my_wc_t wc = (src[0] << 8) | src[1];
3137         src += 2;
3138         const CHARSET_INFO *cset = fpi->m_field_charset;
3139         int res = cset->cset->wc_mb(cset, wc, dst, dst_end);
3140         assert(res <= 3);
3141         if (res <= 0) return UNPACK_FAILURE;
3142         dst += res;
3143         len += res;
3144       }
3145     } else {
3146       if (dst + used_bytes > dst_end) return UNPACK_FAILURE;
3147       memcpy(dst, ptr, used_bytes);
3148       dst += used_bytes;
3149       len += used_bytes;
3150     }
3151 
3152     if (finished) {
3153       if (extra_spaces) {
3154         // Both binary and UTF-8 charset store space as ' ',
3155         // so the following is ok:
3156         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
3157         memset(dst, fpi->m_field_charset->pad_char, extra_spaces);
3158         len += extra_spaces;
3159       }
3160       break;
3161     }
3162   }
3163 
3164   if (!finished) return UNPACK_FAILURE;
3165 
3166   /* Save the length */
3167   if (fpi->m_varchar_length_bytes == 1) {
3168     d0[0] = len;
3169   } else {
3170     assert(fpi->m_varchar_length_bytes == 2);
3171     int2store(d0, len);
3172   }
3173   return UNPACK_SUCCESS;
3174 }
3175 
3176 /////////////////////////////////////////////////////////////////////////
3177 
3178 /*
3179   Function of type rdb_make_unpack_info_t
3180 */
3181 
make_unpack_unknown(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)3182 void Rdb_key_def::make_unpack_unknown(
3183     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
3184     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
3185   pack_ctx->writer->write(field->ptr, field->pack_length());
3186 }
3187 
3188 /*
3189   This point of this function is only to indicate that unpack_info is
3190   available.
3191 
3192   The actual unpack_info data is produced by the function that packs the key,
3193   that is, pack_with_varchar_space_pad.
3194 */
3195 
dummy_make_unpack_info(const Rdb_collation_codec * codec MY_ATTRIBUTE ((__unused__)),const Field * field MY_ATTRIBUTE ((__unused__)),Rdb_pack_field_context * pack_ctx MY_ATTRIBUTE ((__unused__)))3196 void Rdb_key_def::dummy_make_unpack_info(
3197     const Rdb_collation_codec *codec MY_ATTRIBUTE((__unused__)),
3198     const Field *field MY_ATTRIBUTE((__unused__)),
3199     Rdb_pack_field_context *pack_ctx MY_ATTRIBUTE((__unused__))) {
3200   // Do nothing
3201 }
3202 
3203 /*
3204   Function of type rdb_index_field_unpack_t
3205 */
3206 
unpack_unknown(Rdb_field_packing * const fpi,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3207 int Rdb_key_def::unpack_unknown(Rdb_field_packing *const fpi, uchar *const dst,
3208                                 Rdb_string_reader *const reader,
3209                                 Rdb_string_reader *const unp_reader) {
3210   const uchar *ptr;
3211   const uint len = fpi->m_unpack_data_len;
3212   // We don't use anything from the key, so skip over it.
3213   if (skip_max_length(fpi, reader)) {
3214     return UNPACK_FAILURE;
3215   }
3216 
3217   assert_IMP(len > 0, unp_reader != nullptr);
3218 
3219   if ((ptr = (const uchar *)unp_reader->read(len))) {
3220     memcpy(dst, ptr, len);
3221     return UNPACK_SUCCESS;
3222   }
3223   return UNPACK_FAILURE;
3224 }
3225 
3226 /*
3227   Function of type rdb_make_unpack_info_t
3228 */
3229 
make_unpack_unknown_varchar(const Rdb_collation_codec * const codec MY_ATTRIBUTE ((__unused__)),const Field * const field,Rdb_pack_field_context * const pack_ctx)3230 void Rdb_key_def::make_unpack_unknown_varchar(
3231     const Rdb_collation_codec *const codec MY_ATTRIBUTE((__unused__)),
3232     const Field *const field, Rdb_pack_field_context *const pack_ctx) {
3233   const auto f = static_cast<const Field_varstring *>(field);
3234   uint len = f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
3235   len += f->length_bytes;
3236   pack_ctx->writer->write(field->ptr, len);
3237 }
3238 
3239 /*
3240   Function of type rdb_index_field_unpack_t
3241 
3242   @detail
3243   Unpack a key part in an "unknown" collation from its
3244   (mem_comparable_form, unpack_info) form.
3245 
3246   "Unknown" means we have no clue about how mem_comparable_form is made from
3247   the original string, so we keep the whole original string in the unpack_info.
3248 
3249   @seealso
3250     make_unpack_unknown, unpack_unknown
3251 */
3252 
unpack_unknown_varchar(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3253 int Rdb_key_def::unpack_unknown_varchar(Rdb_field_packing *const fpi,
3254                                         uchar *dst,
3255                                         Rdb_string_reader *const reader,
3256                                         Rdb_string_reader *const unp_reader) {
3257   const uchar *ptr;
3258   uchar *const d0 = dst;
3259   dst += fpi->m_varchar_length_bytes;
3260   const uint len_bytes = fpi->m_varchar_length_bytes;
3261   // We don't use anything from the key, so skip over it.
3262   if ((fpi->m_skip_func)(fpi, reader)) {
3263     return UNPACK_FAILURE;
3264   }
3265 
3266   assert(len_bytes > 0);
3267   assert(unp_reader != nullptr);
3268 
3269   if ((ptr = (const uchar *)unp_reader->read(len_bytes))) {
3270     memcpy(d0, ptr, len_bytes);
3271     const uint len = len_bytes == 1 ? (uint)*ptr : uint2korr(ptr);
3272     if ((ptr = (const uchar *)unp_reader->read(len))) {
3273       memcpy(dst, ptr, len);
3274       return UNPACK_SUCCESS;
3275     }
3276   }
3277   return UNPACK_FAILURE;
3278 }
3279 
3280 /*
3281   Write unpack_data for a "simple" collation
3282 */
rdb_write_unpack_simple(Rdb_bit_writer * const writer,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len)3283 static void rdb_write_unpack_simple(Rdb_bit_writer *const writer,
3284                                     const Rdb_collation_codec *const codec,
3285                                     const uchar *const src,
3286                                     const size_t src_len) {
3287   for (uint i = 0; i < src_len; i++) {
3288     writer->write(codec->m_enc_size[src[i]], codec->m_enc_idx[src[i]]);
3289   }
3290 }
3291 
rdb_read_unpack_simple(Rdb_bit_reader * const reader,const Rdb_collation_codec * const codec,const uchar * const src,const size_t src_len,uchar * const dst)3292 static uint rdb_read_unpack_simple(Rdb_bit_reader *const reader,
3293                                    const Rdb_collation_codec *const codec,
3294                                    const uchar *const src, const size_t src_len,
3295                                    uchar *const dst) {
3296   for (uint i = 0; i < src_len; i++) {
3297     if (codec->m_dec_size[src[i]] > 0) {
3298       uint *ret;
3299       assert(reader != nullptr);
3300 
3301       if ((ret = reader->read(codec->m_dec_size[src[i]])) == nullptr) {
3302         return UNPACK_FAILURE;
3303       }
3304       dst[i] = codec->m_dec_idx[*ret][src[i]];
3305     } else {
3306       dst[i] = codec->m_dec_idx[0][src[i]];
3307     }
3308   }
3309 
3310   return UNPACK_SUCCESS;
3311 }
3312 
3313 /*
3314   Function of type rdb_make_unpack_info_t
3315 
3316   @detail
3317     Make unpack_data for VARCHAR(n) in a "simple" charset.
3318 */
3319 
make_unpack_simple_varchar(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)3320 void Rdb_key_def::make_unpack_simple_varchar(
3321     const Rdb_collation_codec *const codec, const Field *const field,
3322     Rdb_pack_field_context *const pack_ctx) {
3323   const auto f = static_cast<const Field_varstring *>(field);
3324   uchar *const src = f->ptr + f->length_bytes;
3325   const size_t src_len =
3326       f->length_bytes == 1 ? (uint)*f->ptr : uint2korr(f->ptr);
3327   Rdb_bit_writer bit_writer(pack_ctx->writer);
3328   // The std::min compares characters with bytes, but for simple collations,
3329   // mbmaxlen = 1.
3330   rdb_write_unpack_simple(&bit_writer, codec, src,
3331                           std::min((size_t)f->char_length(), src_len));
3332 }
3333 
3334 /*
3335   Function of type rdb_index_field_unpack_t
3336 
3337   @seealso
3338     pack_with_varchar_space_pad - packing function
3339     unpack_binary_or_utf8_varchar_space_pad - a similar unpacking function
3340 */
3341 
unpack_simple_varchar_space_pad(Rdb_field_packing * const fpi,uchar * dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3342 int Rdb_key_def::unpack_simple_varchar_space_pad(
3343     Rdb_field_packing *const fpi, uchar *dst, Rdb_string_reader *const reader,
3344     Rdb_string_reader *const unp_reader) {
3345   const uchar *ptr;
3346   size_t len = 0;
3347   bool finished = false;
3348   uchar *d0 = dst;
3349   // For simple collations, char_length is also number of bytes.
3350   assert((size_t)fpi->m_max_image_len >= fpi->m_varchar_char_length);
3351   uchar *dst_end = dst + fpi->m_field_pack_length;
3352   dst += fpi->m_varchar_length_bytes;
3353   Rdb_bit_reader bit_reader(unp_reader);
3354 
3355   uint space_padding_bytes = 0;
3356   uint extra_spaces;
3357   assert(unp_reader != nullptr);
3358 
3359   if ((fpi->m_unpack_info_uses_two_bytes
3360            ? unp_reader->read_uint16(&extra_spaces)
3361            : unp_reader->read_uint8(&extra_spaces))) {
3362     return UNPACK_FAILURE;
3363   }
3364 
3365   if (extra_spaces <= 8) {
3366     space_padding_bytes = -(static_cast<int>(extra_spaces) - 8);
3367     extra_spaces = 0;
3368   } else {
3369     extra_spaces -= 8;
3370   }
3371 
3372   space_padding_bytes *= fpi->space_xfrm_len;
3373 
3374   /* Decode the length-emitted encoding here */
3375   while ((ptr = (const uchar *)reader->read(fpi->m_segment_size))) {
3376     const char last_byte =
3377         ptr[fpi->m_segment_size - 1];  // number of padding bytes
3378     size_t used_bytes;
3379     if (last_byte == VARCHAR_CMP_EQUAL_TO_SPACES) {
3380       // this is the last one
3381       if (space_padding_bytes > (fpi->m_segment_size - 1)) {
3382         return UNPACK_FAILURE;  // Cannot happen, corrupted data
3383       }
3384       used_bytes = (fpi->m_segment_size - 1) - space_padding_bytes;
3385       finished = true;
3386     } else {
3387       if (last_byte != VARCHAR_CMP_LESS_THAN_SPACES &&
3388           last_byte != VARCHAR_CMP_GREATER_THAN_SPACES) {
3389         return UNPACK_FAILURE;
3390       }
3391       used_bytes = fpi->m_segment_size - 1;
3392     }
3393 
3394     if (dst + used_bytes > dst_end) {
3395       // The value on disk is longer than the field definition allows?
3396       return UNPACK_FAILURE;
3397     }
3398 
3399     uint ret;
3400     if ((ret = rdb_read_unpack_simple(&bit_reader, fpi->m_charset_codec, ptr,
3401                                       used_bytes, dst)) != UNPACK_SUCCESS) {
3402       return ret;
3403     }
3404 
3405     dst += used_bytes;
3406     len += used_bytes;
3407 
3408     if (finished) {
3409       if (extra_spaces) {
3410         if (dst + extra_spaces > dst_end) return UNPACK_FAILURE;
3411         // pad_char has a 1-byte form in all charsets that
3412         // are handled by rdb_init_collation_mapping.
3413         memset(dst, fpi->m_field_charset->pad_char, extra_spaces);
3414         len += extra_spaces;
3415       }
3416       break;
3417     }
3418   }
3419 
3420   if (!finished) return UNPACK_FAILURE;
3421 
3422   /* Save the length */
3423   if (fpi->m_varchar_length_bytes == 1) {
3424     d0[0] = len;
3425   } else {
3426     assert(fpi->m_varchar_length_bytes == 2);
3427     int2store(d0, len);
3428   }
3429   return UNPACK_SUCCESS;
3430 }
3431 
3432 /*
3433   Function of type rdb_make_unpack_info_t
3434 
3435   @detail
3436     Make unpack_data for CHAR(n) value in a "simple" charset.
3437     It is CHAR(N), so SQL layer has padded the value with spaces up to N chars.
3438 
3439   @seealso
3440     The VARCHAR variant is in make_unpack_simple_varchar
3441 */
3442 
make_unpack_simple(const Rdb_collation_codec * const codec,const Field * const field,Rdb_pack_field_context * const pack_ctx)3443 void Rdb_key_def::make_unpack_simple(const Rdb_collation_codec *const codec,
3444                                      const Field *const field,
3445                                      Rdb_pack_field_context *const pack_ctx) {
3446   const uchar *const src = field->ptr;
3447   Rdb_bit_writer bit_writer(pack_ctx->writer);
3448   rdb_write_unpack_simple(&bit_writer, codec, src, field->pack_length());
3449 }
3450 
3451 /*
3452   Function of type rdb_index_field_unpack_t
3453 */
3454 
unpack_simple(Rdb_field_packing * const fpi,uchar * const dst,Rdb_string_reader * const reader,Rdb_string_reader * const unp_reader)3455 int Rdb_key_def::unpack_simple(Rdb_field_packing *const fpi, uchar *const dst,
3456                                Rdb_string_reader *const reader,
3457                                Rdb_string_reader *const unp_reader) {
3458   const uchar *ptr;
3459   const uint len = fpi->m_max_image_len;
3460   Rdb_bit_reader bit_reader(unp_reader);
3461 
3462   if (!(ptr = (const uchar *)reader->read(len))) {
3463     return UNPACK_FAILURE;
3464   }
3465 
3466   return rdb_read_unpack_simple(unp_reader ? &bit_reader : nullptr,
3467                                 fpi->m_charset_codec, ptr, len, dst);
3468 }
3469 
// Minimum number of bytes of strxfrm'ed spaces kept per charset.
// See Rdb_charset_space_info::spaces_xfrm
const int RDB_SPACE_XFRM_SIZE{32};
3472 
3473 namespace {
3474 
3475 // A class holding information about how space character is represented in a
3476 // charset.
3477 class Rdb_charset_space_info {
3478  public:
3479   Rdb_charset_space_info(const Rdb_charset_space_info &) = delete;
3480   Rdb_charset_space_info &operator=(const Rdb_charset_space_info &) = delete;
3481   Rdb_charset_space_info() = default;
3482 
3483   // A few strxfrm'ed space characters, at least RDB_SPACE_XFRM_SIZE bytes
3484   std::vector<uchar> spaces_xfrm;
3485 
3486   // length(strxfrm(' '))
3487   size_t space_xfrm_len;
3488 
3489   // length of the space character itself
3490   // Typically space is just 0x20 (length=1) but in ucs2 it is 0x00 0x20
3491   // (length=2)
3492   size_t space_mb_len;
3493 };
3494 
3495 }  // namespace
3496 
3497 static std::array<std::unique_ptr<Rdb_charset_space_info>, MY_ALL_CHARSETS_SIZE>
3498     rdb_mem_comparable_space;
3499 
3500 /*
3501   @brief
3502   For a given charset, get
3503    - strxfrm('    '), a sample that is at least RDB_SPACE_XFRM_SIZE bytes long.
3504    - length of strxfrm(charset, ' ')
3505    - length of the space character in the charset
3506 
3507   @param cs  IN    Charset to get the space for
3508   @param ptr OUT   A few space characters
3509   @param len OUT   Return length of the space (in bytes)
3510 
3511   @detail
3512     It is tempting to pre-generate mem-comparable form of space character for
3513     every charset on server startup.
3514     One can't do that: some charsets are not initialized until somebody
3515     attempts to use them (e.g. create or open a table that has a field that
3516     uses the charset).
3517 */
3518 
rdb_get_mem_comparable_space(const CHARSET_INFO * const cs,const std::vector<uchar> ** xfrm,size_t * const xfrm_len,size_t * const mb_len)3519 static void rdb_get_mem_comparable_space(const CHARSET_INFO *const cs,
3520                                          const std::vector<uchar> **xfrm,
3521                                          size_t *const xfrm_len,
3522                                          size_t *const mb_len) {
3523   assert(cs->number < MY_ALL_CHARSETS_SIZE);
3524   if (!rdb_mem_comparable_space[cs->number].get()) {
3525     RDB_MUTEX_LOCK_CHECK(rdb_mem_cmp_space_mutex);
3526     if (!rdb_mem_comparable_space[cs->number].get()) {
3527       // Upper bound of how many bytes can be occupied by multi-byte form of a
3528       // character in any charset.
3529       const int MAX_MULTI_BYTE_CHAR_SIZE = 4;
3530       assert(cs->mbmaxlen <= MAX_MULTI_BYTE_CHAR_SIZE);
3531 
3532       // multi-byte form of the ' ' (space) character
3533       uchar space_mb[MAX_MULTI_BYTE_CHAR_SIZE];
3534 
3535       const size_t space_mb_len = cs->cset->wc_mb(
3536           cs, (my_wc_t)cs->pad_char, space_mb, space_mb + sizeof(space_mb));
3537 
3538       // mem-comparable image of the space character
3539       std::array<uchar, 20> space;
3540 
3541       const size_t space_len = cs->coll->strnxfrm(
3542           cs, space.data(), sizeof(space), 1, space_mb, space_mb_len, 0);
3543       Rdb_charset_space_info *const info = new Rdb_charset_space_info;
3544       info->space_xfrm_len = space_len;
3545       info->space_mb_len = space_mb_len;
3546       while (info->spaces_xfrm.size() < RDB_SPACE_XFRM_SIZE) {
3547         info->spaces_xfrm.insert(info->spaces_xfrm.end(), space.data(),
3548                                  space.data() + space_len);
3549       }
3550       rdb_mem_comparable_space[cs->number].reset(info);
3551     }
3552     RDB_MUTEX_UNLOCK_CHECK(rdb_mem_cmp_space_mutex);
3553   }
3554 
3555   *xfrm = &rdb_mem_comparable_space[cs->number]->spaces_xfrm;
3556   *xfrm_len = rdb_mem_comparable_space[cs->number]->space_xfrm_len;
3557   *mb_len = rdb_mem_comparable_space[cs->number]->space_mb_len;
3558 }
3559 
3560 mysql_mutex_t rdb_mem_cmp_space_mutex;
3561 
3562 std::array<const Rdb_collation_codec *, MY_ALL_CHARSETS_SIZE>
3563     rdb_collation_data;
3564 mysql_mutex_t rdb_collation_data_mutex;
3565 
rdb_is_collation_supported(const my_core::CHARSET_INFO * const cs)3566 bool rdb_is_collation_supported(const my_core::CHARSET_INFO *const cs) {
3567   return (cs->coll == &my_collation_8bit_simple_ci_handler);
3568 }
3569 
rdb_init_collation_mapping(const my_core::CHARSET_INFO * const cs)3570 static const Rdb_collation_codec *rdb_init_collation_mapping(
3571     const my_core::CHARSET_INFO *const cs) {
3572   assert(cs);
3573   assert(cs->state & MY_CS_AVAILABLE);
3574   const Rdb_collation_codec *codec = rdb_collation_data[cs->number];
3575 
3576   if (codec == nullptr && rdb_is_collation_supported(cs)) {
3577     RDB_MUTEX_LOCK_CHECK(rdb_collation_data_mutex);
3578 
3579     codec = rdb_collation_data[cs->number];
3580     if (codec == nullptr) {
3581       Rdb_collation_codec *cur = nullptr;
3582 
3583       // Compute reverse mapping for simple collations.
3584       if (cs->coll == &my_collation_8bit_simple_ci_handler) {
3585         cur = new Rdb_collation_codec;
3586         std::map<uchar, std::vector<uchar>> rev_map;
3587         size_t max_conflict_size = 0;
3588         for (int src = 0; src < 256; src++) {
3589           uchar dst = cs->sort_order[src];
3590           rev_map[dst].push_back(src);
3591           max_conflict_size = std::max(max_conflict_size, rev_map[dst].size());
3592         }
3593         cur->m_dec_idx.resize(max_conflict_size);
3594 
3595         for (auto const &p : rev_map) {
3596           uchar dst = p.first;
3597           for (uint idx = 0; idx < p.second.size(); idx++) {
3598             uchar src = p.second[idx];
3599             uchar bits =
3600                 my_bit_log2(my_round_up_to_next_power(p.second.size()));
3601             cur->m_enc_idx[src] = idx;
3602             cur->m_enc_size[src] = bits;
3603             cur->m_dec_size[dst] = bits;
3604             cur->m_dec_idx[idx][dst] = src;
3605           }
3606         }
3607 
3608         cur->m_make_unpack_info_func = {
3609             {Rdb_key_def::make_unpack_simple_varchar,
3610              Rdb_key_def::make_unpack_simple}};
3611         cur->m_unpack_func = {{Rdb_key_def::unpack_simple_varchar_space_pad,
3612                                Rdb_key_def::unpack_simple}};
3613       } else {
3614         // Out of luck for now.
3615       }
3616 
3617       if (cur != nullptr) {
3618         codec = cur;
3619         cur->m_cs = cs;
3620         rdb_collation_data[cs->number] = cur;
3621       }
3622     }
3623 
3624     RDB_MUTEX_UNLOCK_CHECK(rdb_collation_data_mutex);
3625   }
3626 
3627   return codec;
3628 }
3629 
get_segment_size_from_collation(const CHARSET_INFO * const cs)3630 static int get_segment_size_from_collation(const CHARSET_INFO *const cs) {
3631   int ret;
3632   if (cs == &my_charset_utf8mb4_bin || cs == &my_charset_utf16_bin ||
3633       cs == &my_charset_utf16le_bin || cs == &my_charset_utf32_bin) {
3634     /*
3635       In these collations, a character produces one weight, which is 3 bytes.
3636       Segment has 3 characters, add one byte for VARCHAR_CMP_* marker, and we
3637       get 3*3+1=10
3638     */
3639     ret = 10;
3640   } else {
3641     /*
3642       All other collations. There are two classes:
3643       - Unicode-based, except for collations mentioned in the if-condition.
3644         For these all weights are 2 bytes long, a character may produce 0..8
3645         weights.
3646         in any case, 8 bytes of payload in the segment guarantee that the last
3647         space character won't span across segments.
3648 
3649       - Collations not based on unicode. These have length(strxfrm(' '))=1,
3650         there nothing to worry about.
3651 
3652       In both cases, take 8 bytes payload + 1 byte for VARCHAR_CMP* marker.
3653     */
3654     ret = 9;
3655   }
3656   assert(ret < RDB_SPACE_XFRM_SIZE);
3657   return ret;
3658 }
3659 
/*
  Copy constructor.

  Copies the packing description member by member, including the field
  metadata that setup() fills in and that the unpack functions read through
  fpi (charset, pack length, VARCHAR length bytes, ...). Without those a copy
  could not be used for unpacking.
*/
Rdb_field_packing::Rdb_field_packing(const Rdb_field_packing &o)
    : m_max_image_len(o.m_max_image_len),
      m_unpack_data_len(o.m_unpack_data_len),
      m_unpack_data_offset(o.m_unpack_data_offset),
      m_field_maybe_null(o.m_field_maybe_null),
      m_segment_size(o.m_segment_size),
      m_unpack_info_uses_two_bytes(o.m_unpack_info_uses_two_bytes),
      m_covered(o.m_covered),
      space_xfrm(o.space_xfrm),
      space_xfrm_len(o.space_xfrm_len),
      space_mb_len(o.space_mb_len),
      m_charset_codec(o.m_charset_codec),
      m_unpack_info_stores_value(o.m_unpack_info_stores_value),
      m_pack_func(o.m_pack_func),
      m_make_unpack_info_func(o.m_make_unpack_info_func),
      m_unpack_func(o.m_unpack_func),
      m_skip_func(o.m_skip_func),
      m_keynr(o.m_keynr),
      m_key_part(o.m_key_part) {
  // Field metadata set by setup(). Assigned in the body rather than in the
  // initializer list because the declaration order of these members is not
  // visible from this file, and the list above must follow that order.
  m_field_real_type = o.m_field_real_type;
  m_field_offset = o.m_field_offset;
  m_field_null_offset = o.m_field_null_offset;
  m_field_null_bit_mask = o.m_field_null_bit_mask;
  m_field_pack_length = o.m_field_pack_length;
  m_field_charset = o.m_field_charset;
  m_field_unsigned_flag = o.m_field_unsigned_flag;
  m_varchar_length_bytes = o.m_varchar_length_bytes;
  m_varchar_char_length = o.m_varchar_char_length;
  m_use_legacy_varbinary_format = o.m_use_legacy_varbinary_format;
}
3679 
Rdb_field_packing()3680 Rdb_field_packing::Rdb_field_packing()
3681     : m_max_image_len(0),
3682       m_unpack_data_len(0),
3683       m_unpack_data_offset(0),
3684       m_field_maybe_null(false),
3685       m_segment_size(0),
3686       m_unpack_info_uses_two_bytes(false),
3687       m_covered(false),
3688       space_xfrm(nullptr),
3689       space_xfrm_len(0),
3690       space_mb_len(0),
3691       m_charset_codec(nullptr),
3692       m_unpack_info_stores_value(false),
3693       m_pack_func(nullptr),
3694       m_make_unpack_info_func(nullptr),
3695       m_unpack_func(nullptr),
3696       m_skip_func(nullptr),
3697       m_keynr(0),
3698       m_key_part(0) {}
3699 
/*
  @brief
    Setup packing of index field into its mem-comparable form

  @detail
    - It is possible to produce a mem-comparable form for any datatype.
    - Some datatypes also allow unpacking the original value from its
      mem-comparable form.
      = Some of these require extra information to be stored in "unpack_info".
        unpack_info is not a part of the mem-comparable form, it is only used
        to restore the original value.

  @param
    field  IN  field to be packed/un-packed

  @return
    TRUE  -  Field can be read with index-only reads
    FALSE -  Otherwise
*/
3719 
setup(const Rdb_key_def * const key_descr,const Field * const field,const uint keynr_arg,const uint key_part_arg,const uint16 key_length)3720 bool Rdb_field_packing::setup(const Rdb_key_def *const key_descr,
3721                               const Field *const field, const uint keynr_arg,
3722                               const uint key_part_arg,
3723                               const uint16 key_length) {
3724   int res = false;
3725   enum_field_types type = field ? field->real_type() : MYSQL_TYPE_LONGLONG;
3726 
3727   m_field_real_type = type;
3728   m_field_offset =
3729       (key_descr && field) ? (field->ptr - field->table->record[0]) : -1;
3730   m_field_null_offset = (key_descr && field) ? field->null_offset() : -1;
3731   m_field_null_bit_mask = (key_descr && field) ? field->null_bit : 0;
3732   m_field_pack_length = (key_descr && field) ? field->pack_length() : -1;
3733   m_field_charset = (key_descr && field) ? field->charset() : nullptr;
3734   m_field_unsigned_flag = false;
3735   m_field_maybe_null = field ? field->real_maybe_null() : false;
3736   m_varchar_length_bytes = -1;
3737   m_varchar_char_length = -1;
3738 
3739   m_keynr = keynr_arg;
3740   m_key_part = key_part_arg;
3741 
3742   m_unpack_func = nullptr;
3743   m_make_unpack_info_func = nullptr;
3744   m_unpack_data_len = 0;
3745   space_xfrm = nullptr;  // safety
3746   // whether to use legacy format for varchar
3747   m_use_legacy_varbinary_format = false;
3748   // ha_rocksdb::index_flags() will pass key_descr == null to
3749   // see whether field(column) can be read-only reads through return value,
3750   // but the legacy vs. new varchar format doesn't affect return value.
3751   // Just change m_use_legacy_varbinary_format to true if key_descr isn't given.
3752   if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3753     m_use_legacy_varbinary_format = true;
3754   }
3755   /* Calculate image length. By default, is is pack_length() */
3756   m_max_image_len =
3757       field ? field->pack_length() : ROCKSDB_SIZEOF_HIDDEN_PK_COLUMN;
3758   m_skip_func = Rdb_key_def::skip_max_length;
3759   m_pack_func = Rdb_key_def::pack_with_make_sort_key;
3760 
3761   m_covered = false;
3762 
3763   switch (type) {
3764     case MYSQL_TYPE_LONGLONG:
3765       m_pack_func = Rdb_key_def::pack_longlong;
3766       m_field_unsigned_flag =
3767           field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3768       m_unpack_func = Rdb_key_def::unpack_integer<8>;
3769       m_covered = true;
3770       return true;
3771 
3772     case MYSQL_TYPE_LONG:
3773       m_pack_func = Rdb_key_def::pack_long;
3774       m_field_unsigned_flag =
3775           field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3776       m_unpack_func = Rdb_key_def::unpack_integer<4>;
3777       m_covered = true;
3778       return true;
3779 
3780     case MYSQL_TYPE_INT24:
3781       m_pack_func = Rdb_key_def::pack_medium;
3782       m_field_unsigned_flag =
3783           field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3784       m_unpack_func = Rdb_key_def::unpack_integer<3>;
3785       m_covered = true;
3786       return true;
3787 
3788     case MYSQL_TYPE_SHORT:
3789       m_pack_func = Rdb_key_def::pack_short;
3790       m_field_unsigned_flag =
3791           field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3792       m_unpack_func = Rdb_key_def::unpack_integer<2>;
3793       m_covered = true;
3794       return true;
3795 
3796     case MYSQL_TYPE_TINY:
3797       m_pack_func = Rdb_key_def::pack_tiny;
3798       m_field_unsigned_flag =
3799           field ? static_cast<const Field_num *>(field)->unsigned_flag : false;
3800       m_unpack_func = Rdb_key_def::unpack_integer<1>;
3801       m_covered = true;
3802       return true;
3803 
3804     case MYSQL_TYPE_DOUBLE:
3805       m_pack_func = Rdb_key_def::pack_double;
3806       m_unpack_func = Rdb_key_def::unpack_double;
3807       m_covered = true;
3808       return true;
3809 
3810     case MYSQL_TYPE_FLOAT:
3811       m_pack_func = Rdb_key_def::pack_float;
3812       m_unpack_func = Rdb_key_def::unpack_float;
3813       m_covered = true;
3814       return true;
3815 
3816     case MYSQL_TYPE_NEWDECIMAL:
3817       m_pack_func = Rdb_key_def::pack_new_decimal;
3818       m_unpack_func = Rdb_key_def::unpack_binary_str;
3819       m_covered = true;
3820       return true;
3821 
3822     case MYSQL_TYPE_DATETIME2:
3823       m_pack_func = Rdb_key_def::pack_datetime2;
3824       m_unpack_func = Rdb_key_def::unpack_binary_str;
3825       m_covered = true;
3826       return true;
3827 
3828     case MYSQL_TYPE_TIMESTAMP2:
3829       m_pack_func = Rdb_key_def::pack_timestamp2;
3830       m_unpack_func = Rdb_key_def::unpack_binary_str;
3831       m_covered = true;
3832       return true;
3833 
3834     case MYSQL_TYPE_TIME2:
3835       m_pack_func = Rdb_key_def::pack_time2;
3836       m_unpack_func = Rdb_key_def::unpack_binary_str;
3837       m_covered = true;
3838       return true;
3839 
3840     case MYSQL_TYPE_YEAR:
3841       m_pack_func = Rdb_key_def::pack_year;
3842       m_unpack_func = Rdb_key_def::unpack_binary_str;
3843       m_covered = true;
3844       return true;
3845 
3846     case MYSQL_TYPE_NEWDATE:
3847       m_pack_func = Rdb_key_def::pack_newdate;
3848       m_unpack_func = Rdb_key_def::unpack_newdate;
3849       m_covered = true;
3850       return true;
3851 
3852     case MYSQL_TYPE_TINY_BLOB:
3853     case MYSQL_TYPE_MEDIUM_BLOB:
3854     case MYSQL_TYPE_LONG_BLOB:
3855     case MYSQL_TYPE_BLOB:
3856     case MYSQL_TYPE_JSON: {
3857       m_pack_func = Rdb_key_def::pack_blob;
3858       if (key_descr) {
3859         // The my_charset_bin collation is special in that it will consider
3860         // shorter strings sorting as less than longer strings.
3861         //
3862         // See Field_blob::make_sort_key for details.
3863         m_max_image_len =
3864             key_length +
3865             (field->charset() == &my_charset_bin
3866                  ? dynamic_cast<const Field_blob *>(field)->pack_length_no_ptr()
3867                  : 0);
3868         // Return false because indexes on text/blob will always require
3869         // a prefix. With a prefix, the optimizer will not be able to do an
3870         // index-only scan since there may be content occuring after the prefix
3871         // length.
3872         return false;
3873       }
3874     } break;
3875     // Obsolete
3876     case MYSQL_TYPE_DECIMAL:
3877     case MYSQL_TYPE_TIMESTAMP:
3878     case MYSQL_TYPE_TIME:
3879     case MYSQL_TYPE_DATETIME:
3880       assert(0);
3881     default:
3882       break;
3883   }
3884 
3885   m_unpack_info_stores_value = false;
3886   /* Handle [VAR](CHAR|BINARY) */
3887 
3888   if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3889     /*
3890       For CHAR-based columns, check how strxfrm image will take.
3891       field->field_length = field->char_length() * cs->mbmaxlen.
3892     */
3893     const CHARSET_INFO *cs = field->charset();
3894     m_max_image_len = cs->coll->strnxfrmlen(cs, field->field_length);
3895   }
3896   const bool is_varchar = (type == MYSQL_TYPE_VARCHAR);
3897   const CHARSET_INFO *cs = field->charset();
3898   // max_image_len before chunking is taken into account
3899   const int max_image_len_before_chunks = m_max_image_len;
3900 
3901   if (is_varchar) {
3902     // The default for varchar is variable-length, without space-padding for
3903     // comparisons
3904     const auto field_var = static_cast<const Field_varstring *>(field);
3905     m_varchar_length_bytes = field_var->length_bytes;
3906     m_varchar_char_length = field_var->char_length();
3907     m_skip_func = Rdb_key_def::skip_variable_length;
3908     m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3909     if (!key_descr || key_descr->use_legacy_varbinary_format()) {
3910       m_max_image_len = RDB_LEGACY_ENCODED_SIZE(m_max_image_len);
3911     } else {
3912       // Calculate the maximum size of the short section plus the
3913       // maximum size of the long section
3914       m_max_image_len = RDB_ENCODED_SIZE(m_max_image_len);
3915     }
3916 
3917     m_unpack_info_uses_two_bytes = (field_var->field_length + 8 >= 0x100);
3918   }
3919 
3920   if (type == MYSQL_TYPE_VARCHAR || type == MYSQL_TYPE_STRING) {
3921     // See http://dev.mysql.com/doc/refman/5.7/en/string-types.html for
3922     // information about character-based datatypes are compared.
3923     bool use_unknown_collation = false;
3924     DBUG_EXECUTE_IF("myrocks_enable_unknown_collation_index_only_scans",
3925                     use_unknown_collation = true;);
3926 
3927     if (cs == &my_charset_bin) {
3928       // - SQL layer pads BINARY(N) so that it always is N bytes long.
3929       // - For VARBINARY(N), values may have different lengths, so we're using
3930       //   variable-length encoding. This is also the only charset where the
3931       //   values are not space-padded for comparison.
3932       m_unpack_func = is_varchar ? Rdb_key_def::unpack_binary_or_utf8_varchar
3933                                  : Rdb_key_def::unpack_binary_str;
3934       res = true;
3935     } else if (cs == &my_charset_latin1_bin || cs == &my_charset_utf8_bin) {
3936       // For _bin collations, mem-comparable form of the string is the string
3937       // itself.
3938 
3939       if (is_varchar) {
3940         // VARCHARs - are compared as if they were space-padded - but are
3941         // not actually space-padded (reading the value back produces the
3942         // original value, without the padding)
3943         m_unpack_func = Rdb_key_def::unpack_binary_or_utf8_varchar_space_pad;
3944         m_skip_func = Rdb_key_def::skip_variable_space_pad;
3945         m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3946         m_make_unpack_info_func = Rdb_key_def::dummy_make_unpack_info;
3947         m_segment_size = get_segment_size_from_collation(cs);
3948         m_max_image_len =
3949             (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3950             m_segment_size;
3951         rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3952                                      &space_mb_len);
3953       } else {
3954         // SQL layer pads CHAR(N) values to their maximum length.
3955         // We just store that and restore it back.
3956         m_unpack_func = (cs == &my_charset_latin1_bin)
3957                             ? Rdb_key_def::unpack_binary_str
3958                             : Rdb_key_def::unpack_utf8_str;
3959       }
3960       res = true;
3961     } else {
3962       // This is [VAR]CHAR(n) and the collation is not $(charset_name)_bin
3963 
3964       res = true;  // index-only scans are possible
3965       m_unpack_data_len = is_varchar ? 0 : field->field_length;
3966       const uint idx = is_varchar ? 0 : 1;
3967       const Rdb_collation_codec *codec = nullptr;
3968 
3969       if (is_varchar) {
3970         // VARCHAR requires space-padding for doing comparisons
3971         //
3972         // The check for cs->levels_for_order is to catch
3973         // latin2_czech_cs and cp1250_czech_cs - multi-level collations
3974         // that Variable-Length Space Padded Encoding can't handle.
3975         // It is not expected to work for any other multi-level collations,
3976         // either.
3977         // Currently we handle these collations as NO_PAD, even if they have
3978         // PAD_SPACE attribute.
3979         if (cs->levels_for_order == 1) {
3980           m_pack_func = Rdb_key_def::pack_with_varchar_space_pad;
3981           m_skip_func = Rdb_key_def::skip_variable_space_pad;
3982           m_segment_size = get_segment_size_from_collation(cs);
3983           m_max_image_len =
3984               (max_image_len_before_chunks / (m_segment_size - 1) + 1) *
3985               m_segment_size;
3986           rdb_get_mem_comparable_space(cs, &space_xfrm, &space_xfrm_len,
3987                                        &space_mb_len);
3988         } else {
3989           //  NO_LINT_DEBUG
3990           sql_print_warning(
3991               "RocksDB: you're trying to create an index "
3992               "with a multi-level collation %s",
3993               cs->name);
3994           //  NO_LINT_DEBUG
3995           sql_print_warning(
3996               "MyRocks will handle this collation internally "
3997               " as if it had a NO_PAD attribute.");
3998           m_pack_func = Rdb_key_def::pack_with_varchar_encoding;
3999           m_skip_func = Rdb_key_def::skip_variable_length;
4000         }
4001       }
4002 
4003       if ((codec = rdb_init_collation_mapping(cs)) != nullptr) {
4004         // The collation allows to store extra information in the unpack_info
4005         // which can be used to restore the original value from the
4006         // mem-comparable form.
4007         m_make_unpack_info_func = codec->m_make_unpack_info_func[idx];
4008         m_unpack_func = codec->m_unpack_func[idx];
4009         m_charset_codec = codec;
4010       } else if (use_unknown_collation) {
4011         // We have no clue about how this collation produces mem-comparable
4012         // form. Our way of restoring the original value is to keep a copy of
4013         // the original value in unpack_info.
4014         m_unpack_info_stores_value = true;
4015         m_make_unpack_info_func = is_varchar
4016                                       ? Rdb_key_def::make_unpack_unknown_varchar
4017                                       : Rdb_key_def::make_unpack_unknown;
4018         m_unpack_func = is_varchar ? Rdb_key_def::unpack_unknown_varchar
4019                                    : Rdb_key_def::unpack_unknown;
4020       } else {
4021         // Same as above: we don't know how to restore the value from its
4022         // mem-comparable form.
4023         // Here, we just indicate to the SQL layer we can't do it.
4024         assert(m_unpack_func == nullptr);
4025         m_unpack_info_stores_value = false;
4026         res = false;  // Indicate that index-only reads are not possible
4027       }
4028     }
4029 
4030     // Make an adjustment: if this column is partially covered, tell the SQL
4031     // layer we can't do index-only scans. Later when we perform an index read,
4032     // we'll check on a record-by-record basis if we can do an index-only scan
4033     // or not.
4034     uint field_length;
4035     if (field->table) {
4036       field_length = field->table->field[field->field_index]->field_length;
4037     } else {
4038       field_length = field->field_length;
4039     }
4040 
4041     if (field_length != key_length) {
4042       res = false;
4043       // If this index doesn't support covered bitmaps, then we won't know
4044       // during a read if the column is actually covered or not. If so, we need
4045       // to assume the column isn't covered and skip it during unpacking.
4046       //
4047       // If key_descr == NULL, then this is a dummy field and we probably don't
4048       // need to perform this step. However, to preserve the behavior before
4049       // this change, we'll only skip this step if we have an index which
4050       // supports covered bitmaps.
4051       if (!key_descr || !key_descr->use_covered_bitmap_format()) {
4052         m_unpack_func = nullptr;
4053         m_make_unpack_info_func = nullptr;
4054         m_unpack_info_stores_value = true;
4055       }
4056     }
4057   }
4058 
4059   m_covered = res;
4060   return res;
4061 }
4062 
get_field_in_table(const TABLE * const tbl) const4063 Field *Rdb_field_packing::get_field_in_table(const TABLE *const tbl) const {
4064   return tbl->key_info[m_keynr].key_part[m_key_part].field;
4065 }
4066 
fill_hidden_pk_val(uchar ** dst,const longlong hidden_pk_id) const4067 void Rdb_field_packing::fill_hidden_pk_val(uchar **dst,
4068                                            const longlong hidden_pk_id) const {
4069   assert(m_max_image_len == 8);
4070 
4071   String to;
4072   rdb_netstr_append_uint64(&to, hidden_pk_id);
4073   memcpy(*dst, to.ptr(), m_max_image_len);
4074 
4075   *dst += m_max_image_len;
4076 }
4077 
4078 ///////////////////////////////////////////////////////////////////////////////////////////
4079 // Rdb_ddl_manager
4080 ///////////////////////////////////////////////////////////////////////////////////////////
4081 
~Rdb_tbl_def()4082 Rdb_tbl_def::~Rdb_tbl_def() {
4083   auto ddl_manager = rdb_get_ddl_manager();
4084   /* Don't free key definitions */
4085   if (m_key_descr_arr) {
4086     for (uint i = 0; i < m_key_count; i++) {
4087       if (ddl_manager && m_key_descr_arr[i]) {
4088         ddl_manager->erase_index_num(m_key_descr_arr[i]->get_gl_index_id());
4089       }
4090 
4091       m_key_descr_arr[i] = nullptr;
4092     }
4093 
4094     delete[] m_key_descr_arr;
4095     m_key_descr_arr = nullptr;
4096   }
4097 }
4098 
4099 /*
4100   Put table definition DDL entry. Actual write is done at
4101   Rdb_dict_manager::commit.
4102 
4103   We write
4104     dbname.tablename -> version + {key_entry, key_entry, key_entry, ... }
4105 
4106   Where key entries are a tuple of
4107     ( cf_id, index_nr )
4108 */
4109 
put_dict(Rdb_dict_manager * const dict,Rdb_cf_manager * cf_manager,rocksdb::WriteBatch * const batch,const rocksdb::Slice & key)4110 bool Rdb_tbl_def::put_dict(Rdb_dict_manager *const dict,
4111                            Rdb_cf_manager *cf_manager,
4112                            rocksdb::WriteBatch *const batch,
4113                            const rocksdb::Slice &key) {
4114   StringBuffer<8 * Rdb_key_def::PACKED_SIZE> indexes;
4115   indexes.alloc(Rdb_key_def::VERSION_SIZE +
4116                 m_key_count * Rdb_key_def::PACKED_SIZE * 2);
4117   rdb_netstr_append_uint16(&indexes, Rdb_key_def::DDL_ENTRY_INDEX_VERSION);
4118 
4119   for (uint i = 0; i < m_key_count; i++) {
4120     const Rdb_key_def &kd = *m_key_descr_arr[i];
4121 
4122     const uint cf_id = kd.get_cf()->GetID();
4123     /*
4124       If cf_id already exists, cf_flags must be the same.
4125       To prevent race condition, reading/modifying/committing CF flags
4126       need to be protected by mutex (dict_manager->lock()).
4127       When RocksDB supports transaction with pessimistic concurrency
4128       control, we can switch to use it and removing mutex.
4129     */
4130     const std::string cf_name = kd.get_cf()->GetName();
4131 
4132     std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
4133         cf_manager->get_cf(cf_name);
4134 
4135     if (!cfh || cfh != kd.get_shared_cf() || dict->get_dropped_cf(cf_id)) {
4136       // The CF has been dropped, i.e., cf_manager.remove_dropped_cf() has been
4137       // called; or the CF is being dropped, i.e., cf_manager.drop_cf() has
4138       // been called.
4139       my_error(ER_CF_DROPPED, MYF(0), cf_name.c_str());
4140       return true;
4141     }
4142 
4143     rdb_netstr_append_uint32(&indexes, cf_id);
4144 
4145     uint32 index_number = kd.get_index_number();
4146     rdb_netstr_append_uint32(&indexes, index_number);
4147 
4148     struct Rdb_index_info index_info;
4149     index_info.m_gl_index_id = {cf_id, index_number};
4150     index_info.m_index_dict_version = Rdb_key_def::INDEX_INFO_VERSION_LATEST;
4151     index_info.m_index_type = kd.m_index_type;
4152     index_info.m_kv_version = kd.m_kv_format_version;
4153     index_info.m_index_flags = kd.m_index_flags_bitmap;
4154     index_info.m_ttl_duration = kd.m_ttl_duration;
4155 
4156     dict->add_or_update_index_cf_mapping(batch, &index_info);
4157   }
4158 
4159   const rocksdb::Slice svalue(indexes.c_ptr(), indexes.length());
4160 
4161   dict->put_key(batch, key, svalue);
4162   return false;
4163 }
4164 
get_create_time()4165 time_t Rdb_tbl_def::get_create_time() {
4166   time_t create_time = m_create_time;
4167 
4168   if (create_time == CREATE_TIME_UNKNOWN) {
4169     // Read it from the .frm file. It's not a problem if several threads do this
4170     // concurrently
4171     char path[FN_REFLEN];
4172     snprintf(path, sizeof(path), "%s/%s/%s%s", mysql_data_home,
4173              m_dbname.c_str(), m_tablename.c_str(), reg_ext);
4174     unpack_filename(path, path);
4175     MY_STAT f_stat;
4176     if (my_stat(path, &f_stat, MYF(0)))
4177       create_time = f_stat.st_ctime;
4178     else
4179       create_time = 0;  // will be shown as SQL NULL
4180     m_create_time = create_time;
4181   }
4182   return create_time;
4183 }
4184 
4185 // Length that each index flag takes inside the record.
4186 // Each index in the array maps to the enum INDEX_FLAG
4187 static const std::array<uint, 1> index_flag_lengths = {
4188     {ROCKSDB_SIZEOF_TTL_RECORD}};
4189 
has_index_flag(uint32 index_flags,enum INDEX_FLAG flag)4190 bool Rdb_key_def::has_index_flag(uint32 index_flags, enum INDEX_FLAG flag) {
4191   return flag & index_flags;
4192 }
4193 
calculate_index_flag_offset(uint32 index_flags,enum INDEX_FLAG flag,uint * const length)4194 uint32 Rdb_key_def::calculate_index_flag_offset(uint32 index_flags,
4195                                                 enum INDEX_FLAG flag,
4196                                                 uint *const length) {
4197   assert_IMP(flag != MAX_FLAG,
4198                   Rdb_key_def::has_index_flag(index_flags, flag));
4199 
4200   uint offset = 0;
4201   for (size_t bit = 0; bit < sizeof(index_flags) * CHAR_BIT; ++bit) {
4202     int mask = 1 << bit;
4203 
4204     /* Exit once we've reached the proper flag */
4205     if (flag & mask) {
4206       if (length != nullptr) {
4207         *length = index_flag_lengths[bit];
4208       }
4209       break;
4210     }
4211 
4212     if (index_flags & mask) {
4213       offset += index_flag_lengths[bit];
4214     }
4215   }
4216 
4217   return offset;
4218 }
4219 
write_index_flag_field(Rdb_string_writer * const buf,const uchar * const val,enum INDEX_FLAG flag) const4220 void Rdb_key_def::write_index_flag_field(Rdb_string_writer *const buf,
4221                                          const uchar *const val,
4222                                          enum INDEX_FLAG flag) const {
4223   uint len;
4224   uint offset = calculate_index_flag_offset(m_index_flags_bitmap, flag, &len);
4225   assert(offset + len <= buf->get_current_pos());
4226   memcpy(buf->ptr() + offset, val, len);
4227 }
4228 
check_if_is_mysql_system_table()4229 void Rdb_tbl_def::check_if_is_mysql_system_table() {
4230   static const char *const system_dbs[] = {
4231       "mysql",
4232       "performance_schema",
4233       "information_schema",
4234   };
4235 
4236   m_is_mysql_system_table = false;
4237   for (uint ii = 0; ii < array_elements(system_dbs); ii++) {
4238     if (strcmp(m_dbname.c_str(), system_dbs[ii]) == 0) {
4239       m_is_mysql_system_table = true;
4240       break;
4241     }
4242   }
4243 }
4244 
check_and_set_read_free_rpl_table()4245 void Rdb_tbl_def::check_and_set_read_free_rpl_table() {
4246   m_is_read_free_rpl_table =
4247       rdb_read_free_regex_handler.match(base_tablename());
4248 }
4249 
set_name(const std::string & name)4250 void Rdb_tbl_def::set_name(const std::string &name) {
4251   int err MY_ATTRIBUTE((__unused__));
4252 
4253   m_dbname_tablename = name;
4254   err = rdb_split_normalized_tablename(name, &m_dbname, &m_tablename,
4255                                        &m_partition);
4256   assert(err == 0);
4257 
4258   check_if_is_mysql_system_table();
4259 }
4260 
get_autoincr_gl_index_id()4261 GL_INDEX_ID Rdb_tbl_def::get_autoincr_gl_index_id() {
4262   for (uint i = 0; i < m_key_count; i++) {
4263     auto &k = m_key_descr_arr[i];
4264     if (k->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
4265         k->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY) {
4266       return k->get_gl_index_id();
4267     }
4268   }
4269 
4270   // Every table must have a primary key, even if it's hidden.
4271   abort();
4272   return GL_INDEX_ID();
4273 }
4274 
erase_index_num(const GL_INDEX_ID & gl_index_id)4275 void Rdb_ddl_manager::erase_index_num(const GL_INDEX_ID &gl_index_id) {
4276   m_index_num_to_keydef.erase(gl_index_id);
4277 }
4278 
add_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)4279 void Rdb_ddl_manager::add_uncommitted_keydefs(
4280     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
4281   mysql_rwlock_wrlock(&m_rwlock);
4282   for (const auto &index : indexes) {
4283     m_index_num_to_uncommitted_keydef[index->get_gl_index_id()] = index;
4284   }
4285   mysql_rwlock_unlock(&m_rwlock);
4286 }
4287 
remove_uncommitted_keydefs(const std::unordered_set<std::shared_ptr<Rdb_key_def>> & indexes)4288 void Rdb_ddl_manager::remove_uncommitted_keydefs(
4289     const std::unordered_set<std::shared_ptr<Rdb_key_def>> &indexes) {
4290   mysql_rwlock_wrlock(&m_rwlock);
4291   for (const auto &index : indexes) {
4292     m_index_num_to_uncommitted_keydef.erase(index->get_gl_index_id());
4293   }
4294   mysql_rwlock_unlock(&m_rwlock);
4295 }
4296 
find_in_uncommitted_keydef(const uint32_t & cf_id)4297 int Rdb_ddl_manager::find_in_uncommitted_keydef(const uint32_t &cf_id) {
4298   mysql_rwlock_rdlock(&m_rwlock);
4299   for (const auto &pr : m_index_num_to_uncommitted_keydef) {
4300     const auto &kd = pr.second;
4301 
4302     if (kd->get_cf()->GetID() == cf_id) {
4303       mysql_rwlock_unlock(&m_rwlock);
4304       return HA_EXIT_FAILURE;
4305     }
4306   }
4307 
4308   mysql_rwlock_unlock(&m_rwlock);
4309   return HA_EXIT_SUCCESS;
4310 }
4311 
4312 #if defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) && ROCKSDB_INCLUDE_VALIDATE_TABLES
4313 namespace  // anonymous namespace = not visible outside this source file
4314 {
4315 struct Rdb_validate_tbls : public Rdb_tables_scanner {
4316   using tbl_info_t = std::pair<std::string, bool>;
4317   using tbl_list_t = std::map<std::string, std::set<tbl_info_t>>;
4318 
4319   tbl_list_t m_list;
4320 
4321   int add_table(Rdb_tbl_def *tdef) override;
4322 
4323   bool compare_to_actual_tables(const std::string &datadir, bool *has_errors);
4324 
4325   bool scan_for_frms(const std::string &datadir, const std::string &dbname,
4326                      bool *has_errors);
4327 
4328   bool check_frm_file(const std::string &fullpath, const std::string &dbname,
4329                       const std::string &tablename, bool *has_errors);
4330 };
4331 }  // anonymous namespace
4332 
4333 /*
4334   Get a list of tables that we expect to have .frm files for.  This will use the
4335   information just read from the RocksDB data dictionary.
4336 */
add_table(Rdb_tbl_def * tdef)4337 int Rdb_validate_tbls::add_table(Rdb_tbl_def *tdef) {
4338   assert(tdef != nullptr);
4339 
4340   /*
4341     Add the database/table into the list that are not temp table.
4342     Also skip over truncate temp table.
4343   */
4344   if (tdef->base_tablename().find(tmp_file_prefix) == std::string::npos &&
4345       tdef->base_tablename().find(TRUNCATE_TABLE_PREFIX) == std::string::npos) {
4346     bool is_partition = tdef->base_partition().size() != 0;
4347     m_list[tdef->base_dbname()].insert(
4348         tbl_info_t(tdef->base_tablename(), is_partition));
4349   }
4350 
4351   return HA_EXIT_SUCCESS;
4352 }
4353 
4354 /*
4355   Access the .frm file for this dbname/tablename and see if it is a RocksDB
4356   table (or partition table).
4357 */
check_frm_file(const std::string & fullpath,const std::string & dbname,const std::string & tablename,bool * has_errors)4358 bool Rdb_validate_tbls::check_frm_file(const std::string &fullpath,
4359                                        const std::string &dbname,
4360                                        const std::string &tablename,
4361                                        bool *has_errors) {
4362   /* Check this .frm file to see what engine it uses */
4363   String fullfilename(fullpath.c_str(), &my_charset_bin);
4364   fullfilename.append(FN_DIRSEP);
4365   fullfilename.append(tablename.c_str());
4366   fullfilename.append(".frm");
4367 
4368   /*
4369     This function will return the legacy_db_type of the table.  Currently
4370     it does not reference the first parameter (THD* thd), but if it ever
4371     did in the future we would need to make a version that does it without
4372     the connection handle as we don't have one here.
4373   */
4374   enum legacy_db_type eng_type;
4375   frm_type_enum type = dd_frm_type(nullptr, fullfilename.c_ptr(), &eng_type);
4376   if (type == FRMTYPE_ERROR) {
4377     // NO_LINT_DEBUG
4378     sql_print_warning("RocksDB: Failed to open/read .from file: %s",
4379                       fullfilename.ptr());
4380     return false;
4381   }
4382 
4383   std::string partition_info_str;
4384   if (!native_part::get_part_str_for_path(fullfilename.c_ptr(),
4385                                           partition_info_str)) {
4386     sql_print_warning("RocksDB: can't read partition info string from %s",
4387                       fullfilename.ptr());
4388     return false;
4389   }
4390 
4391   if (!partition_info_str.empty()) eng_type = DB_TYPE_PARTITION_DB;
4392 
4393   if (type == FRMTYPE_TABLE) {
4394     /* For a RocksDB table do we have a reference in the data dictionary? */
4395     if (eng_type == DB_TYPE_ROCKSDB) {
4396       /*
4397         Attempt to remove the table entry from the list of tables.  If this
4398         fails then we know we had a .frm file that wasn't registered in RocksDB.
4399       */
4400       tbl_info_t element(tablename, false);
4401       if (m_list.count(dbname) == 0 || m_list[dbname].erase(element) == 0) {
4402         sql_print_warning(
4403             "RocksDB: Schema mismatch - "
4404             "A .frm file exists for table %s.%s, "
4405             "but that table is not registered in RocksDB",
4406             dbname.c_str(), tablename.c_str());
4407         *has_errors = true;
4408       }
4409     } else if (eng_type == DB_TYPE_PARTITION_DB) {
4410       /*
4411         For partition tables, see if it is in the m_list as a partition,
4412         but don't generate an error if it isn't there - we don't know that the
4413         .frm is for RocksDB.
4414       */
4415       if (m_list.count(dbname) > 0) {
4416         m_list[dbname].erase(tbl_info_t(tablename, true));
4417       }
4418     }
4419   }
4420 
4421   return true;
4422 }
4423 
4424 /* Scan the database subdirectory for .frm files */
scan_for_frms(const std::string & datadir,const std::string & dbname,bool * has_errors)4425 bool Rdb_validate_tbls::scan_for_frms(const std::string &datadir,
4426                                       const std::string &dbname,
4427                                       bool *has_errors) {
4428   bool result = true;
4429   std::string fullpath = datadir + dbname;
4430   struct st_my_dir *dir_info = my_dir(fullpath.c_str(), MYF(MY_DONT_SORT));
4431 
4432   /* Access the directory */
4433   if (dir_info == nullptr) {
4434     // NO_LINT_DEBUG
4435     sql_print_warning("RocksDB: Could not open database directory: %s",
4436                       fullpath.c_str());
4437     return false;
4438   }
4439 
4440   /* Scan through the files in the directory */
4441   struct fileinfo *file_info = dir_info->dir_entry;
4442   for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
4443     /* Find .frm files that are not temp files (those that contain '#sql') */
4444     const char *ext = strrchr(file_info->name, '.');
4445     if (ext != nullptr && strstr(file_info->name, tmp_file_prefix) == nullptr &&
4446         strcmp(ext, ".frm") == 0) {
4447       std::string tablename =
4448           std::string(file_info->name, ext - file_info->name);
4449 
4450       /* Check to see if the .frm file is from RocksDB */
4451       if (!check_frm_file(fullpath, dbname, tablename, has_errors)) {
4452         result = false;
4453         break;
4454       }
4455     }
4456   }
4457 
4458   /* Remove any databases who have no more tables listed */
4459   if (m_list.count(dbname) == 1 && m_list[dbname].size() == 0) {
4460     m_list.erase(dbname);
4461   }
4462 
4463   /* Release the directory entry */
4464   my_dirend(dir_info);
4465 
4466   return result;
4467 }
4468 
4469 /*
4470   Scan the datadir for all databases (subdirectories) and get a list of .frm
4471   files they contain
4472 */
compare_to_actual_tables(const std::string & datadir,bool * has_errors)4473 bool Rdb_validate_tbls::compare_to_actual_tables(const std::string &datadir,
4474                                                  bool *has_errors) {
4475   bool result = true;
4476   struct st_my_dir *dir_info;
4477   struct fileinfo *file_info;
4478 
4479   dir_info = my_dir(datadir.c_str(), MYF(MY_DONT_SORT | MY_WANT_STAT));
4480   if (dir_info == nullptr) {
4481     // NO_LINT_DEBUG
4482     sql_print_warning("RocksDB: could not open datadir: %s", datadir.c_str());
4483     return false;
4484   }
4485 
4486   file_info = dir_info->dir_entry;
4487   for (uint ii = 0; ii < dir_info->number_off_files; ii++, file_info++) {
4488     /* Ignore files/dirs starting with '.' */
4489     if (file_info->name[0] == '.') continue;
4490 
4491     /* Ignore all non-directory files */
4492     if (!MY_S_ISDIR(file_info->mystat->st_mode)) continue;
4493 
4494     /* Scan all the .frm files in the directory */
4495     if (!scan_for_frms(datadir, file_info->name, has_errors)) {
4496       result = false;
4497       break;
4498     }
4499   }
4500 
4501   /* Release the directory info */
4502   my_dirend(dir_info);
4503 
4504   return result;
4505 }
4506 
4507 /*
4508   Validate that all auto increment values in the data dictionary are on a
4509   supported version.
4510 */
validate_auto_incr()4511 bool Rdb_ddl_manager::validate_auto_incr() {
4512   std::unique_ptr<rocksdb::Iterator> it(m_dict->new_iterator());
4513 
4514   uchar auto_incr_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4515   rdb_netbuf_store_index(auto_incr_entry, Rdb_key_def::AUTO_INC);
4516   const rocksdb::Slice auto_incr_entry_slice(
4517       reinterpret_cast<char *>(auto_incr_entry),
4518       Rdb_key_def::INDEX_NUMBER_SIZE);
4519   for (it->Seek(auto_incr_entry_slice); it->Valid(); it->Next()) {
4520     const rocksdb::Slice key = it->key();
4521     const rocksdb::Slice val = it->value();
4522     GL_INDEX_ID gl_index_id;
4523 
4524     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4525         memcmp(key.data(), auto_incr_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4526       break;
4527     }
4528 
4529     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3) {
4530       return false;
4531     }
4532 
4533     if (val.size() <= Rdb_key_def::VERSION_SIZE) {
4534       return false;
4535     }
4536 
4537     // Check if we have orphaned entries for whatever reason by cross
4538     // referencing ddl entries.
4539     auto ptr = reinterpret_cast<const uchar *>(key.data());
4540     ptr += Rdb_key_def::INDEX_NUMBER_SIZE;
4541     rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4542     if (!m_dict->get_index_info(gl_index_id, nullptr)) {
4543       // NO_LINT_DEBUG
4544       sql_print_warning(
4545           "RocksDB: AUTOINC mismatch - "
4546           "Index number (%u, %u) found in AUTOINC "
4547           "but does not exist as a DDL entry for table %s",
4548           gl_index_id.cf_id, gl_index_id.index_id,
4549           safe_get_table_name(gl_index_id).c_str());
4550       return false;
4551     }
4552 
4553     ptr = reinterpret_cast<const uchar *>(val.data());
4554     const int version = rdb_netbuf_read_uint16(&ptr);
4555     if (version > Rdb_key_def::AUTO_INCREMENT_VERSION) {
4556       // NO_LINT_DEBUG
4557       sql_print_warning(
4558           "RocksDB: AUTOINC mismatch - "
4559           "Index number (%u, %u) found in AUTOINC "
4560           "is on unsupported version %d for table %s",
4561           gl_index_id.cf_id, gl_index_id.index_id, version,
4562           safe_get_table_name(gl_index_id).c_str());
4563       return false;
4564     }
4565   }
4566 
4567   if (!it->status().ok()) {
4568     return false;
4569   }
4570 
4571   return true;
4572 }
4573 
4574 /*
4575   Validate that all the tables in the RocksDB database dictionary match the .frm
4576   files in the datadir
4577 */
validate_schemas(void)4578 bool Rdb_ddl_manager::validate_schemas(void) {
4579   bool has_errors = false;
4580   const std::string datadir = std::string(mysql_real_data_home);
4581   Rdb_validate_tbls table_list;
4582 
4583   /* Get the list of tables from the database dictionary */
4584   if (scan_for_tables(&table_list) != 0) {
4585     return false;
4586   }
4587 
4588   /* Compare that to the list of actual .frm files */
4589   if (!table_list.compare_to_actual_tables(datadir, &has_errors)) {
4590     return false;
4591   }
4592 
4593   /*
4594     Any tables left in the tables list are ones that are registered in RocksDB
4595     but don't have .frm files.
4596   */
4597   for (const auto &db : table_list.m_list) {
4598     for (const auto &table : db.second) {
4599       sql_print_warning(
4600           "RocksDB: Schema mismatch - "
4601           "Table %s.%s is registered in RocksDB "
4602           "but does not have a .frm file",
4603           db.first.c_str(), table.first.c_str());
4604       has_errors = true;
4605     }
4606   }
4607 
4608   return !has_errors;
4609 }
4610 #endif  // defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) &&
4611         // ROCKSDB_INCLUDE_VALIDATE_TABLES
4612 
4613 #if defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) && ROCKSDB_INCLUDE_VALIDATE_TABLES
init(Rdb_dict_manager * const dict_arg,Rdb_cf_manager * const cf_manager,const uint32_t validate_tables)4614 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4615                            Rdb_cf_manager *const cf_manager,
4616                            const uint32_t validate_tables) {
4617 #else
4618 bool Rdb_ddl_manager::init(Rdb_dict_manager *const dict_arg,
4619                            Rdb_cf_manager *const cf_manager) {
4620 #endif  // defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) &&
4621         // ROCKSDB_INCLUDE_VALIDATE_TABLES
4622   m_dict = dict_arg;
4623   m_cf_manager = cf_manager;
4624   mysql_rwlock_init(0, &m_rwlock);
4625 
4626   /* Read the data dictionary and populate the hash */
4627   uchar ddl_entry[Rdb_key_def::INDEX_NUMBER_SIZE];
4628   rdb_netbuf_store_index(ddl_entry, Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
4629   const rocksdb::Slice ddl_entry_slice((char *)ddl_entry,
4630                                        Rdb_key_def::INDEX_NUMBER_SIZE);
4631 
4632   /* Reading data dictionary should always skip bloom filter */
4633   rocksdb::Iterator *it = m_dict->new_iterator();
4634   int i = 0;
4635 
4636   uint max_index_id_in_dict = 0;
4637   m_dict->get_max_index_id(&max_index_id_in_dict);
4638 
4639   for (it->Seek(ddl_entry_slice); it->Valid(); it->Next()) {
4640     const uchar *ptr;
4641     const uchar *ptr_end;
4642     const rocksdb::Slice key = it->key();
4643     const rocksdb::Slice val = it->value();
4644 
4645     if (key.size() >= Rdb_key_def::INDEX_NUMBER_SIZE &&
4646         memcmp(key.data(), ddl_entry, Rdb_key_def::INDEX_NUMBER_SIZE)) {
4647       break;
4648     }
4649 
4650     if (key.size() <= Rdb_key_def::INDEX_NUMBER_SIZE) {
4651       sql_print_error("RocksDB: Table_store: key has length %d (corruption?)",
4652                       (int)key.size());
4653       return true;
4654     }
4655 
4656     Rdb_tbl_def *const tdef =
4657         new Rdb_tbl_def(key, Rdb_key_def::INDEX_NUMBER_SIZE);
4658 
4659     // Now, read the DDLs.
4660     const int real_val_size = val.size() - Rdb_key_def::VERSION_SIZE;
4661     if (real_val_size % Rdb_key_def::PACKED_SIZE * 2 > 0) {
4662       sql_print_error("RocksDB: Table_store: invalid keylist for table %s",
4663                       tdef->full_tablename().c_str());
4664       return true;
4665     }
4666     tdef->m_key_count = real_val_size / (Rdb_key_def::PACKED_SIZE * 2);
4667     tdef->m_key_descr_arr = new std::shared_ptr<Rdb_key_def>[tdef->m_key_count];
4668 
4669     ptr = reinterpret_cast<const uchar *>(val.data());
4670     const int version = rdb_netbuf_read_uint16(&ptr);
4671     if (version != Rdb_key_def::DDL_ENTRY_INDEX_VERSION) {
4672       sql_print_error(
4673           "RocksDB: DDL ENTRY Version was not expected."
4674           "Expected: %d, Actual: %d",
4675           Rdb_key_def::DDL_ENTRY_INDEX_VERSION, version);
4676       return true;
4677     }
4678     ptr_end = ptr + real_val_size;
4679     for (uint keyno = 0; ptr < ptr_end; keyno++) {
4680       GL_INDEX_ID gl_index_id;
4681       rdb_netbuf_read_gl_index(&ptr, &gl_index_id);
4682       uint flags = 0;
4683       struct Rdb_index_info index_info;
4684       if (!m_dict->get_index_info(gl_index_id, &index_info)) {
4685         sql_print_error(
4686             "RocksDB: Could not get index information "
4687             "for Index Number (%u,%u), table %s",
4688             gl_index_id.cf_id, gl_index_id.index_id,
4689             tdef->full_tablename().c_str());
4690         return true;
4691       }
4692       if (max_index_id_in_dict < gl_index_id.index_id) {
4693         sql_print_error(
4694             "RocksDB: Found max index id %u from data dictionary "
4695             "but also found larger index id %u from dictionary. "
4696             "This should never happen and possibly a bug.",
4697             max_index_id_in_dict, gl_index_id.index_id);
4698         return true;
4699       }
4700       if (!m_dict->get_cf_flags(gl_index_id.cf_id, &flags)) {
4701         sql_print_error(
4702             "RocksDB: Could not get Column Family Flags "
4703             "for CF Number %d, table %s",
4704             gl_index_id.cf_id, tdef->full_tablename().c_str());
4705         return true;
4706       }
4707 
4708       if ((flags & Rdb_key_def::AUTO_CF_FLAG) != 0) {
4709         // The per-index cf option is deprecated.  Make sure we don't have the
4710         // flag set in any existing database.   NO_LINT_DEBUG
4711         sql_print_error(
4712             "RocksDB: The defunct AUTO_CF_FLAG is enabled for CF "
4713             "number %d, table %s",
4714             gl_index_id.cf_id, tdef->full_tablename().c_str());
4715       }
4716 
4717       std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
4718           cf_manager->get_cf(gl_index_id.cf_id);
4719       assert(cfh);
4720 
4721       uint32 ttl_rec_offset =
4722           Rdb_key_def::has_index_flag(index_info.m_index_flags,
4723                                       Rdb_key_def::TTL_FLAG)
4724               ? Rdb_key_def::calculate_index_flag_offset(
4725                     index_info.m_index_flags, Rdb_key_def::TTL_FLAG)
4726               : UINT_MAX;
4727 
4728       /*
4729         We can't fully initialize Rdb_key_def object here, because full
4730         initialization requires that there is an open TABLE* where we could
4731         look at Field* objects and set max_length and other attributes
4732       */
4733       tdef->m_key_descr_arr[keyno] = std::make_shared<Rdb_key_def>(
4734           gl_index_id.index_id, keyno, cfh, index_info.m_index_dict_version,
4735           index_info.m_index_type, index_info.m_kv_version,
4736           flags & Rdb_key_def::REVERSE_CF_FLAG,
4737           flags & Rdb_key_def::PER_PARTITION_CF_FLAG, "",
4738           m_dict->get_stats(gl_index_id), index_info.m_index_flags,
4739           ttl_rec_offset, index_info.m_ttl_duration);
4740     }
4741 
4742     assert(tdef->m_key_count > 0);
4743     tdef->m_tbl_stats.set(
4744         tdef->m_key_count > 0 ? tdef->m_key_descr_arr[0]->m_stats.m_rows : 0, 0,
4745         0);
4746 
4747     put(tdef);
4748     i++;
4749   }
4750 
4751 #if defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) && ROCKSDB_INCLUDE_VALIDATE_TABLES
4752   /*
4753     If validate_tables is greater than 0 run the validation.  Only fail the
4754     initialzation if the setting is 1.  If the setting is 2 we continue.
4755   */
4756   if (validate_tables > 0) {
4757     std::string msg;
4758     if (!validate_schemas()) {
4759       msg =
4760           "RocksDB: Problems validating data dictionary "
4761           "against .frm files, exiting";
4762     } else if (!validate_auto_incr()) {
4763       msg =
4764           "RocksDB: Problems validating auto increment values in "
4765           "data dictionary, exiting";
4766     }
4767     if (validate_tables == 1 && !msg.empty()) {
4768       // NO_LINT_DEBUG
4769       sql_print_error(
4770           "%s. Use \"rocksdb_validate_tables=2\" to ignore this error.",
4771           msg.c_str());
4772       return true;
4773     }
4774   }
4775 #endif  // defined(ROCKSDB_INCLUDE_VALIDATE_TABLES) &&
4776         // ROCKSDB_INCLUDE_VALIDATE_TABLES
4777 
4778   // index ids used by applications should not conflict with
4779   // data dictionary index ids
4780   if (max_index_id_in_dict < Rdb_key_def::END_DICT_INDEX_ID) {
4781     max_index_id_in_dict = Rdb_key_def::END_DICT_INDEX_ID;
4782   }
4783 
4784   m_sequence.init(max_index_id_in_dict + 1);
4785 
4786   if (!it->status().ok()) {
4787     rdb_log_status_error(it->status(), "Table_store load error");
4788     return true;
4789   }
4790   delete it;
4791   // NO_LINT_DEBUG
4792   sql_print_information("RocksDB: Table_store: loaded DDL data for %d tables",
4793                         i);
4794   return false;
4795 }
4796 
4797 Rdb_tbl_def *Rdb_ddl_manager::find(const std::string &table_name,
4798                                    const bool lock) {
4799   Rdb_tbl_def *rec = nullptr;
4800 
4801   if (lock) {
4802     mysql_rwlock_rdlock(&m_rwlock);
4803   }
4804 
4805   const auto &it = m_ddl_map.find(table_name);
4806   if (it != m_ddl_map.end()) {
4807     rec = it->second;
4808   }
4809 
4810   if (lock) {
4811     mysql_rwlock_unlock(&m_rwlock);
4812   }
4813 
4814   return rec;
4815 }
4816 
4817 int Rdb_ddl_manager::find_indexes(const std::string &table_name,
4818                                   std::vector<GL_INDEX_ID> *indexes) {
4819   mysql_rwlock_rdlock(&m_rwlock);
4820 
4821   Rdb_tbl_def *tdef = nullptr;
4822   const auto it = m_ddl_map.find(table_name);
4823   if (it != m_ddl_map.end()) {
4824     tdef = it->second;
4825   }
4826 
4827   if (!tdef) {
4828     mysql_rwlock_unlock(&m_rwlock);
4829     return HA_EXIT_FAILURE;
4830   }
4831 
4832   for (uint i = 0; i < tdef->m_key_count; i++) {
4833     indexes->push_back(tdef->m_key_descr_arr[i]->get_gl_index_id());
4834   }
4835 
4836   mysql_rwlock_unlock(&m_rwlock);
4837 
4838   return HA_EXIT_SUCCESS;
4839 }
4840 
4841 int Rdb_ddl_manager::find_table_stats(const std::string &table_name,
4842                                       Rdb_table_stats *tbl_stats) {
4843   mysql_rwlock_rdlock(&m_rwlock);
4844 
4845   Rdb_tbl_def *tdef = nullptr;
4846   const auto it = m_ddl_map.find(table_name);
4847   if (it != m_ddl_map.end()) {
4848     tdef = it->second;
4849   }
4850 
4851   if (!tdef) {
4852     mysql_rwlock_unlock(&m_rwlock);
4853     return HA_EXIT_FAILURE;
4854   }
4855 
4856   *tbl_stats = tdef->m_tbl_stats;
4857 
4858   mysql_rwlock_unlock(&m_rwlock);
4859 
4860   return HA_EXIT_SUCCESS;
4861 }
4862 
4863 // this is a safe version of the find() function below.  It acquires a read
4864 // lock on m_rwlock to make sure the Rdb_key_def is not discarded while we
4865 // are finding it.  Copying it into 'ret' increments the count making sure
4866 // that the object will not be discarded until we are finished with it.
4867 std::shared_ptr<const Rdb_key_def> Rdb_ddl_manager::safe_find(
4868     GL_INDEX_ID gl_index_id) {
4869   std::shared_ptr<const Rdb_key_def> ret(nullptr);
4870 
4871   mysql_rwlock_rdlock(&m_rwlock);
4872 
4873   const auto it = m_index_num_to_keydef.find(gl_index_id);
4874   if (it != m_index_num_to_keydef.end()) {
4875     const auto table_def = find(it->second.first, false);
4876     if (table_def && it->second.second < table_def->m_key_count) {
4877       const auto &kd = table_def->m_key_descr_arr[it->second.second];
4878       if (kd->max_storage_fmt_length() != 0) {
4879         ret = kd;
4880       }
4881     }
4882   } else {
4883     const auto uncommitted_it =
4884         m_index_num_to_uncommitted_keydef.find(gl_index_id);
4885     if (uncommitted_it != m_index_num_to_uncommitted_keydef.end()) {
4886       const auto &kd = uncommitted_it->second;
4887       if (kd->max_storage_fmt_length() != 0) {
4888         ret = kd;
4889       }
4890     }
4891   }
4892 
4893   mysql_rwlock_unlock(&m_rwlock);
4894 
4895   return ret;
4896 }
4897 
4898 // this method assumes at least read-only lock on m_rwlock
4899 const std::shared_ptr<Rdb_key_def> &Rdb_ddl_manager::find(
4900     GL_INDEX_ID gl_index_id) {
4901   const auto it = m_index_num_to_keydef.find(gl_index_id);
4902   if (it != m_index_num_to_keydef.end()) {
4903     const auto table_def = find(it->second.first, false);
4904     if (table_def) {
4905       if (it->second.second < table_def->m_key_count) {
4906         return table_def->m_key_descr_arr[it->second.second];
4907       }
4908     }
4909   } else {
4910     const auto uncommitted_it =
4911         m_index_num_to_uncommitted_keydef.find(gl_index_id);
4912     if (uncommitted_it != m_index_num_to_uncommitted_keydef.end()) {
4913       return uncommitted_it->second;
4914     }
4915   }
4916 
4917   static std::shared_ptr<Rdb_key_def> empty = nullptr;
4918 
4919   return empty;
4920 }
4921 
4922 // this method returns the name of the table based on an index id. It acquires
4923 // a read lock on m_rwlock.
4924 const std::string Rdb_ddl_manager::safe_get_table_name(
4925     const GL_INDEX_ID &gl_index_id) {
4926   std::string ret;
4927   mysql_rwlock_rdlock(&m_rwlock);
4928   auto it = m_index_num_to_keydef.find(gl_index_id);
4929   if (it != m_index_num_to_keydef.end()) {
4930     ret = it->second.first;
4931   }
4932   mysql_rwlock_unlock(&m_rwlock);
4933   return ret;
4934 }
4935 
4936 void Rdb_ddl_manager::set_stats(
4937     const std::unordered_map<GL_INDEX_ID, Rdb_index_stats> &stats) {
4938   mysql_rwlock_wrlock(&m_rwlock);
4939   for (const auto &src : stats) {
4940     const auto &keydef = find(src.second.m_gl_index_id);
4941     if (keydef) {
4942       keydef->m_stats = src.second;
4943       m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4944     }
4945   }
4946   mysql_rwlock_unlock(&m_rwlock);
4947 }
4948 
4949 void Rdb_ddl_manager::adjust_stats(
4950     const std::vector<Rdb_index_stats> &new_data,
4951     const std::vector<Rdb_index_stats> &deleted_data) {
4952   mysql_rwlock_wrlock(&m_rwlock);
4953   int i = 0;
4954   for (const auto &data : {new_data, deleted_data}) {
4955     for (const auto &src : data) {
4956       const auto &keydef = find(src.m_gl_index_id);
4957       if (keydef) {
4958         keydef->m_stats.m_distinct_keys_per_prefix.resize(
4959             keydef->get_key_parts());
4960         keydef->m_stats.merge(src, i == 0, keydef->max_storage_fmt_length());
4961         m_stats2store[keydef->m_stats.m_gl_index_id] = keydef->m_stats;
4962       }
4963     }
4964     i++;
4965   }
4966   const bool should_save_stats = !m_stats2store.empty();
4967   mysql_rwlock_unlock(&m_rwlock);
4968   if (should_save_stats) {
4969     // Queue an async persist_stats(false) call to the background thread.
4970     rdb_queue_save_stats_request();
4971   }
4972 }
4973 
4974 void Rdb_ddl_manager::persist_stats(const bool sync) {
4975   mysql_rwlock_wrlock(&m_rwlock);
4976   const auto local_stats2store = std::move(m_stats2store);
4977   m_stats2store.clear();
4978   mysql_rwlock_unlock(&m_rwlock);
4979 
4980   // Persist stats
4981   const std::unique_ptr<rocksdb::WriteBatch> wb = m_dict->begin();
4982   std::vector<Rdb_index_stats> stats;
4983   std::transform(local_stats2store.begin(), local_stats2store.end(),
4984                  std::back_inserter(stats),
4985                  [](const std::pair<GL_INDEX_ID, Rdb_index_stats> &s) {
4986                    return s.second;
4987                  });
4988   m_dict->add_stats(wb.get(), stats);
4989   m_dict->commit(wb.get(), sync);
4990 }
4991 
4992 void Rdb_ddl_manager::set_table_stats(const std::string &tbl_name) {
4993   timespec ts;
4994   clock_gettime(CLOCK_REALTIME, &ts);
4995 
4996   mysql_rwlock_rdlock(&m_rwlock);
4997   const auto &tbl_def = find(tbl_name, false /* needs lock */);
4998   if (tbl_def) {
4999     assert(tbl_def->m_key_count > 0);
5000     // Take the number of rows of the first index as the number of rows of
5001     // the table. This is an estimated value.
5002     tbl_def->m_tbl_stats.set(tbl_def->m_key_count > 0
5003                                  ? tbl_def->m_key_descr_arr[0]->m_stats.m_rows
5004                                  : 0,
5005                              0, ts.tv_sec);
5006   }
5007   mysql_rwlock_unlock(&m_rwlock);
5008 }
5009 
5010 /*
5011   Put table definition of `tbl` into the mapping, and also write it to the
5012   on-disk data dictionary.
5013 */
5014 
5015 int Rdb_ddl_manager::put_and_write(Rdb_tbl_def *const tbl,
5016                                    rocksdb::WriteBatch *const batch) {
5017   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> buf_writer;
5018 
5019   buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
5020 
5021   const std::string &dbname_tablename = tbl->full_tablename();
5022   buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
5023 
5024   int res;
5025   if ((res =
5026            tbl->put_dict(m_dict, m_cf_manager, batch, buf_writer.to_slice()))) {
5027     return res;
5028   }
5029   if ((res = put(tbl))) {
5030     return res;
5031   }
5032   return HA_EXIT_SUCCESS;
5033 }
5034 
5035 /* Return 0 - ok, other value - error */
5036 /* TODO:
5037   This function modifies m_ddl_map and m_index_num_to_keydef.
5038   However, these changes need to be reversed if dict_manager.commit fails
5039   See the discussion here: https://reviews.facebook.net/D35925#inline-259167
5040   Tracked by https://github.com/facebook/mysql-5.6/issues/33
5041 */
5042 int Rdb_ddl_manager::put(Rdb_tbl_def *const tbl, const bool lock) {
5043   const std::string &dbname_tablename = tbl->full_tablename();
5044 
5045   if (lock) mysql_rwlock_wrlock(&m_rwlock);
5046 
5047   // We have to do this find because 'tbl' is not yet in the list.  We need
5048   // to find the one we are replacing ('rec')
5049   const auto &it = m_ddl_map.find(dbname_tablename);
5050   if (it != m_ddl_map.end()) {
5051     delete it->second;
5052     m_ddl_map.erase(it);
5053   }
5054   m_ddl_map.emplace(dbname_tablename, tbl);
5055 
5056   for (uint keyno = 0; keyno < tbl->m_key_count; keyno++) {
5057     m_index_num_to_keydef[tbl->m_key_descr_arr[keyno]->get_gl_index_id()] =
5058         std::make_pair(dbname_tablename, keyno);
5059   }
5060   tbl->check_and_set_read_free_rpl_table();
5061 
5062   if (lock) mysql_rwlock_unlock(&m_rwlock);
5063   return 0;
5064 }
5065 
5066 void Rdb_ddl_manager::remove(Rdb_tbl_def *const tbl,
5067                              rocksdb::WriteBatch *const batch,
5068                              const bool lock) {
5069   if (lock) mysql_rwlock_wrlock(&m_rwlock);
5070 
5071   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> key_writer;
5072   key_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
5073   const std::string &dbname_tablename = tbl->full_tablename();
5074   key_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
5075 
5076   m_dict->delete_key(batch, key_writer.to_slice());
5077 
5078   const auto &it = m_ddl_map.find(dbname_tablename);
5079   if (it != m_ddl_map.end()) {
5080     // Free Rdb_tbl_def
5081     delete it->second;
5082 
5083     m_ddl_map.erase(it);
5084   }
5085 
5086   if (lock) mysql_rwlock_unlock(&m_rwlock);
5087 }
5088 
5089 bool Rdb_ddl_manager::rename(const std::string &from, const std::string &to,
5090                              rocksdb::WriteBatch *const batch) {
5091   Rdb_tbl_def *rec;
5092   Rdb_tbl_def *new_rec;
5093   bool res = true;
5094   Rdb_buf_writer<FN_LEN * 2 + Rdb_key_def::INDEX_NUMBER_SIZE> new_buf_writer;
5095 
5096   mysql_rwlock_wrlock(&m_rwlock);
5097   if (!(rec = find(from, false))) {
5098     mysql_rwlock_unlock(&m_rwlock);
5099     return true;
5100   }
5101 
5102   new_rec = new Rdb_tbl_def(to);
5103 
5104   new_rec->m_key_count = rec->m_key_count;
5105   new_rec->m_auto_incr_val =
5106       rec->m_auto_incr_val.load(std::memory_order_relaxed);
5107   new_rec->m_key_descr_arr = rec->m_key_descr_arr;
5108 
5109   new_rec->m_hidden_pk_val =
5110       rec->m_hidden_pk_val.load(std::memory_order_relaxed);
5111 
5112   new_rec->m_tbl_stats = rec->m_tbl_stats;
5113 
5114   // so that it's not free'd when deleting the old rec
5115   rec->m_key_descr_arr = nullptr;
5116 
5117   // Create a new key
5118   new_buf_writer.write_index(Rdb_key_def::DDL_ENTRY_INDEX_START_NUMBER);
5119 
5120   const std::string &dbname_tablename = new_rec->full_tablename();
5121   new_buf_writer.write(dbname_tablename.c_str(), dbname_tablename.size());
5122 
5123   // Create a key to add
5124   if (!new_rec->put_dict(m_dict, m_cf_manager, batch,
5125                          new_buf_writer.to_slice())) {
5126     remove(rec, batch, false);
5127     put(new_rec, false);
5128     res = false;  // ok
5129   }
5130 
5131   mysql_rwlock_unlock(&m_rwlock);
5132   return res;
5133 }
5134 
5135 void Rdb_ddl_manager::cleanup() {
5136   for (const auto &kv : m_ddl_map) {
5137     delete kv.second;
5138   }
5139   m_ddl_map.clear();
5140 
5141   mysql_rwlock_destroy(&m_rwlock);
5142   m_sequence.cleanup();
5143 }
5144 
5145 int Rdb_ddl_manager::scan_for_tables(Rdb_tables_scanner *const tables_scanner) {
5146   int ret;
5147 
5148   assert(tables_scanner != nullptr);
5149 
5150   // This method should NOT accquire dict_manager lock and
5151   // cf_manager lock in order to prevent deadlocks.
5152   mysql_rwlock_rdlock(&m_rwlock);
5153 
5154   ret = 0;
5155 
5156   for (const auto &kv : m_ddl_map) {
5157     ret = tables_scanner->add_table(kv.second);
5158     if (ret) break;
5159   }
5160 
5161   mysql_rwlock_unlock(&m_rwlock);
5162   return ret;
5163 }
5164 
5165 bool Rdb_dict_manager::init(rocksdb::TransactionDB *const rdb_dict,
5166                             Rdb_cf_manager *const cf_manager,
5167                             const my_bool enable_remove_orphaned_dropped_cfs) {
5168   assert(rdb_dict != nullptr);
5169   assert(cf_manager != nullptr);
5170 
5171   mysql_mutex_init(0, &m_mutex, MY_MUTEX_INIT_FAST);
5172 
5173   m_db = rdb_dict;
5174 
5175   // It is safe to get raw pointers here since:
5176   // 1. System CF and default CF cannot be dropped
5177   // 2. cf_manager outlives dict_manager
5178   m_system_cfh =
5179       cf_manager->get_or_create_cf(m_db, DEFAULT_SYSTEM_CF_NAME, true).get();
5180   rocksdb::ColumnFamilyHandle *default_cfh =
5181       cf_manager->get_cf(DEFAULT_CF_NAME).get();
5182 
5183   // System CF and default CF should be initialized
5184   if (m_system_cfh == nullptr || default_cfh == nullptr) {
5185     return HA_EXIT_FAILURE;
5186   }
5187 
5188   rdb_netbuf_store_index(m_key_buf_max_index_id, Rdb_key_def::MAX_INDEX_ID);
5189 
5190   m_key_slice_max_index_id =
5191       rocksdb::Slice(reinterpret_cast<char *>(m_key_buf_max_index_id),
5192                      Rdb_key_def::INDEX_NUMBER_SIZE);
5193 
5194   resume_drop_indexes();
5195   rollback_ongoing_index_creation();
5196 
5197   // Initialize system CF and default CF flags
5198   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5199   rocksdb::WriteBatch *const batch = wb.get();
5200 
5201   add_cf_flags(batch, m_system_cfh->GetID(), 0);
5202   add_cf_flags(batch, default_cfh->GetID(), 0);
5203   commit(batch);
5204 
5205   if (add_missing_cf_flags(cf_manager)) {
5206     return HA_EXIT_FAILURE;
5207   }
5208 
5209   if (remove_orphaned_dropped_cfs(cf_manager,
5210                                   enable_remove_orphaned_dropped_cfs)) {
5211     return HA_EXIT_FAILURE;
5212   }
5213 
5214   return HA_EXIT_SUCCESS;
5215 }
5216 
5217 std::unique_ptr<rocksdb::WriteBatch> Rdb_dict_manager::begin() const {
5218   return std::unique_ptr<rocksdb::WriteBatch>(new rocksdb::WriteBatch);
5219 }
5220 
5221 void Rdb_dict_manager::put_key(rocksdb::WriteBatchBase *const batch,
5222                                const rocksdb::Slice &key,
5223                                const rocksdb::Slice &value) const {
5224   batch->Put(m_system_cfh, key, value);
5225 }
5226 
5227 rocksdb::Status Rdb_dict_manager::get_value(const rocksdb::Slice &key,
5228                                             std::string *const value) const {
5229   rocksdb::ReadOptions options;
5230   options.total_order_seek = true;
5231   return m_db->Get(options, m_system_cfh, key, value);
5232 }
5233 
5234 void Rdb_dict_manager::delete_key(rocksdb::WriteBatchBase *batch,
5235                                   const rocksdb::Slice &key) const {
5236   batch->Delete(m_system_cfh, key);
5237 }
5238 
5239 rocksdb::Iterator *Rdb_dict_manager::new_iterator() const {
5240   /* Reading data dictionary should always skip bloom filter */
5241   rocksdb::ReadOptions read_options;
5242   read_options.total_order_seek = true;
5243   return m_db->NewIterator(read_options, m_system_cfh);
5244 }
5245 
5246 int Rdb_dict_manager::commit(rocksdb::WriteBatch *const batch,
5247                              const bool sync) const {
5248   if (!batch) return HA_ERR_ROCKSDB_COMMIT_FAILED;
5249   int res = HA_EXIT_SUCCESS;
5250   rocksdb::WriteOptions options;
5251   options.sync = sync;
5252   rocksdb::TransactionDBWriteOptimizations optimize;
5253   optimize.skip_concurrency_control = true;
5254   rocksdb::Status s = m_db->Write(options, optimize, batch);
5255   res = !s.ok();  // we return true when something failed
5256   if (res) {
5257     rdb_handle_io_error(s, RDB_IO_ERROR_DICT_COMMIT);
5258   }
5259   batch->Clear();
5260   return res;
5261 }
5262 
5263 void Rdb_dict_manager::dump_index_id(uchar *const netbuf,
5264                                      Rdb_key_def::DATA_DICT_TYPE dict_type,
5265                                      const GL_INDEX_ID &gl_index_id) {
5266   rdb_netbuf_store_uint32(netbuf, dict_type);
5267   rdb_netbuf_store_uint32(netbuf + Rdb_key_def::INDEX_NUMBER_SIZE,
5268                           gl_index_id.cf_id);
5269   rdb_netbuf_store_uint32(netbuf + 2 * Rdb_key_def::INDEX_NUMBER_SIZE,
5270                           gl_index_id.index_id);
5271 }
5272 
5273 void Rdb_dict_manager::delete_with_prefix(
5274     rocksdb::WriteBatch *const batch, Rdb_key_def::DATA_DICT_TYPE dict_type,
5275     const GL_INDEX_ID &gl_index_id) const {
5276   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5277   dump_index_id(&key_writer, dict_type, gl_index_id);
5278 
5279   delete_key(batch, key_writer.to_slice());
5280 }
5281 
5282 void Rdb_dict_manager::add_or_update_index_cf_mapping(
5283     rocksdb::WriteBatch *batch, struct Rdb_index_info *const index_info) const {
5284   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5285   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO,
5286                 index_info->m_gl_index_id);
5287 
5288   Rdb_buf_writer<256> value_writer;
5289 
5290   value_writer.write_uint16(Rdb_key_def::INDEX_INFO_VERSION_LATEST);
5291   value_writer.write_byte(index_info->m_index_type);
5292   value_writer.write_uint16(index_info->m_kv_version);
5293   value_writer.write_uint32(index_info->m_index_flags);
5294   value_writer.write_uint64(index_info->m_ttl_duration);
5295 
5296   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5297 }
5298 
5299 void Rdb_dict_manager::add_cf_flags(rocksdb::WriteBatch *const batch,
5300                                     const uint32_t cf_id,
5301                                     const uint32_t cf_flags) const {
5302   assert(batch != nullptr);
5303 
5304   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
5305   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
5306   key_writer.write_uint32(cf_id);
5307 
5308   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5309       value_writer;
5310   value_writer.write_uint16(Rdb_key_def::CF_DEFINITION_VERSION);
5311   value_writer.write_uint32(cf_flags);
5312 
5313   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5314 }
5315 
5316 void Rdb_dict_manager::delete_cf_flags(rocksdb::WriteBatch *const batch,
5317                                        const uint &cf_id) const {
5318   assert(batch != nullptr);
5319 
5320   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5321 
5322   rdb_netbuf_store_uint32(key_buf, Rdb_key_def::CF_DEFINITION);
5323   rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5324   const rocksdb::Slice key =
5325       rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5326 
5327   delete_key(batch, key);
5328 }
5329 
5330 void Rdb_dict_manager::delete_index_info(rocksdb::WriteBatch *batch,
5331                                          const GL_INDEX_ID &gl_index_id) const {
5332   delete_with_prefix(batch, Rdb_key_def::INDEX_INFO, gl_index_id);
5333   delete_with_prefix(batch, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5334   delete_with_prefix(batch, Rdb_key_def::AUTO_INC, gl_index_id);
5335 }
5336 
5337 bool Rdb_dict_manager::get_index_info(
5338     const GL_INDEX_ID &gl_index_id,
5339     struct Rdb_index_info *const index_info) const {
5340   if (index_info) {
5341     index_info->m_gl_index_id = gl_index_id;
5342   }
5343 
5344   bool found = false;
5345   bool error = false;
5346   std::string value;
5347   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5348   dump_index_id(&key_writer, Rdb_key_def::INDEX_INFO, gl_index_id);
5349 
5350   const rocksdb::Status &status = get_value(key_writer.to_slice(), &value);
5351   if (status.ok()) {
5352     if (!index_info) {
5353       return true;
5354     }
5355 
5356     const uchar *const val = (const uchar *)value.c_str();
5357     const uchar *ptr = val;
5358     index_info->m_index_dict_version = rdb_netbuf_to_uint16(val);
5359     ptr += RDB_SIZEOF_INDEX_INFO_VERSION;
5360 
5361     switch (index_info->m_index_dict_version) {
5362       case Rdb_key_def::INDEX_INFO_VERSION_FIELD_FLAGS:
5363         /* Sanity check to prevent reading bogus TTL record. */
5364         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
5365                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
5366                                 RDB_SIZEOF_INDEX_FLAGS +
5367                                 ROCKSDB_SIZEOF_TTL_RECORD) {
5368           error = true;
5369           break;
5370         }
5371         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
5372         ptr += RDB_SIZEOF_INDEX_TYPE;
5373         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
5374         ptr += RDB_SIZEOF_KV_VERSION;
5375         index_info->m_index_flags = rdb_netbuf_to_uint32(ptr);
5376         ptr += RDB_SIZEOF_INDEX_FLAGS;
5377         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
5378         found = true;
5379         break;
5380 
5381       case Rdb_key_def::INDEX_INFO_VERSION_TTL:
5382         /* Sanity check to prevent reading bogus into TTL record. */
5383         if (value.size() != RDB_SIZEOF_INDEX_INFO_VERSION +
5384                                 RDB_SIZEOF_INDEX_TYPE + RDB_SIZEOF_KV_VERSION +
5385                                 ROCKSDB_SIZEOF_TTL_RECORD) {
5386           error = true;
5387           break;
5388         }
5389         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
5390         ptr += RDB_SIZEOF_INDEX_TYPE;
5391         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
5392         ptr += RDB_SIZEOF_KV_VERSION;
5393         index_info->m_ttl_duration = rdb_netbuf_to_uint64(ptr);
5394         if ((index_info->m_kv_version ==
5395              Rdb_key_def::PRIMARY_FORMAT_VERSION_TTL) &&
5396             index_info->m_ttl_duration > 0) {
5397           index_info->m_index_flags = Rdb_key_def::TTL_FLAG;
5398         }
5399         found = true;
5400         break;
5401 
5402       case Rdb_key_def::INDEX_INFO_VERSION_VERIFY_KV_FORMAT:
5403       case Rdb_key_def::INDEX_INFO_VERSION_GLOBAL_ID:
5404         index_info->m_index_type = rdb_netbuf_to_byte(ptr);
5405         ptr += RDB_SIZEOF_INDEX_TYPE;
5406         index_info->m_kv_version = rdb_netbuf_to_uint16(ptr);
5407         found = true;
5408         break;
5409 
5410       default:
5411         error = true;
5412         break;
5413     }
5414 
5415     switch (index_info->m_index_type) {
5416       case Rdb_key_def::INDEX_TYPE_PRIMARY:
5417       case Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY: {
5418         error = index_info->m_kv_version >
5419                 Rdb_key_def::PRIMARY_FORMAT_VERSION_LATEST;
5420         break;
5421       }
5422       case Rdb_key_def::INDEX_TYPE_SECONDARY:
5423         error = index_info->m_kv_version >
5424                 Rdb_key_def::SECONDARY_FORMAT_VERSION_LATEST;
5425         break;
5426       default:
5427         error = true;
5428         break;
5429     }
5430   }
5431 
5432   if (error) {
5433     // NO_LINT_DEBUG
5434     sql_print_error(
5435         "RocksDB: Found invalid key version number (%u, %u, %u, %llu) "
5436         "from data dictionary. This should never happen "
5437         "and it may be a bug.",
5438         index_info->m_index_dict_version, index_info->m_index_type,
5439         index_info->m_kv_version, index_info->m_ttl_duration);
5440     abort();
5441   }
5442 
5443   return found;
5444 }
5445 
5446 bool Rdb_dict_manager::get_cf_flags(const uint32_t cf_id,
5447                                     uint32_t *const cf_flags) const {
5448   assert(cf_flags != nullptr);
5449 
5450   bool found = false;
5451   std::string value;
5452   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 2> key_writer;
5453 
5454   key_writer.write_uint32(Rdb_key_def::CF_DEFINITION);
5455   key_writer.write_uint32(cf_id);
5456 
5457   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5458 
5459   if (status.ok()) {
5460     const uchar *val = (const uchar *)value.c_str();
5461     assert(val);
5462 
5463     const uint16_t version = rdb_netbuf_to_uint16(val);
5464 
5465     if (version == Rdb_key_def::CF_DEFINITION_VERSION) {
5466       *cf_flags = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5467       found = true;
5468     }
5469   }
5470 
5471   return found;
5472 }
5473 
5474 void Rdb_dict_manager::add_dropped_cf(rocksdb::WriteBatch *const batch,
5475                                       const uint &cf_id) const {
5476   assert(batch != nullptr);
5477 
5478   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5479   uchar value_buf[Rdb_key_def::VERSION_SIZE] = {0};
5480   rdb_netbuf_store_uint32(key_buf, Rdb_key_def::DROPPED_CF);
5481   rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5482   const rocksdb::Slice key =
5483       rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5484 
5485   rdb_netbuf_store_uint16(value_buf, Rdb_key_def::DROPPED_CF_VERSION);
5486   const rocksdb::Slice value =
5487       rocksdb::Slice(reinterpret_cast<char *>(value_buf), sizeof(value_buf));
5488   batch->Put(m_system_cfh, key, value);
5489 }
5490 
5491 bool Rdb_dict_manager::get_dropped_cf(const uint &cf_id) const {
5492   std::string value;
5493   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5494 
5495   rdb_netbuf_store_uint32(key_buf, Rdb_key_def::DROPPED_CF);
5496   rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5497 
5498   const rocksdb::Slice key =
5499       rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5500   const rocksdb::Status status = get_value(key, &value);
5501 
5502   return status.ok();
5503 }
5504 
5505 void Rdb_dict_manager::delete_dropped_cf_and_flags(
5506     rocksdb::WriteBatch *const batch, const uint &cf_id) const {
5507   assert(batch != nullptr);
5508   delete_dropped_cf(batch, cf_id);
5509   delete_cf_flags(batch, cf_id);
5510 }
5511 
5512 void Rdb_dict_manager::delete_dropped_cf(rocksdb::WriteBatch *const batch,
5513                                          const uint &cf_id) const {
5514   assert(batch != nullptr);
5515 
5516   uchar key_buf[Rdb_key_def::INDEX_NUMBER_SIZE * 2] = {0};
5517 
5518   rdb_netbuf_store_uint32(key_buf, Rdb_key_def::DROPPED_CF);
5519   rdb_netbuf_store_uint32(key_buf + Rdb_key_def::INDEX_NUMBER_SIZE, cf_id);
5520   const rocksdb::Slice key =
5521       rocksdb::Slice(reinterpret_cast<char *>(key_buf), sizeof(key_buf));
5522 
5523   delete_key(batch, key);
5524 }
5525 
5526 void Rdb_dict_manager::get_all_dropped_cfs(
5527     std::unordered_set<uint32> *dropped_cf_ids) const {
5528   uchar dropped_cf_buf[Rdb_key_def::INDEX_NUMBER_SIZE];
5529   rdb_netbuf_store_uint32(dropped_cf_buf, Rdb_key_def::DROPPED_CF);
5530   const rocksdb::Slice dropped_cf_slice(
5531       reinterpret_cast<char *>(dropped_cf_buf), Rdb_key_def::INDEX_NUMBER_SIZE);
5532 
5533   rocksdb::Iterator *it = new_iterator();
5534   for (it->Seek(dropped_cf_slice); it->Valid(); it->Next()) {
5535     rocksdb::Slice key = it->key();
5536     const uchar *const ptr = (const uchar *)key.data();
5537 
5538     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 2 ||
5539         rdb_netbuf_to_uint32(ptr) != Rdb_key_def::DROPPED_CF) {
5540       break;
5541     }
5542 
5543     uint32 cf_id = rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5544     dropped_cf_ids->insert(cf_id);
5545   }
5546 
5547   delete it;
5548 }
5549 
5550 /*
5551   Returning index ids that were marked as deleted (via DROP TABLE) but
5552   still not removed by drop_index_thread yet, or indexes that are marked as
5553   ongoing creation.
5554  */
5555 void Rdb_dict_manager::get_ongoing_index_operation(
5556     std::unordered_set<GL_INDEX_ID> *gl_index_ids,
5557     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5558   assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5559               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5560 
5561   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE> index_writer;
5562   index_writer.write_uint32(dd_type);
5563   const rocksdb::Slice index_slice = index_writer.to_slice();
5564 
5565   rocksdb::Iterator *it = new_iterator();
5566   for (it->Seek(index_slice); it->Valid(); it->Next()) {
5567     rocksdb::Slice key = it->key();
5568     const uchar *const ptr = (const uchar *)key.data();
5569 
5570     /*
5571       Ongoing drop/create index operations require key to be of the form:
5572       dd_type + cf_id + index_id (== INDEX_NUMBER_SIZE * 3)
5573 
5574       This may need to be changed in the future if we want to process a new
5575       ddl_type with different format.
5576     */
5577     if (key.size() != Rdb_key_def::INDEX_NUMBER_SIZE * 3 ||
5578         rdb_netbuf_to_uint32(ptr) != dd_type) {
5579       break;
5580     }
5581 
5582     // We don't check version right now since currently we always store only
5583     // Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION = 1 as a value.
5584     // If increasing version number, we need to add version check logic here.
5585     GL_INDEX_ID gl_index_id;
5586     gl_index_id.cf_id =
5587         rdb_netbuf_to_uint32(ptr + Rdb_key_def::INDEX_NUMBER_SIZE);
5588     gl_index_id.index_id =
5589         rdb_netbuf_to_uint32(ptr + 2 * Rdb_key_def::INDEX_NUMBER_SIZE);
5590     gl_index_ids->insert(gl_index_id);
5591   }
5592   delete it;
5593 }
5594 
5595 /*
5596   If mysqld reboots during create table, a column family can be
5597   created without cf flags. This method adds missing cf flags. It
5598   only should be called during mysqld startup.
5599  */
5600 int Rdb_dict_manager::add_missing_cf_flags(
5601     Rdb_cf_manager *const cf_manager) const {
5602   for (const auto &cf_name : cf_manager->get_cf_names()) {
5603     std::shared_ptr<rocksdb::ColumnFamilyHandle> cfh =
5604         cf_manager->get_cf(cf_name);
5605 
5606     if (cf_manager->create_cf_flags_if_needed(this, cfh->GetID(), cf_name)) {
5607       return HA_EXIT_FAILURE;
5608     }
5609   }
5610 
5611   return HA_EXIT_SUCCESS;
5612 }
5613 
5614 /*
5615   If mysqld reboots during dropping a column family, it can happen
5616   that the column family is deleted from RocksDB, but its id is
5617   in the list of cf ids that are to be dropped.
5618   This method cleans up these orphaned cf ids. It only should be
5619   called during mysqld startup.
5620  */
5621 int Rdb_dict_manager::remove_orphaned_dropped_cfs(
5622     Rdb_cf_manager *const cf_manager,
5623     const my_bool &enable_remove_orphaned_dropped_cfs) const {
5624   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5625   rocksdb::WriteBatch *const batch = wb.get();
5626 
5627   std::unordered_set<uint32> dropped_cf_ids;
5628   get_all_dropped_cfs(&dropped_cf_ids);
5629   for (const auto cf_id : dropped_cf_ids) {
5630     if (!cf_manager->get_cf(cf_id)) {
5631       // NO_LINT_DEBUG
5632       sql_print_warning(
5633           "RocksDB: Column family with id %u doesn't exist in "
5634           "cf manager, but it is listed to be dropped",
5635           cf_id);
5636 
5637       if (enable_remove_orphaned_dropped_cfs) {
5638         delete_dropped_cf_and_flags(batch, cf_id);
5639       }
5640     }
5641   }
5642 
5643   commit(batch);
5644   return HA_EXIT_SUCCESS;
5645 }
5646 
5647 /*
5648   Returning true if index_id is create/delete ongoing (undergoing creation or
5649   marked as deleted via DROP TABLE but drop_index_thread has not wiped yet)
5650   or not.
5651  */
5652 bool Rdb_dict_manager::is_index_operation_ongoing(
5653     const GL_INDEX_ID &gl_index_id, Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5654   assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5655               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5656 
5657   bool found = false;
5658   std::string value;
5659   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5660   dump_index_id(&key_writer, dd_type, gl_index_id);
5661 
5662   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5663   if (status.ok()) {
5664     found = true;
5665   }
5666   return found;
5667 }
5668 
5669 /*
5670   Adding index_id to data dictionary so that the index id is removed
5671   by drop_index_thread, or to track online index creation.
5672  */
5673 void Rdb_dict_manager::start_ongoing_index_operation(
5674     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5675     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5676   assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5677               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5678 
5679   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5680   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE> value_writer;
5681 
5682   dump_index_id(&key_writer, dd_type, gl_index_id);
5683 
5684   // version as needed
5685   if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5686     value_writer.write_uint16(Rdb_key_def::DDL_DROP_INDEX_ONGOING_VERSION);
5687   } else {
5688     value_writer.write_uint16(Rdb_key_def::DDL_CREATE_INDEX_ONGOING_VERSION);
5689   }
5690 
5691   batch->Put(m_system_cfh, key_writer.to_slice(), value_writer.to_slice());
5692 }
5693 
5694 /*
5695   Removing index_id from data dictionary to confirm drop_index_thread
5696   completed dropping entire key/values of the index_id
5697  */
5698 void Rdb_dict_manager::end_ongoing_index_operation(
5699     rocksdb::WriteBatch *const batch, const GL_INDEX_ID &gl_index_id,
5700     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5701   assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5702               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5703 
5704   delete_with_prefix(batch, dd_type, gl_index_id);
5705 }
5706 
5707 /*
5708   Returning true if there is no target index ids to be removed
5709   by drop_index_thread
5710  */
5711 bool Rdb_dict_manager::is_drop_index_empty() const {
5712   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5713   get_ongoing_drop_indexes(&gl_index_ids);
5714   return gl_index_ids.empty();
5715 }
5716 
5717 /*
5718   This function is supposed to be called by DROP TABLE. Logging messages
5719   that dropping indexes started, and adding data dictionary so that
5720   all associated indexes to be removed
5721  */
5722 void Rdb_dict_manager::add_drop_table(
5723     std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5724     rocksdb::WriteBatch *const batch) const {
5725   std::unordered_set<GL_INDEX_ID> dropped_index_ids;
5726   for (uint32 i = 0; i < n_keys; i++) {
5727     dropped_index_ids.insert(key_descr[i]->get_gl_index_id());
5728   }
5729 
5730   add_drop_index(dropped_index_ids, batch);
5731 }
5732 
5733 /*
5734   Called during inplace index drop operations. Logging messages
5735   that dropping indexes started, and adding data dictionary so that
5736   all associated indexes to be removed
5737  */
5738 void Rdb_dict_manager::add_drop_index(
5739     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5740     rocksdb::WriteBatch *const batch) const {
5741   for (const auto &gl_index_id : gl_index_ids) {
5742     log_start_drop_index(gl_index_id, "Begin");
5743     start_drop_index(batch, gl_index_id);
5744   }
5745 }
5746 
5747 /*
5748   Called during inplace index creation operations. Logging messages
5749   that adding indexes started, and updates data dictionary with all associated
5750   indexes to be added.
5751  */
5752 void Rdb_dict_manager::add_create_index(
5753     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5754     rocksdb::WriteBatch *const batch) const {
5755   for (const auto &gl_index_id : gl_index_ids) {
5756     // NO_LINT_DEBUG
5757     sql_print_information("RocksDB: Begin index creation (%u,%u)",
5758                           gl_index_id.cf_id, gl_index_id.index_id);
5759     start_create_index(batch, gl_index_id);
5760   }
5761 }
5762 
5763 /*
5764   This function is supposed to be called by drop_index_thread, when it
5765   finished dropping any index, or at the completion of online index creation.
5766  */
5767 void Rdb_dict_manager::finish_indexes_operation(
5768     const std::unordered_set<GL_INDEX_ID> &gl_index_ids,
5769     Rdb_key_def::DATA_DICT_TYPE dd_type) const {
5770   assert(dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING ||
5771               dd_type == Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5772 
5773   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5774   rocksdb::WriteBatch *const batch = wb.get();
5775 
5776   std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5777   get_ongoing_create_indexes(&incomplete_create_indexes);
5778 
5779   for (const auto &gl_index_id : gl_index_ids) {
5780     if (is_index_operation_ongoing(gl_index_id, dd_type)) {
5781       end_ongoing_index_operation(batch, gl_index_id, dd_type);
5782 
5783       /*
5784         Remove the corresponding incomplete create indexes from data
5785         dictionary as well
5786       */
5787       if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5788         if (incomplete_create_indexes.count(gl_index_id)) {
5789           end_ongoing_index_operation(batch, gl_index_id,
5790                                       Rdb_key_def::DDL_CREATE_INDEX_ONGOING);
5791         }
5792       }
5793     }
5794 
5795     if (dd_type == Rdb_key_def::DDL_DROP_INDEX_ONGOING) {
5796       delete_index_info(batch, gl_index_id);
5797     }
5798   }
5799   commit(batch);
5800 }
5801 
5802 /*
5803   This function is supposed to be called when initializing
5804   Rdb_dict_manager (at startup). If there is any index ids that are
5805   drop ongoing, printing out messages for diagnostics purposes.
5806  */
5807 void Rdb_dict_manager::resume_drop_indexes() const {
5808   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5809   get_ongoing_drop_indexes(&gl_index_ids);
5810 
5811   uint max_index_id_in_dict = 0;
5812   get_max_index_id(&max_index_id_in_dict);
5813 
5814   for (const auto &gl_index_id : gl_index_ids) {
5815     log_start_drop_index(gl_index_id, "Resume");
5816     if (max_index_id_in_dict < gl_index_id.index_id) {
5817       sql_print_error(
5818           "RocksDB: Found max index id %u from data dictionary "
5819           "but also found dropped index id (%u,%u) from drop_index "
5820           "dictionary. This should never happen and is possibly a "
5821           "bug.",
5822           max_index_id_in_dict, gl_index_id.cf_id, gl_index_id.index_id);
5823       abort();
5824     }
5825   }
5826 }
5827 
5828 void Rdb_dict_manager::rollback_ongoing_index_creation() const {
5829   std::unordered_set<GL_INDEX_ID> gl_index_ids;
5830 
5831   get_ongoing_create_indexes(&gl_index_ids);
5832   rollback_ongoing_index_creation(gl_index_ids);
5833 }
5834 
5835 void Rdb_dict_manager::rollback_ongoing_index_creation(
5836     const std::unordered_set<GL_INDEX_ID> &gl_index_ids) const {
5837   const std::unique_ptr<rocksdb::WriteBatch> wb = begin();
5838   rocksdb::WriteBatch *const batch = wb.get();
5839 
5840   for (const auto &gl_index_id : gl_index_ids) {
5841     // NO_LINT_DEBUG
5842     sql_print_information("RocksDB: Removing incomplete create index (%u,%u)",
5843                           gl_index_id.cf_id, gl_index_id.index_id);
5844 
5845     start_drop_index(batch, gl_index_id);
5846   }
5847 
5848   commit(batch);
5849 }
5850 
5851 void Rdb_dict_manager::log_start_drop_table(
5852     const std::shared_ptr<Rdb_key_def> *const key_descr, const uint32 n_keys,
5853     const char *const log_action) const {
5854   for (uint32 i = 0; i < n_keys; i++) {
5855     log_start_drop_index(key_descr[i]->get_gl_index_id(), log_action);
5856   }
5857 }
5858 
5859 void Rdb_dict_manager::log_start_drop_index(GL_INDEX_ID gl_index_id,
5860                                             const char *log_action) const {
5861   struct Rdb_index_info index_info;
5862   if (!get_index_info(gl_index_id, &index_info)) {
5863     /*
5864       If we don't find the index info, it could be that it's because it was a
5865       partially created index that isn't in the data dictionary yet that needs
5866       to be rolled back.
5867     */
5868     std::unordered_set<GL_INDEX_ID> incomplete_create_indexes;
5869     get_ongoing_create_indexes(&incomplete_create_indexes);
5870 
5871     if (!incomplete_create_indexes.count(gl_index_id)) {
5872       /* If it's not a partially created index, something is very wrong. */
5873       sql_print_error(
5874           "RocksDB: Failed to get column family info "
5875           "from index id (%u,%u). MyRocks data dictionary may "
5876           "get corrupted.",
5877           gl_index_id.cf_id, gl_index_id.index_id);
5878       abort();
5879     }
5880   }
5881 }
5882 
5883 bool Rdb_dict_manager::get_max_index_id(uint32_t *const index_id) const {
5884   bool found = false;
5885   std::string value;
5886 
5887   const rocksdb::Status status = get_value(m_key_slice_max_index_id, &value);
5888   if (status.ok()) {
5889     const uchar *const val = (const uchar *)value.c_str();
5890     const uint16_t version = rdb_netbuf_to_uint16(val);
5891     if (version == Rdb_key_def::MAX_INDEX_ID_VERSION) {
5892       *index_id = rdb_netbuf_to_uint32(val + Rdb_key_def::VERSION_SIZE);
5893       found = true;
5894     }
5895   }
5896   return found;
5897 }
5898 
5899 bool Rdb_dict_manager::update_max_index_id(rocksdb::WriteBatch *const batch,
5900                                            const uint32_t index_id) const {
5901   assert(batch != nullptr);
5902 
5903   uint32_t old_index_id = -1;
5904   if (get_max_index_id(&old_index_id)) {
5905     if (old_index_id > index_id) {
5906       sql_print_error(
5907           "RocksDB: Found max index id %u from data dictionary "
5908           "but trying to update to older value %u. This should "
5909           "never happen and possibly a bug.",
5910           old_index_id, index_id);
5911       return true;
5912     }
5913   }
5914 
5915   Rdb_buf_writer<Rdb_key_def::VERSION_SIZE + Rdb_key_def::INDEX_NUMBER_SIZE>
5916       value_writer;
5917   value_writer.write_uint16(Rdb_key_def::MAX_INDEX_ID_VERSION);
5918   value_writer.write_uint32(index_id);
5919 
5920   batch->Put(m_system_cfh, m_key_slice_max_index_id, value_writer.to_slice());
5921   return false;
5922 }
5923 
5924 void Rdb_dict_manager::add_stats(
5925     rocksdb::WriteBatch *const batch,
5926     const std::vector<Rdb_index_stats> &stats) const {
5927   assert(batch != nullptr);
5928 
5929   for (const auto &it : stats) {
5930     Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5931     dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, it.m_gl_index_id);
5932 
5933     // IndexStats::materialize takes complete care of serialization including
5934     // storing the version
5935     const auto value =
5936         Rdb_index_stats::materialize(std::vector<Rdb_index_stats>{it});
5937 
5938     batch->Put(m_system_cfh, key_writer.to_slice(), value);
5939   }
5940 }
5941 
5942 Rdb_index_stats Rdb_dict_manager::get_stats(GL_INDEX_ID gl_index_id) const {
5943   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5944   dump_index_id(&key_writer, Rdb_key_def::INDEX_STATISTICS, gl_index_id);
5945 
5946   std::string value;
5947   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5948   if (status.ok()) {
5949     std::vector<Rdb_index_stats> v;
5950     // unmaterialize checks if the version matches
5951     if (Rdb_index_stats::unmaterialize(value, &v) == 0 && v.size() == 1) {
5952       return v[0];
5953     }
5954   }
5955 
5956   return Rdb_index_stats();
5957 }
5958 
5959 rocksdb::Status Rdb_dict_manager::put_auto_incr_val(
5960     rocksdb::WriteBatchBase *batch, GL_INDEX_ID gl_index_id, ulonglong val,
5961     bool overwrite) const {
5962   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5963   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5964 
5965   // Value is constructed by storing the version and the value.
5966   Rdb_buf_writer<RDB_SIZEOF_AUTO_INCREMENT_VERSION +
5967                  ROCKSDB_SIZEOF_AUTOINC_VALUE>
5968       value_writer;
5969   value_writer.write_uint16(Rdb_key_def::AUTO_INCREMENT_VERSION);
5970   value_writer.write_uint64(val);
5971 
5972   if (overwrite) {
5973     return batch->Put(m_system_cfh, key_writer.to_slice(),
5974                       value_writer.to_slice());
5975   }
5976   return batch->Merge(m_system_cfh, key_writer.to_slice(),
5977                       value_writer.to_slice());
5978 }
5979 
5980 bool Rdb_dict_manager::get_auto_incr_val(const GL_INDEX_ID &gl_index_id,
5981                                          ulonglong *new_val) const {
5982   Rdb_buf_writer<Rdb_key_def::INDEX_NUMBER_SIZE * 3> key_writer;
5983   dump_index_id(&key_writer, Rdb_key_def::AUTO_INC, gl_index_id);
5984 
5985   std::string value;
5986   const rocksdb::Status status = get_value(key_writer.to_slice(), &value);
5987 
5988   if (status.ok()) {
5989     const uchar *const val = reinterpret_cast<const uchar *>(value.data());
5990 
5991     if (rdb_netbuf_to_uint16(val) <= Rdb_key_def::AUTO_INCREMENT_VERSION) {
5992       *new_val = rdb_netbuf_to_uint64(val + RDB_SIZEOF_AUTO_INCREMENT_VERSION);
5993       return true;
5994     }
5995   }
5996   return false;
5997 }
5998 
5999 uint Rdb_seq_generator::get_and_update_next_number(
6000     Rdb_dict_manager *const dict) {
6001   assert(dict != nullptr);
6002 
6003   uint res;
6004   RDB_MUTEX_LOCK_CHECK(m_mutex);
6005 
6006   res = m_next_number++;
6007 
6008   const std::unique_ptr<rocksdb::WriteBatch> wb = dict->begin();
6009   rocksdb::WriteBatch *const batch = wb.get();
6010 
6011   assert(batch != nullptr);
6012   dict->update_max_index_id(batch, res);
6013   dict->commit(batch);
6014 
6015   RDB_MUTEX_UNLOCK_CHECK(m_mutex);
6016 
6017   return res;
6018 }
6019 
6020 }  // namespace myrocks
6021