1 /*
2    Copyright (c) 2015, Facebook, Inc.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16 
17 /* This C++ file's header file */
18 #include "./rdb_converter.h"
19 
20 /* Standard C++ header files */
21 #include <algorithm>
22 #include <map>
23 #include <string>
24 #include <vector>
25 
26 /* MySQL header files */
27 #include "log.h"
28 #include "my_stacktrace.h"
29 #include "sql_array.h"
30 
31 /* MyRocks header files */
32 #include "./ha_rocksdb.h"
33 #include "./ha_rocksdb_proto.h"
34 #include "./rdb_datadic.h"
35 #include "./rdb_psi.h"
36 #include "./rdb_utils.h"
37 
38 namespace myrocks {
39 
dbug_modify_key_varchar8(String * on_disk_rec)40 void dbug_modify_key_varchar8(String *on_disk_rec) {
41   std::string res;
42   // The key starts with index number
43   res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);
44 
45   // Then, a mem-comparable form of a varchar(8) value.
46   res.append("ABCDE\0\0\0\xFC", 9);
47   on_disk_rec->length(0);
48   on_disk_rec->append(res.data(), res.size());
49 }
50 
51 /*
52   Convert field from rocksdb storage format into Mysql Record format
53   @param    buf         OUT          start memory to fill converted data
54   @param    offset      IN/OUT       decoded data is stored in buf + offset
55   @param    table       IN           current table
56   @param    field       IN           current field
57   @param    reader      IN           rocksdb value slice reader
58   @param    decode      IN           whether to decode current field
59   @return
60     0      OK
61     other  HA_ERR error code (can be SE-specific)
62 */
decode(uchar * const buf,TABLE * table,Rdb_field_encoder * field_dec,Rdb_string_reader * reader,bool decode,bool is_null)63 int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, TABLE *table,
64                                                 Rdb_field_encoder *field_dec,
65                                                 Rdb_string_reader *reader,
66                                                 bool decode, bool is_null) {
67   int err = HA_EXIT_SUCCESS;
68   auto ptr = buf + field_dec->m_field_offset;
69   if (is_null) {
70     if (decode && field_dec->maybe_null()) {
71       // This sets the NULL-bit of this record
72       buf[field_dec->m_field_null_offset] |= field_dec->m_field_null_mask;
73 
74       /*
75         Besides that, set the field value to default value. CHECKSUM TABLE
76         depends on this.
77       */
78       memcpy(ptr, table->s->default_values + field_dec->m_field_offset,
79              field_dec->m_field_pack_length);
80     }
81   } else {
82     if (decode && field_dec->maybe_null()) {
83       // sets non-null bits for this record
84       buf[field_dec->m_field_null_offset] &= ~(field_dec->m_field_null_mask);
85     }
86 
87     if (field_dec->m_field_type == MYSQL_TYPE_BLOB ||
88         field_dec->m_field_type == MYSQL_TYPE_JSON) {
89       err = decode_blob(table, ptr, field_dec, reader, decode);
90     } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
91       err = decode_varchar(ptr, field_dec, reader, decode);
92     } else {
93       err = decode_fixed_length_field(ptr, field_dec, reader, decode);
94     }
95   }
96 
97   return err;
98 }
99 
100 /*
101   Convert blob from rocksdb storage format into Mysql Record format
102   @param    table       IN           current table
103   @param    field       IN           current field
104   @param    reader      IN           rocksdb value slice reader
105   @param    decode      IN           whether to decode current field
106   @return
107     0      OK
108     other  HA_ERR error code (can be SE-specific)
109 */
decode_blob(TABLE * table,uchar * const buf,Rdb_field_encoder * field_dec,Rdb_string_reader * reader,bool decode)110 int Rdb_convert_to_record_value_decoder::decode_blob(
111     TABLE *table, uchar *const buf, Rdb_field_encoder *field_dec,
112     Rdb_string_reader *reader, bool decode) {
113   // Get the number of bytes needed to store length
114   const uint length_bytes =
115       field_dec->m_field_pack_length - portable_sizeof_char_ptr;
116 
117   const char *data_len_str;
118   if (!(data_len_str = reader->read(length_bytes))) {
119     return HA_ERR_ROCKSDB_CORRUPT_DATA;
120   }
121 
122   memcpy(buf, data_len_str, length_bytes);
123   uint32 data_len =
124       Field_blob::get_length(reinterpret_cast<const uchar *>(data_len_str),
125                              length_bytes, table->s->db_low_byte_first);
126   const char *blob_ptr;
127   if (!(blob_ptr = reader->read(data_len))) {
128     return HA_ERR_ROCKSDB_CORRUPT_DATA;
129   }
130 
131   if (decode) {
132     // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
133     // platforms)
134     memset(buf + length_bytes, 0, 8);
135     memcpy(buf + length_bytes, &blob_ptr, sizeof(uchar **));
136   }
137 
138   return HA_EXIT_SUCCESS;
139 }
140 
141 /*
142   Convert fixed length field from rocksdb storage format into Mysql Record
143   format
144   @param    field       IN           current field
145   @param    field_dec   IN           data structure conttain field encoding data
146   @param    reader      IN           rocksdb value slice reader
147   @param    decode      IN           whether to decode current field
148   @return
149     0      OK
150     other  HA_ERR error code (can be SE-specific)
151 */
decode_fixed_length_field(uchar * const buf,Rdb_field_encoder * field_dec,Rdb_string_reader * const reader,bool decode)152 int Rdb_convert_to_record_value_decoder::decode_fixed_length_field(
153     uchar *const buf, Rdb_field_encoder *field_dec,
154     Rdb_string_reader *const reader, bool decode) {
155   uint len = field_dec->m_field_pack_length;
156   if (len > 0) {
157     const char *data_bytes;
158     if ((data_bytes = reader->read(len)) == nullptr) {
159       return HA_ERR_ROCKSDB_CORRUPT_DATA;
160     }
161 
162     if (decode) {
163       memcpy(buf, data_bytes, len);
164     }
165   }
166 
167   return HA_EXIT_SUCCESS;
168 }
169 
170 /*
171   Convert varchar field from rocksdb storage format into Mysql Record format
172   @param    field       IN           current field
173   @param    field_dec   IN           data structure conttain field encoding data
174   @param    reader      IN           rocksdb value slice reader
175   @param    decode      IN           whether to decode current field
176   @return
177     0      OK
178     other  HA_ERR error code (can be SE-specific)
179 */
decode_varchar(uchar * const buf,Rdb_field_encoder * field_dec,Rdb_string_reader * const reader,bool decode)180 int Rdb_convert_to_record_value_decoder::decode_varchar(
181     uchar *const buf, Rdb_field_encoder *field_dec,
182     Rdb_string_reader *const reader, bool decode) {
183   const char *data_len_str;
184   if (!(data_len_str = reader->read(field_dec->m_field_length_bytes))) {
185     return HA_ERR_ROCKSDB_CORRUPT_DATA;
186   }
187 
188   uint data_len;
189   // field_dec->length_bytes is 1 or 2
190   if (field_dec->m_field_length_bytes == 1) {
191     data_len = (uchar)data_len_str[0];
192   } else {
193     assert(field_dec->m_field_length_bytes == 2);
194     data_len = uint2korr(data_len_str);
195   }
196 
197   if (data_len > field_dec->m_field_length) {
198     // The data on disk is longer than table DDL allows?
199     return HA_ERR_ROCKSDB_CORRUPT_DATA;
200   }
201 
202   if (!reader->read(data_len)) {
203     return HA_ERR_ROCKSDB_CORRUPT_DATA;
204   }
205 
206   if (decode) {
207     memcpy(buf, data_len_str, field_dec->m_field_length_bytes + data_len);
208   }
209 
210   return HA_EXIT_SUCCESS;
211 }
212 
213 template <typename value_field_decoder>
Rdb_value_field_iterator(TABLE * table,Rdb_string_reader * value_slice_reader,const Rdb_converter * rdb_converter,uchar * const buf)214 Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator(
215     TABLE *table, Rdb_string_reader *value_slice_reader,
216     const Rdb_converter *rdb_converter, uchar *const buf)
217     : m_buf(buf) {
218   assert(table != nullptr);
219   assert(buf != nullptr);
220 
221   m_table = table;
222   m_value_slice_reader = value_slice_reader;
223   auto fields = rdb_converter->get_decode_fields();
224   m_field_iter = fields->begin();
225   m_field_end = fields->end();
226   m_null_bytes = rdb_converter->get_null_bytes();
227 }
228 
229 // Iterate each requested field and decode one by one
230 template <typename value_field_decoder>
next()231 int Rdb_value_field_iterator<value_field_decoder>::next() {
232   int err = HA_EXIT_SUCCESS;
233   while (m_field_iter != m_field_end) {
234     m_field_dec = m_field_iter->m_field_enc;
235     bool decode = m_field_iter->m_decode;
236     bool maybe_null = m_field_dec->maybe_null();
237     // This is_null value is bind to how stroage format store its value
238     m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] &
239                                 m_field_dec->m_null_mask) != 0);
240 
241     // Skip the bytes we need to skip
242     int skip = m_field_iter->m_skip;
243     if (skip && !m_value_slice_reader->read(skip)) {
244       return HA_ERR_ROCKSDB_CORRUPT_DATA;
245     }
246 
247     // Decode each field
248     err = value_field_decoder::decode(m_buf, m_table, m_field_dec,
249                                       m_value_slice_reader, decode, m_is_null);
250     if (err != HA_EXIT_SUCCESS) {
251       return err;
252     }
253 
254     m_field_iter++;
255 
256     // Only break for the field that are actually decoding rather than skipping
257     if (decode) {
258       break;
259     }
260   }
261   return err;
262 }
263 
264 template <typename value_field_decoder>
end_of_fields() const265 bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const {
266   return m_field_iter == m_field_end;
267 }
268 
269 template <typename value_field_decoder>
get_dst() const270 void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const {
271   assert(m_buf != nullptr);
272   return m_buf + m_offset;
273 }
274 
275 template <typename value_field_decoder>
get_field_index() const276 int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const {
277   assert(m_field_dec != nullptr);
278   return m_field_dec->m_field_index;
279 }
280 
281 template <typename value_field_decoder>
get_field_type() const282 enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type()
283     const {
284   assert(m_field_dec != nullptr);
285   return m_field_dec->m_field_type;
286 }
287 
288 template <typename value_field_decoder>
is_null() const289 bool Rdb_value_field_iterator<value_field_decoder>::is_null() const {
290   return m_is_null;
291 }
292 
293 /*
294   Initialize Rdb_converter with table data
295   @param    thd        IN      Thread context
296   @param    tbl_def    IN      MyRocks table definition
297   @param    table      IN      Current open table
298 */
Rdb_converter(const THD * thd,const Rdb_tbl_def * tbl_def,TABLE * table)299 Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
300                              TABLE *table)
301     : m_thd(thd), m_tbl_def(tbl_def), m_table(table) {
302   assert(thd != nullptr);
303   assert(tbl_def != nullptr);
304   assert(table != nullptr);
305 
306   m_key_requested = false;
307   m_verify_row_debug_checksums = false;
308   m_maybe_unpack_info = false;
309   m_row_checksums_checked = 0;
310   m_null_bytes = nullptr;
311   setup_field_encoders();
312   m_lookup_bitmap = {nullptr, 0, 0, nullptr, nullptr};
313 }
314 
~Rdb_converter()315 Rdb_converter::~Rdb_converter() {
316   my_free(m_encoder_arr);
317   m_encoder_arr = nullptr;
318   // These are needed to suppress valgrind errors in rocksdb.partition
319   m_storage_record.mem_free();
320   bitmap_free(&m_lookup_bitmap);
321 }
322 
323 /*
324   Decide storage type for each encoder
325 */
get_storage_type(Rdb_field_encoder * const encoder,const uint kp)326 void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
327                                      const uint kp) {
328   auto pk_descr =
329       m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)];
330   // STORE_SOME uses unpack_info.
331   if (pk_descr->has_unpack_info(kp)) {
332     assert(pk_descr->can_unpack(kp));
333     encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
334     m_maybe_unpack_info = true;
335   } else if (pk_descr->can_unpack(kp)) {
336     encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
337   }
338 }
339 
340 /*
341   @brief
342     Setup which fields will be unpacked when reading rows
343 
344   @detail
345     Two special cases when we still unpack all fields:
346     - When client requires decode_all_fields, such as this table is being
347   updated (m_lock_rows==RDB_LOCK_WRITE).
348     - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
349   read all fields to find whether there is a row checksum at the end. We could
350   skip the fields instead of decoding them, but currently we do decoding.)
351 
352   @seealso
353     Rdb_converter::setup_field_encoders()
354     Rdb_converter::convert_record_from_storage_format()
355 */
setup_field_decoders(const MY_BITMAP * field_map,uint active_index,bool keyread_only,bool decode_all_fields)356 void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
357                                          uint active_index, bool keyread_only,
358                                          bool decode_all_fields) {
359   m_key_requested = false;
360   m_decoders_vect.clear();
361   bitmap_free(&m_lookup_bitmap);
362   int last_useful = 0;
363   int skip_size = 0;
364 
365   for (uint i = 0; i < m_table->s->fields; i++) {
366     bool field_requested =
367         decode_all_fields || m_verify_row_debug_checksums ||
368         bitmap_is_set(field_map, m_table->field[i]->field_index);
369 
370     // We only need the decoder if the whole record is stored.
371     if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
372       // the field potentially needs unpacking
373       if (field_requested) {
374         // the field is in the read set
375         m_key_requested = true;
376       }
377       continue;
378     }
379 
380     if (field_requested) {
381       // We will need to decode this field
382       m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
383       last_useful = m_decoders_vect.size();
384       skip_size = 0;
385     } else {
386       if (m_encoder_arr[i].uses_variable_len_encoding() ||
387           m_encoder_arr[i].maybe_null()) {
388         // For variable-length field, we need to read the data and skip it
389         m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
390         skip_size = 0;
391       } else {
392         // Fixed-width field can be skipped without looking at it.
393         // Add appropriate skip_size to the next field.
394         skip_size += m_encoder_arr[i].m_field_pack_length;
395       }
396     }
397   }
398 
399   // It could be that the last few elements are varchars that just do
400   // skipping. Remove them.
401   m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
402                         m_decoders_vect.end());
403 
404   if (!keyread_only && active_index != m_table->s->primary_key) {
405     m_tbl_def->m_key_descr_arr[active_index]->get_lookup_bitmap(
406         m_table, &m_lookup_bitmap);
407   }
408 }
409 
setup_field_encoders()410 void Rdb_converter::setup_field_encoders() {
411   uint null_bytes_length = 0;
412   uchar cur_null_mask = 0x1;
413 
414   m_encoder_arr = static_cast<Rdb_field_encoder *>(
415 #ifdef HAVE_PSI_INTERFACE
416       my_malloc(rdb_handler_memory_key,
417                 m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
418 #else
419       my_malloc(PSI_NOT_INSTRUMENTED,
420                 m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
421 #endif
422   if (m_encoder_arr == nullptr) {
423     return;
424   }
425 
426   for (uint i = 0; i < m_table->s->fields; i++) {
427     Field *const field = m_table->field[i];
428     m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
429 
430     /*
431       Check if this field is
432       - a part of primary key, and
433       - it can be decoded back from its key image.
434       If both hold, we don't need to store this field in the value part of
435       RocksDB's key-value pair.
436 
437       If hidden pk exists, we skip this check since the field will never be
438       part of the hidden pk.
439     */
440     if (!Rdb_key_def::table_has_hidden_pk(m_table)) {
441       KEY *const pk_info = &m_table->key_info[m_table->s->primary_key];
442       for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
443         // key_part->fieldnr is counted from 1
444         if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
445           get_storage_type(&m_encoder_arr[i], kp);
446           break;
447         }
448       }
449     }
450 
451     /*
452       The difference between pack_length and pack_length_in_rec is fairly
453       subtle. The only difference is in Field_bit case where it borrows some
454       bits in null bytes in memory to store the 'uneven' high bits, therefore
455       the pack_length is the length of remaining bits while the
456       pack_length_in_rec is the full length of all bits when you store it on
457       disk. Only MyIsam and archive supports it, indicating by
458       HA_CAN_BIT_FIELD. We don't handle this case today at all (nor do we need
459       to), and we use pack_length everywhere, so just assert it and move on.
460     */
461     assert(field->pack_length() == field->pack_length_in_rec());
462 
463     auto field_type = field->real_type();
464     m_encoder_arr[i].m_field_type = field_type;
465     m_encoder_arr[i].m_field_index = i;
466     m_encoder_arr[i].m_field_pack_length = field->pack_length();
467     m_encoder_arr[i].m_field_offset = field->ptr - m_table->record[0];
468 
469     if (field_type == MYSQL_TYPE_VARCHAR) {
470       auto varchar = reinterpret_cast<const Field_varstring *>(field);
471       m_encoder_arr[i].m_field_length = varchar->field_length;
472       m_encoder_arr[i].m_field_length_bytes = varchar->length_bytes;
473     } else {
474       m_encoder_arr[i].m_field_length = -1;
475       m_encoder_arr[i].m_field_length_bytes = -1;
476     }
477 
478     auto maybe_null = field->real_maybe_null();
479     if (maybe_null) {
480       m_encoder_arr[i].m_null_mask = cur_null_mask;
481       m_encoder_arr[i].m_null_offset = null_bytes_length;
482       m_encoder_arr[i].m_field_null_offset = field->null_offset();
483       m_encoder_arr[i].m_field_null_mask = field->null_bit;
484       if (cur_null_mask == 0x80) {
485         cur_null_mask = 0x1;
486         null_bytes_length++;
487       } else {
488         cur_null_mask = cur_null_mask << 1;
489       }
490     } else {
491       m_encoder_arr[i].m_null_offset = 0;
492       m_encoder_arr[i].m_null_mask = 0;
493     }
494   }
495 
496   // Count the last, unfinished NULL-bits byte
497   if (cur_null_mask != 0x1) {
498     null_bytes_length++;
499   }
500 
501   m_null_bytes_length_in_record = null_bytes_length;
502 }
503 
504 /*
505   EntryPoint for Decode:
506   Decode key slice(if requested) and value slice using built-in field
507   decoders
508   @param     key_def        IN          key definition to decode
509   @param     dst            OUT         Mysql buffer to fill decoded content
510   @param     key_slice      IN          RocksDB key slice to decode
511   @param     value_slice    IN          RocksDB value slice to decode
512   @return
513     0      OK
514     other  HA_ERR error code (can be SE-specific)
515 */
decode(const std::shared_ptr<Rdb_key_def> & key_def,uchar * dst,const rocksdb::Slice * key_slice,const rocksdb::Slice * value_slice,bool decode_value)516 int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def,
517                           uchar *dst,  // address to fill data
518                           const rocksdb::Slice *key_slice,
519                           const rocksdb::Slice *value_slice,
520                           bool decode_value) {
521   // Currently only support decode primary key, Will add decode secondary later
522   assert(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
523               key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
524 
525   const rocksdb::Slice *updated_key_slice = key_slice;
526 #ifndef NDEBUG
527   String last_rowkey;
528   last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin);
529   DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
530                   { dbug_modify_key_varchar8(&last_rowkey); });
531   rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length());
532   updated_key_slice = &rowkey_slice;
533 #endif
534   return convert_record_from_storage_format(key_def, updated_key_slice,
535                                             value_slice, dst);
536 }
537 
538 /*
539   Decode value slice header
540   @param    reader         IN          value slice reader
541   @param    pk_def         IN          key definition to decode
542   @param    unpack_slice   OUT         unpack info slice
543   @return
544     0      OK
545     other  HA_ERR error code (can be SE-specific)
546 */
decode_value_header(Rdb_string_reader * reader,const std::shared_ptr<Rdb_key_def> & pk_def,rocksdb::Slice * unpack_slice)547 int Rdb_converter::decode_value_header(
548     Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def,
549     rocksdb::Slice *unpack_slice) {
550   /* If it's a TTL record, skip the 8 byte TTL value */
551   if (pk_def->has_ttl()) {
552     const char *ttl_bytes;
553     if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) {
554       memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
555     } else {
556       return HA_ERR_ROCKSDB_CORRUPT_DATA;
557     }
558   }
559 
560   /* Other fields are decoded from the value */
561   if (m_null_bytes_length_in_record &&
562       !(m_null_bytes = reader->read(m_null_bytes_length_in_record))) {
563     return HA_ERR_ROCKSDB_CORRUPT_DATA;
564   }
565 
566   if (m_maybe_unpack_info) {
567     const char *unpack_info = reader->get_current_ptr();
568     if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
569         !reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
570       return HA_ERR_ROCKSDB_CORRUPT_DATA;
571     }
572 
573     uint16 unpack_info_len =
574         rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
575     *unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
576 
577     reader->read(unpack_info_len -
578                  Rdb_key_def::get_unpack_header_size(unpack_info[0]));
579   }
580 
581   return HA_EXIT_SUCCESS;
582 }
583 
584 /*
585   Convert RocksDb key slice and value slice to Mysql format
586   @param      key_def        IN           key definition to decode
587   @param      key_slice      IN           RocksDB key slice
588   @param      value_slice    IN           RocksDB value slice
589   @param      dst            OUT          MySql format address
590   @return
591     0      OK
592     other  HA_ERR error code (can be SE-specific)
593 */
convert_record_from_storage_format(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice * const key_slice,const rocksdb::Slice * const value_slice,uchar * const dst,bool decode_value)594 int Rdb_converter::convert_record_from_storage_format(
595     const std::shared_ptr<Rdb_key_def> &pk_def,
596     const rocksdb::Slice *const key_slice,
597     const rocksdb::Slice *const value_slice, uchar *const dst,
598     bool decode_value) {
599   bool skip_value = !decode_value || get_decode_fields()->size() == 0;
600   if (!m_key_requested && skip_value) {
601     return HA_EXIT_SUCCESS;
602   }
603 
604   int err = HA_EXIT_SUCCESS;
605 
606   Rdb_string_reader value_slice_reader(value_slice);
607   rocksdb::Slice unpack_slice;
608   err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice);
609   if (err != HA_EXIT_SUCCESS) {
610     return err;
611   }
612 
613   /*
614     Decode PK fields from the key
615   */
616   if (m_key_requested) {
617     err = pk_def->unpack_record(m_table, dst, key_slice,
618                                 !unpack_slice.empty() ? &unpack_slice : nullptr,
619                                 false /* verify_checksum */);
620     if (err != HA_EXIT_SUCCESS) {
621       return err;
622     }
623   }
624 
625   if (skip_value) {
626     // We are done
627     return HA_EXIT_SUCCESS;
628   }
629 
630   Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder>
631       value_field_iterator(m_table, &value_slice_reader, this, dst);
632 
633   // Decode value slices
634   while (!value_field_iterator.end_of_fields()) {
635     err = value_field_iterator.next();
636 
637     if (err != HA_EXIT_SUCCESS) {
638       return err;
639     }
640   }
641 
642   if (m_verify_row_debug_checksums) {
643     return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice,
644                                      value_slice);
645   }
646   return HA_EXIT_SUCCESS;
647 }
648 
649 /*
650   Verify checksum for row
651   @param      pk_def   IN     key def
652   @param      reader   IN     RocksDB value slice reader
653   @param      key      IN     RocksDB key slice
654   @param      value    IN     RocksDB value slice
655   @return
656     0      OK
657     other  HA_ERR error code (can be SE-specific)
658 */
verify_row_debug_checksum(const std::shared_ptr<Rdb_key_def> & pk_def,Rdb_string_reader * reader,const rocksdb::Slice * key,const rocksdb::Slice * value)659 int Rdb_converter::verify_row_debug_checksum(
660     const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader,
661     const rocksdb::Slice *key, const rocksdb::Slice *value) {
662   if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
663       reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
664     uint32_t stored_key_chksum =
665         rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
666     uint32_t stored_val_chksum =
667         rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
668 
669     const ha_checksum computed_key_chksum =
670         my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size());
671     const ha_checksum computed_val_chksum =
672         my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value),
673                              value->size() - RDB_CHECKSUM_CHUNK_SIZE);
674 
675     DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;);
676 
677     if (stored_key_chksum != computed_key_chksum) {
678       pk_def->report_checksum_mismatch(true, key->data(), key->size());
679       return HA_ERR_ROCKSDB_CORRUPT_DATA;
680     }
681 
682     DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;);
683     if (stored_val_chksum != computed_val_chksum) {
684       pk_def->report_checksum_mismatch(false, value->data(), value->size());
685       return HA_ERR_ROCKSDB_CORRUPT_DATA;
686     }
687 
688     m_row_checksums_checked++;
689   }
690   if (reader->remaining_bytes()) {
691     return HA_ERR_ROCKSDB_CORRUPT_DATA;
692   }
693   return HA_EXIT_SUCCESS;
694 }
695 
696 /**
697   Convert record from table->record[0] form into a form that can be written
698   into rocksdb.
699 
700   @param pk_def               IN        Current key def
701   @pk_unpack_info             IN        Unpack info generated during key pack
702   @is_update_row              IN        Whether it is update row
703   @store_row_debug_checksums  IN        Whether to store checksums
704   @param ttl_bytes            IN/OUT    Old ttl value from previous record and
705                                         ttl value during current encode
706   @is_ttl_bytes_updated       OUT       Whether ttl bytes is updated
707   @param value_slice          OUT       Data slice with record data.
708 */
encode_value_slice(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice & pk_packed_slice,Rdb_string_writer * pk_unpack_info,bool is_update_row,bool store_row_debug_checksums,char * ttl_bytes,bool * is_ttl_bytes_updated,rocksdb::Slice * const value_slice)709 int Rdb_converter::encode_value_slice(
710     const std::shared_ptr<Rdb_key_def> &pk_def,
711     const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info,
712     bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes,
713     bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) {
714   assert(pk_def != nullptr);
715   // Currently only primary key will store value slice
716   assert(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
717               pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
718   assert_IMP(m_maybe_unpack_info, pk_unpack_info);
719 
720   bool has_ttl = pk_def->has_ttl();
721   bool has_ttl_column = !pk_def->m_ttl_column.empty();
722 
723   m_storage_record.length(0);
724 
725   if (has_ttl) {
726     /* If it's a TTL record, reserve space for 8 byte TTL value in front. */
727     m_storage_record.fill(
728         ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0);
729     // NOTE: is_ttl_bytes_updated is only used for update case
730     // During update, skip update sk key/values slice iff none of sk fields
731     // have changed and ttl bytes isn't changed. see
732     // ha_rocksdb::update_write_sk() for more info
733     *is_ttl_bytes_updated = false;
734     char *const data = const_cast<char *>(m_storage_record.ptr());
735     if (has_ttl_column) {
736       assert(pk_def->get_ttl_field_index() != UINT_MAX);
737       Field *const field = m_table->field[pk_def->get_ttl_field_index()];
738       assert(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD);
739       assert(field->real_type() == MYSQL_TYPE_LONGLONG);
740 
741       uint64 ts = uint8korr(field->ptr);
742 #ifndef NDEBUG
743       ts += rdb_dbug_set_ttl_rec_ts();
744 #endif
745       rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
746       if (is_update_row) {
747         *is_ttl_bytes_updated =
748             memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
749       }
750       // Also store in m_ttl_bytes to propagate to update_write_sk
751       memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
752     } else {
753       /*
754         For implicitly generated TTL records we need to copy over the old
755         TTL value from the old record in the event of an update. It was stored
756         in m_ttl_bytes.
757 
758         Otherwise, generate a timestamp using the current time.
759       */
760       if (is_update_row) {
761         memcpy(data, ttl_bytes, sizeof(uint64));
762       } else {
763         uint64 ts = static_cast<uint64>(std::time(nullptr));
764 #ifndef NDEBUG
765         ts += rdb_dbug_set_ttl_rec_ts();
766 #endif
767         rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
768         // Also store in m_ttl_bytes to propagate to update_write_sk
769         memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
770       }
771     }
772   } else {
773     /* All NULL bits are initially 0 */
774     m_storage_record.fill(m_null_bytes_length_in_record, 0);
775   }
776 
777   // If a primary key may have non-empty unpack_info for certain values,
778   // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
779   // itself was prepared in Rdb_key_def::pack_record.
780   if (m_maybe_unpack_info) {
781     m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
782                             pk_unpack_info->get_current_pos());
783   }
784   for (uint i = 0; i < m_table->s->fields; i++) {
785     Rdb_field_encoder &encoder = m_encoder_arr[i];
786     /* Don't pack decodable PK key parts */
787     if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) {
788       continue;
789     }
790 
791     Field *const field = m_table->field[i];
792 
793     if (encoder.maybe_null()) {
794       char *data = const_cast<char *>(m_storage_record.ptr());
795       if (has_ttl) {
796         data += ROCKSDB_SIZEOF_TTL_RECORD;
797       }
798 
799       if (field->is_null()) {
800         data[encoder.m_null_offset] |= encoder.m_null_mask;
801         /* Don't write anything for NULL values */
802         continue;
803       }
804     }
805 
806     if (encoder.m_field_type == MYSQL_TYPE_BLOB ||
807         encoder.m_field_type == MYSQL_TYPE_JSON) {
808       my_core::Field_blob *blob =
809           reinterpret_cast<my_core::Field_blob *>(field);
810       /* Get the number of bytes needed to store length*/
811       const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
812 
813       /* Store the length of the value */
814       m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
815                               length_bytes);
816 
817       /* Store the blob value itself */
818       char *data_ptr;
819       memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
820       m_storage_record.append(data_ptr, blob->get_length());
821     } else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) {
822       Field_varstring *const field_var =
823           reinterpret_cast<Field_varstring *>(field);
824       uint data_len;
825       /* field_var->length_bytes is 1 or 2 */
826       if (field_var->length_bytes == 1) {
827         data_len = field_var->ptr[0];
828       } else {
829         assert(field_var->length_bytes == 2);
830         data_len = uint2korr(field_var->ptr);
831       }
832       m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
833                               field_var->length_bytes + data_len);
834     } else {
835       /* Copy the field data */
836       const uint len = field->pack_length();
837       m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
838     }
839   }
840 
841   if (store_row_debug_checksums) {
842     const ha_checksum key_crc32 = my_core::my_checksum(
843         0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
844     const ha_checksum val_crc32 =
845         my_core::my_checksum(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
846                              m_storage_record.length());
847     uchar key_crc_buf[RDB_CHECKSUM_SIZE];
848     uchar val_crc_buf[RDB_CHECKSUM_SIZE];
849     rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
850     rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
851     m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
852     m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
853     m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
854   }
855 
856   *value_slice =
857       rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
858 
859   return HA_EXIT_SUCCESS;
860 }
861 }  // namespace myrocks
862