1 /*
2    Copyright (c) 2015, Facebook, Inc.
3 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
16 
17 #include <my_global.h>
18 
19 /* This C++ file's header file */
20 #include "./rdb_converter.h"
21 
22 /* Standard C++ header files */
23 #include <algorithm>
24 #include <map>
25 #include <string>
26 #include <vector>
27 
28 /* MySQL header files */
29 #include "./field.h"
30 #include "./key.h"
31 #include "./m_ctype.h"
32 #include "./my_bit.h"
33 #include "./my_bitmap.h"
34 #include "./sql_table.h"
35 
36 
37 /* MyRocks header files */
38 #include "./ha_rocksdb.h"
39 #include "./ha_rocksdb_proto.h"
40 #include "./my_stacktrace.h"
41 #include "./rdb_cf_manager.h"
42 #include "./rdb_psi.h"
43 #include "./rdb_utils.h"
44 
45 
46 namespace myrocks {
47 
dbug_modify_key_varchar8(String * on_disk_rec)48 void dbug_modify_key_varchar8(String *on_disk_rec) {
49   std::string res;
50   // The key starts with index number
51   res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);
52 
53   // Then, a mem-comparable form of a varchar(8) value.
54   res.append("ABCDE\0\0\0\xFC", 9);
55   on_disk_rec->length(0);
56   on_disk_rec->append(res.data(), res.size());
57 }
58 
59 /*
60   Convert field from rocksdb storage format into Mysql Record format
61   @param    buf         OUT          start memory to fill converted data
62   @param    offset      IN/OUT       decoded data is stored in buf + offset
63   @param    table       IN           current table
64   @param    field       IN           current field
65   @param    reader      IN           rocksdb value slice reader
66   @param    decode      IN           whether to decode current field
67   @return
68     0      OK
69     other  HA_ERR error code (can be SE-specific)
70 */
decode(uchar * const buf,uint * offset,TABLE * table,my_core::Field * field,Rdb_field_encoder * field_dec,Rdb_string_reader * reader,bool decode,bool is_null)71 int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, uint *offset,
72                                                 TABLE *table,
73                                                 my_core::Field *field,
74                                                 Rdb_field_encoder *field_dec,
75                                                 Rdb_string_reader *reader,
76                                                 bool decode, bool is_null) {
77   int err = HA_EXIT_SUCCESS;
78 
79   uint field_offset = field->ptr - table->record[0];
80   *offset = field_offset;
81   uint null_offset = field->null_offset();
82   bool maybe_null = field->real_maybe_null();
83   field->move_field(buf + field_offset,
84                     maybe_null ? buf + null_offset : nullptr, field->null_bit);
85 
86   if (is_null) {
87     if (decode) {
88       // This sets the NULL-bit of this record
89       field->set_null();
90       /*
91         Besides that, set the field value to default value. CHECKSUM TABLE
92         depends on this.
93       */
94       memcpy(field->ptr, table->s->default_values + field_offset,
95              field->pack_length());
96     }
97   } else {
98     if (decode) {
99       // sets non-null bits for this record
100       field->set_notnull();
101     }
102 
103     if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
104       err = decode_blob(table, field, reader, decode);
105     } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
106       err = decode_varchar(field, reader, decode);
107     } else {
108       err = decode_fixed_length_field(field, field_dec, reader, decode);
109     }
110   }
111 
112   // Restore field->ptr and field->null_ptr
113   field->move_field(table->record[0] + field_offset,
114                     maybe_null ? table->record[0] + null_offset : nullptr,
115                     field->null_bit);
116 
117   return err;
118 }
119 
120 /*
121   Convert blob from rocksdb storage format into Mysql Record format
122   @param    table       IN           current table
123   @param    field       IN           current field
124   @param    reader      IN           rocksdb value slice reader
125   @param    decode      IN           whether to decode current field
126   @return
127     0      OK
128     other  HA_ERR error code (can be SE-specific)
129 */
decode_blob(TABLE * table,Field * field,Rdb_string_reader * reader,bool decode)130 int Rdb_convert_to_record_value_decoder::decode_blob(TABLE *table, Field *field,
131                                                      Rdb_string_reader *reader,
132                                                      bool decode) {
133   my_core::Field_blob *blob = (my_core::Field_blob *)field;
134 
135   // Get the number of bytes needed to store length
136   const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
137 
138   const char *data_len_str;
139   if (!(data_len_str = reader->read(length_bytes))) {
140     return HA_ERR_ROCKSDB_CORRUPT_DATA;
141   }
142 
143   memcpy(blob->ptr, data_len_str, length_bytes);
144   uint32 data_len =
145       blob->get_length(reinterpret_cast<const uchar *>(data_len_str),
146                        length_bytes);
147   const char *blob_ptr;
148   if (!(blob_ptr = reader->read(data_len))) {
149     return HA_ERR_ROCKSDB_CORRUPT_DATA;
150   }
151 
152   if (decode) {
153     // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
154     // platforms)
155     memset(blob->ptr + length_bytes, 0, 8);
156     memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
157   }
158 
159   return HA_EXIT_SUCCESS;
160 }
161 
162 /*
163   Convert fixed length field from rocksdb storage format into Mysql Record
164   format
165   @param    field       IN           current field
166   @param    field_dec   IN           data structure conttain field encoding data
167   @param    reader      IN           rocksdb value slice reader
168   @param    decode      IN           whether to decode current field
169   @return
170     0      OK
171     other  HA_ERR error code (can be SE-specific)
172 */
decode_fixed_length_field(my_core::Field * const field,Rdb_field_encoder * field_dec,Rdb_string_reader * const reader,bool decode)173 int Rdb_convert_to_record_value_decoder::decode_fixed_length_field(
174     my_core::Field *const field, Rdb_field_encoder *field_dec,
175     Rdb_string_reader *const reader, bool decode) {
176   uint len = field_dec->m_pack_length_in_rec;
177   if (len > 0) {
178     const char *data_bytes;
179     if ((data_bytes = reader->read(len)) == nullptr) {
180       return HA_ERR_ROCKSDB_CORRUPT_DATA;
181     }
182 
183     if (decode) {
184       memcpy(field->ptr, data_bytes, len);
185     }
186   }
187 
188   return HA_EXIT_SUCCESS;
189 }
190 
191 /*
192   Convert varchar field from rocksdb storage format into Mysql Record format
193   @param    field       IN           current field
194   @param    field_dec   IN           data structure conttain field encoding data
195   @param    reader      IN           rocksdb value slice reader
196   @param    decode      IN           whether to decode current field
197   @return
198     0      OK
199     other  HA_ERR error code (can be SE-specific)
200 */
decode_varchar(Field * field,Rdb_string_reader * const reader,bool decode)201 int Rdb_convert_to_record_value_decoder::decode_varchar(
202     Field *field, Rdb_string_reader *const reader, bool decode) {
203   my_core::Field_varstring *const field_var = (my_core::Field_varstring *)field;
204 
205   const char *data_len_str;
206   if (!(data_len_str = reader->read(field_var->length_bytes))) {
207     return HA_ERR_ROCKSDB_CORRUPT_DATA;
208   }
209 
210   uint data_len;
211   // field_var->length_bytes is 1 or 2
212   if (field_var->length_bytes == 1) {
213     data_len = (uchar)data_len_str[0];
214   } else {
215     DBUG_ASSERT(field_var->length_bytes == 2);
216     data_len = uint2korr(data_len_str);
217   }
218 
219   if (data_len > field_var->field_length) {
220     // The data on disk is longer than table DDL allows?
221     return HA_ERR_ROCKSDB_CORRUPT_DATA;
222   }
223 
224   if (!reader->read(data_len)) {
225     return HA_ERR_ROCKSDB_CORRUPT_DATA;
226   }
227 
228   if (decode) {
229     memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len);
230   }
231 
232   return HA_EXIT_SUCCESS;
233 }
234 
235 template <typename value_field_decoder>
Rdb_value_field_iterator(TABLE * table,Rdb_string_reader * value_slice_reader,const Rdb_converter * rdb_converter,uchar * const buf)236 Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator(
237     TABLE *table, Rdb_string_reader *value_slice_reader,
238     const Rdb_converter *rdb_converter, uchar *const buf)
239     : m_buf(buf) {
240   DBUG_ASSERT(table != nullptr);
241   DBUG_ASSERT(buf != nullptr);
242 
243   m_table = table;
244   m_value_slice_reader = value_slice_reader;
245   auto fields = rdb_converter->get_decode_fields();
246   m_field_iter = fields->begin();
247   m_field_end = fields->end();
248   m_null_bytes = rdb_converter->get_null_bytes();
249   m_offset = 0;
250 }
251 
252 // Iterate each requested field and decode one by one
253 template <typename value_field_decoder>
next()254 int Rdb_value_field_iterator<value_field_decoder>::next() {
255   int err = HA_EXIT_SUCCESS;
256   while (m_field_iter != m_field_end) {
257     m_field_dec = m_field_iter->m_field_enc;
258     bool decode = m_field_iter->m_decode;
259     bool maybe_null = m_field_dec->maybe_null();
260     // This is_null value is bind to how stroage format store its value
261     m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] &
262                                 m_field_dec->m_null_mask) != 0);
263 
264     // Skip the bytes we need to skip
265     int skip = m_field_iter->m_skip;
266     if (skip && !m_value_slice_reader->read(skip)) {
267       return HA_ERR_ROCKSDB_CORRUPT_DATA;
268     }
269 
270     m_field = m_table->field[m_field_dec->m_field_index];
271     // Decode each field
272     err = value_field_decoder::decode(m_buf, &m_offset, m_table, m_field,
273                                       m_field_dec, m_value_slice_reader, decode,
274                                       m_is_null);
275     if (err != HA_EXIT_SUCCESS) {
276       return err;
277     }
278     m_field_iter++;
279     // Only break for the field that are actually decoding rather than skipping
280     if (decode) {
281       break;
282     }
283   }
284   return err;
285 }
286 
287 template <typename value_field_decoder>
end_of_fields() const288 bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const {
289   return m_field_iter == m_field_end;
290 }
291 
292 template <typename value_field_decoder>
get_field() const293 Field *Rdb_value_field_iterator<value_field_decoder>::get_field() const {
294   DBUG_ASSERT(m_field != nullptr);
295   return m_field;
296 }
297 
298 template <typename value_field_decoder>
get_dst() const299 void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const {
300   DBUG_ASSERT(m_buf != nullptr);
301   return m_buf + m_offset;
302 }
303 
304 template <typename value_field_decoder>
get_field_index() const305 int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const {
306   DBUG_ASSERT(m_field_dec != nullptr);
307   return m_field_dec->m_field_index;
308 }
309 
310 template <typename value_field_decoder>
get_field_type() const311 enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type()
312     const {
313   DBUG_ASSERT(m_field_dec != nullptr);
314   return m_field_dec->m_field_type;
315 }
316 
317 template <typename value_field_decoder>
is_null() const318 bool Rdb_value_field_iterator<value_field_decoder>::is_null() const {
319   DBUG_ASSERT(m_field != nullptr);
320   return m_is_null;
321 }
322 
323 /*
324   Initialize Rdb_converter with table data
325   @param    thd        IN      Thread context
326   @param    tbl_def    IN      MyRocks table definition
327   @param    table      IN      Current open table
328 */
Rdb_converter(const THD * thd,const Rdb_tbl_def * tbl_def,TABLE * table)329 Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
330                              TABLE *table)
331     : m_thd(thd), m_tbl_def(tbl_def), m_table(table) {
332   DBUG_ASSERT(thd != nullptr);
333   DBUG_ASSERT(tbl_def != nullptr);
334   DBUG_ASSERT(table != nullptr);
335 
336   m_key_requested = false;
337   m_verify_row_debug_checksums = false;
338   m_maybe_unpack_info = false;
339   m_row_checksums_checked = 0;
340   m_null_bytes = nullptr;
341   setup_field_encoders();
342 }
343 
~Rdb_converter()344 Rdb_converter::~Rdb_converter() {
345   my_free(m_encoder_arr);
346   m_encoder_arr = nullptr;
347   // These are needed to suppress valgrind errors in rocksdb.partition
348   m_storage_record.free();
349 }
350 
351 /*
352   Decide storage type for each encoder
353 */
get_storage_type(Rdb_field_encoder * const encoder,const uint kp)354 void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
355                                      const uint kp) {
356   auto pk_descr =
357       m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)];
358   // STORE_SOME uses unpack_info.
359   if (pk_descr->has_unpack_info(kp)) {
360     DBUG_ASSERT(pk_descr->can_unpack(kp));
361     encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
362     m_maybe_unpack_info = true;
363   } else if (pk_descr->can_unpack(kp)) {
364     encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
365   }
366 }
367 
368 /*
369   @brief
370     Setup which fields will be unpacked when reading rows
371 
372   @detail
373     Three special cases when we still unpack all fields:
374     - When client requires decode_all_fields, such as this table is being
375   updated (m_lock_rows==RDB_LOCK_WRITE).
376     - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
377   read all fields to find whether there is a row checksum at the end. We could
378   skip the fields instead of decoding them, but currently we do decoding.)
379     - On index merge as bitmap is cleared during that operation
380 
381   @seealso
382     Rdb_converter::setup_field_encoders()
383     Rdb_converter::convert_record_from_storage_format()
384 */
setup_field_decoders(const MY_BITMAP * field_map,bool decode_all_fields)385 void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
386                                          bool decode_all_fields) {
387   m_key_requested = false;
388   m_decoders_vect.clear();
389   int last_useful = 0;
390   int skip_size = 0;
391 
392   for (uint i = 0; i < m_table->s->fields; i++) {
393     // bitmap is cleared on index merge, but it still needs to decode columns
394     bool field_requested =
395         decode_all_fields || m_verify_row_debug_checksums ||
396         bitmap_is_clear_all(field_map) ||
397         bitmap_is_set(field_map, m_table->field[i]->field_index);
398 
399     // We only need the decoder if the whole record is stored.
400     if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
401       // the field potentially needs unpacking
402       if (field_requested) {
403         // the field is in the read set
404         m_key_requested = true;
405       }
406       continue;
407     }
408 
409     if (field_requested) {
410       // We will need to decode this field
411       m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
412       last_useful = m_decoders_vect.size();
413       skip_size = 0;
414     } else {
415       if (m_encoder_arr[i].uses_variable_len_encoding() ||
416           m_encoder_arr[i].maybe_null()) {
417         // For variable-length field, we need to read the data and skip it
418         m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
419         skip_size = 0;
420       } else {
421         // Fixed-width field can be skipped without looking at it.
422         // Add appropriate skip_size to the next field.
423         skip_size += m_encoder_arr[i].m_pack_length_in_rec;
424       }
425     }
426   }
427 
428   // It could be that the last few elements are varchars that just do
429   // skipping. Remove them.
430   m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
431                         m_decoders_vect.end());
432 }
433 
setup_field_encoders()434 void Rdb_converter::setup_field_encoders() {
435   uint null_bytes_length = 0;
436   uchar cur_null_mask = 0x1;
437 
438   m_encoder_arr = static_cast<Rdb_field_encoder *>(
439       my_malloc(PSI_INSTRUMENT_ME, m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
440   if (m_encoder_arr == nullptr) {
441     return;
442   }
443 
444   for (uint i = 0; i < m_table->s->fields; i++) {
445     Field *const field = m_table->field[i];
446     m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
447 
448     /*
449       Check if this field is
450       - a part of primary key, and
451       - it can be decoded back from its key image.
452       If both hold, we don't need to store this field in the value part of
453       RocksDB's key-value pair.
454 
455       If hidden pk exists, we skip this check since the field will never be
456       part of the hidden pk.
457     */
458     if (!Rdb_key_def::table_has_hidden_pk(m_table)) {
459       KEY *const pk_info = &m_table->key_info[m_table->s->primary_key];
460       for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
461         // key_part->fieldnr is counted from 1
462         if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
463           get_storage_type(&m_encoder_arr[i], kp);
464           break;
465         }
466       }
467     }
468 
469     m_encoder_arr[i].m_field_type = field->real_type();
470     m_encoder_arr[i].m_field_index = i;
471     m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec();
472 
473     if (field->real_maybe_null()) {
474       m_encoder_arr[i].m_null_mask = cur_null_mask;
475       m_encoder_arr[i].m_null_offset = null_bytes_length;
476       if (cur_null_mask == 0x80) {
477         cur_null_mask = 0x1;
478         null_bytes_length++;
479       } else {
480         cur_null_mask = cur_null_mask << 1;
481       }
482     } else {
483       m_encoder_arr[i].m_null_mask = 0;
484     }
485   }
486 
487   // Count the last, unfinished NULL-bits byte
488   if (cur_null_mask != 0x1) {
489     null_bytes_length++;
490   }
491 
492   m_null_bytes_length_in_record = null_bytes_length;
493 }
494 
495 /*
496   EntryPoint for Decode:
497   Decode key slice(if requested) and value slice using built-in field
498   decoders
499   @param     key_def        IN          key definition to decode
500   @param     dst            OUT         Mysql buffer to fill decoded content
501   @param     key_slice      IN          RocksDB key slice to decode
502   @param     value_slice    IN          RocksDB value slice to decode
503   @return
504     0      OK
505     other  HA_ERR error code (can be SE-specific)
506 */
decode(const std::shared_ptr<Rdb_key_def> & key_def,uchar * dst,const rocksdb::Slice * key_slice,const rocksdb::Slice * value_slice)507 int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def,
508                           uchar *dst,  // address to fill data
509                           const rocksdb::Slice *key_slice,
510                           const rocksdb::Slice *value_slice) {
511   // Currently only support decode primary key, Will add decode secondary later
512   DBUG_ASSERT(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
513               key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
514 
515   const rocksdb::Slice *updated_key_slice = key_slice;
516 #ifndef DBUG_OFF
517   String last_rowkey;
518   last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin);
519   DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
520                   { dbug_modify_key_varchar8(&last_rowkey); });
521   rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length());
522   updated_key_slice = &rowkey_slice;
523 #endif
524   return convert_record_from_storage_format(key_def, updated_key_slice,
525                                             value_slice, dst);
526 }
527 
528 /*
529   Decode value slice header
530   @param    reader         IN          value slice reader
531   @param    pk_def         IN          key definition to decode
532   @param    unpack_slice   OUT         unpack info slice
533   @return
534     0      OK
535     other  HA_ERR error code (can be SE-specific)
536 */
decode_value_header(Rdb_string_reader * reader,const std::shared_ptr<Rdb_key_def> & pk_def,rocksdb::Slice * unpack_slice)537 int Rdb_converter::decode_value_header(
538     Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def,
539     rocksdb::Slice *unpack_slice) {
540   /* If it's a TTL record, skip the 8 byte TTL value */
541   if (pk_def->has_ttl()) {
542     const char *ttl_bytes;
543     if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) {
544       memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
545     } else {
546       return HA_ERR_ROCKSDB_CORRUPT_DATA;
547     }
548   }
549 
550   /* Other fields are decoded from the value */
551   if (m_null_bytes_length_in_record &&
552       !(m_null_bytes = reader->read(m_null_bytes_length_in_record))) {
553     return HA_ERR_ROCKSDB_CORRUPT_DATA;
554   }
555 
556   if (m_maybe_unpack_info) {
557     const char *unpack_info = reader->get_current_ptr();
558     if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
559         !reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
560       return HA_ERR_ROCKSDB_CORRUPT_DATA;
561     }
562 
563     uint16 unpack_info_len =
564         rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
565     *unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
566 
567     reader->read(unpack_info_len -
568                  Rdb_key_def::get_unpack_header_size(unpack_info[0]));
569   }
570 
571   return HA_EXIT_SUCCESS;
572 }
573 
574 /*
575   Convert RocksDb key slice and value slice to Mysql format
576   @param      key_def        IN           key definition to decode
577   @param      key_slice      IN           RocksDB key slice
578   @param      value_slice    IN           RocksDB value slice
579   @param      dst            OUT          MySql format address
580   @return
581     0      OK
582     other  HA_ERR error code (can be SE-specific)
583 */
convert_record_from_storage_format(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice * const key_slice,const rocksdb::Slice * const value_slice,uchar * const dst)584 int Rdb_converter::convert_record_from_storage_format(
585     const std::shared_ptr<Rdb_key_def> &pk_def,
586     const rocksdb::Slice *const key_slice,
587     const rocksdb::Slice *const value_slice, uchar *const dst) {
588   int err = HA_EXIT_SUCCESS;
589 
590   Rdb_string_reader value_slice_reader(value_slice);
591   rocksdb::Slice unpack_slice;
592   err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice);
593   if (err != HA_EXIT_SUCCESS) {
594     return err;
595   }
596 
597   /*
598     Decode PK fields from the key
599   */
600   if (m_key_requested) {
601     err = pk_def->unpack_record(m_table, dst, key_slice,
602                                 !unpack_slice.empty() ? &unpack_slice : nullptr,
603                                 false /* verify_checksum */);
604   }
605   if (err != HA_EXIT_SUCCESS) {
606     return err;
607   }
608 
609   Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder>
610       value_field_iterator(m_table, &value_slice_reader, this, dst);
611 
612   // Decode value slices
613   while (!value_field_iterator.end_of_fields()) {
614     err = value_field_iterator.next();
615 
616     if (err != HA_EXIT_SUCCESS) {
617       return err;
618     }
619   }
620 
621   if (m_verify_row_debug_checksums) {
622     return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice,
623                                      value_slice);
624   }
625   return HA_EXIT_SUCCESS;
626 }
627 
628 /*
629   Verify checksum for row
630   @param      pk_def   IN     key def
631   @param      reader   IN     RocksDB value slice reader
632   @param      key      IN     RocksDB key slice
633   @param      value    IN     RocksDB value slice
634   @return
635     0      OK
636     other  HA_ERR error code (can be SE-specific)
637 */
verify_row_debug_checksum(const std::shared_ptr<Rdb_key_def> & pk_def,Rdb_string_reader * reader,const rocksdb::Slice * key,const rocksdb::Slice * value)638 int Rdb_converter::verify_row_debug_checksum(
639     const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader,
640     const rocksdb::Slice *key, const rocksdb::Slice *value) {
641   if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
642       reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
643     uint32_t stored_key_chksum =
644         rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
645     uint32_t stored_val_chksum =
646         rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
647 
648     const uint32_t computed_key_chksum =
649         my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size());
650     const uint32_t computed_val_chksum =
651         my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value),
652                        value->size() - RDB_CHECKSUM_CHUNK_SIZE);
653 
654     DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;);
655 
656     if (stored_key_chksum != computed_key_chksum) {
657       pk_def->report_checksum_mismatch(true, key->data(), key->size());
658       return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
659     }
660 
661     DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;);
662     if (stored_val_chksum != computed_val_chksum) {
663       pk_def->report_checksum_mismatch(false, value->data(), value->size());
664       return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
665     }
666 
667     m_row_checksums_checked++;
668   }
669   if (reader->remaining_bytes()) {
670     return HA_ERR_ROCKSDB_CORRUPT_DATA;
671   }
672   return HA_EXIT_SUCCESS;
673 }
674 
675 /**
676   Convert record from table->record[0] form into a form that can be written
677   into rocksdb.
678 
679   @param pk_def               IN        Current key def
680   @pk_unpack_info             IN        Unpack info generated during key pack
681   @is_update_row              IN        Whether it is update row
682   @store_row_debug_checksums  IN        Whether to store checksums
683   @param ttl_bytes            IN/OUT    Old ttl value from previous record and
684                                         ttl value during current encode
685   @is_ttl_bytes_updated       OUT       Whether ttl bytes is updated
686   @param value_slice          OUT       Data slice with record data.
687 */
encode_value_slice(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice & pk_packed_slice,Rdb_string_writer * pk_unpack_info,bool is_update_row,bool store_row_debug_checksums,char * ttl_bytes,bool * is_ttl_bytes_updated,rocksdb::Slice * const value_slice)688 int Rdb_converter::encode_value_slice(
689     const std::shared_ptr<Rdb_key_def> &pk_def,
690     const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info,
691     bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes,
692     bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) {
693   DBUG_ASSERT(pk_def != nullptr);
694   // Currently only primary key will store value slice
695   DBUG_ASSERT(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
696               pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
697   DBUG_ASSERT_IMP(m_maybe_unpack_info, pk_unpack_info);
698 
699   bool has_ttl = pk_def->has_ttl();
700   bool has_ttl_column = !pk_def->m_ttl_column.empty();
701 
702   m_storage_record.length(0);
703 
704   if (has_ttl) {
705     /* If it's a TTL record, reserve space for 8 byte TTL value in front. */
706     m_storage_record.fill(
707         ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0);
708     // NOTE: is_ttl_bytes_updated is only used for update case
709     // During update, skip update sk key/values slice iff none of sk fields
710     // have changed and ttl bytes isn't changed. see
711     // ha_rocksdb::update_write_sk() for more info
712     *is_ttl_bytes_updated = false;
713     char *const data = const_cast<char *>(m_storage_record.ptr());
714     if (has_ttl_column) {
715       DBUG_ASSERT(pk_def->get_ttl_field_index() != UINT_MAX);
716       Field *const field = m_table->field[pk_def->get_ttl_field_index()];
717       DBUG_ASSERT(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD);
718       DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
719 
720       uint64 ts = uint8korr(field->ptr);
721 #ifndef DBUG_OFF
722       ts += rdb_dbug_set_ttl_rec_ts();
723 #endif
724       rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
725       if (is_update_row) {
726         *is_ttl_bytes_updated =
727             memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
728       }
729       // Also store in m_ttl_bytes to propagate to update_write_sk
730       memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
731     } else {
732       /*
733         For implicitly generated TTL records we need to copy over the old
734         TTL value from the old record in the event of an update. It was stored
735         in m_ttl_bytes.
736 
737         Otherwise, generate a timestamp using the current time.
738       */
739       if (is_update_row) {
740         memcpy(data, ttl_bytes, sizeof(uint64));
741       } else {
742         uint64 ts = static_cast<uint64>(std::time(nullptr));
743 #ifndef DBUG_OFF
744         ts += rdb_dbug_set_ttl_rec_ts();
745 #endif
746         rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
747         // Also store in m_ttl_bytes to propagate to update_write_sk
748         memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
749       }
750     }
751   } else {
752     /* All NULL bits are initially 0 */
753     m_storage_record.fill(m_null_bytes_length_in_record, 0);
754   }
755 
756   // If a primary key may have non-empty unpack_info for certain values,
757   // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
758   // itself was prepared in Rdb_key_def::pack_record.
759   if (m_maybe_unpack_info) {
760     m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
761                             pk_unpack_info->get_current_pos());
762   }
763   for (uint i = 0; i < m_table->s->fields; i++) {
764     Rdb_field_encoder &encoder = m_encoder_arr[i];
765     /* Don't pack decodable PK key parts */
766     if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) {
767       continue;
768     }
769 
770     Field *const field = m_table->field[i];
771     if (encoder.maybe_null()) {
772       char *data = const_cast<char *>(m_storage_record.ptr());
773       if (has_ttl) {
774         data += ROCKSDB_SIZEOF_TTL_RECORD;
775       }
776 
777       if (field->is_null()) {
778         data[encoder.m_null_offset] |= encoder.m_null_mask;
779         /* Don't write anything for NULL values */
780         continue;
781       }
782     }
783 
784     if (encoder.m_field_type == MYSQL_TYPE_BLOB) {
785       my_core::Field_blob *blob =
786           reinterpret_cast<my_core::Field_blob *>(field);
787       /* Get the number of bytes needed to store length*/
788       const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
789 
790       /* Store the length of the value */
791       m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
792                               length_bytes);
793 
794       /* Store the blob value itself */
795       char *data_ptr;
796       memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
797       m_storage_record.append(data_ptr, blob->get_length());
798     } else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) {
799       Field_varstring *const field_var =
800           reinterpret_cast<Field_varstring *>(field);
801       uint data_len;
802       /* field_var->length_bytes is 1 or 2 */
803       if (field_var->length_bytes == 1) {
804         data_len = field_var->ptr[0];
805       } else {
806         DBUG_ASSERT(field_var->length_bytes == 2);
807         data_len = uint2korr(field_var->ptr);
808       }
809       m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
810                               field_var->length_bytes + data_len);
811     } else {
812       /* Copy the field data */
813       const uint len = field->pack_length_in_rec();
814       m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
815     }
816   }
817 
818   if (store_row_debug_checksums) {
819     const uint32_t key_crc32 = my_core::my_checksum(
820         0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
821     const uint32_t val_crc32 =
822         my_core::my_checksum(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
823                        m_storage_record.length());
824     uchar key_crc_buf[RDB_CHECKSUM_SIZE];
825     uchar val_crc_buf[RDB_CHECKSUM_SIZE];
826     rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
827     rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
828     m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
829     m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
830     m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
831   }
832 
833   *value_slice =
834       rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
835 
836   return HA_EXIT_SUCCESS;
837 }
838 }  // namespace myrocks
839