1 /*
2 Copyright (c) 2015, Facebook, Inc.
3
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17 /* This C++ file's header file */
18 #include "./rdb_converter.h"
19
20 /* Standard C++ header files */
21 #include <algorithm>
22 #include <map>
23 #include <string>
24 #include <vector>
25
26 /* MySQL header files */
27 #include "log.h"
28 #include "my_stacktrace.h"
29 #include "sql_array.h"
30
31 /* MyRocks header files */
32 #include "./ha_rocksdb.h"
33 #include "./ha_rocksdb_proto.h"
34 #include "./rdb_datadic.h"
35 #include "./rdb_psi.h"
36 #include "./rdb_utils.h"
37
38 namespace myrocks {
39
dbug_modify_key_varchar8(String * on_disk_rec)40 void dbug_modify_key_varchar8(String *on_disk_rec) {
41 std::string res;
42 // The key starts with index number
43 res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);
44
45 // Then, a mem-comparable form of a varchar(8) value.
46 res.append("ABCDE\0\0\0\xFC", 9);
47 on_disk_rec->length(0);
48 on_disk_rec->append(res.data(), res.size());
49 }
50
51 /*
52 Convert field from rocksdb storage format into Mysql Record format
53 @param buf OUT start memory to fill converted data
54 @param offset IN/OUT decoded data is stored in buf + offset
55 @param table IN current table
56 @param field IN current field
57 @param reader IN rocksdb value slice reader
58 @param decode IN whether to decode current field
59 @return
60 0 OK
61 other HA_ERR error code (can be SE-specific)
62 */
decode(uchar * const buf,TABLE * table,Rdb_field_encoder * field_dec,Rdb_string_reader * reader,bool decode,bool is_null)63 int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, TABLE *table,
64 Rdb_field_encoder *field_dec,
65 Rdb_string_reader *reader,
66 bool decode, bool is_null) {
67 int err = HA_EXIT_SUCCESS;
68 auto ptr = buf + field_dec->m_field_offset;
69 if (is_null) {
70 if (decode && field_dec->maybe_null()) {
71 // This sets the NULL-bit of this record
72 buf[field_dec->m_field_null_offset] |= field_dec->m_field_null_mask;
73
74 /*
75 Besides that, set the field value to default value. CHECKSUM TABLE
76 depends on this.
77 */
78 memcpy(ptr, table->s->default_values + field_dec->m_field_offset,
79 field_dec->m_field_pack_length);
80 }
81 } else {
82 if (decode && field_dec->maybe_null()) {
83 // sets non-null bits for this record
84 buf[field_dec->m_field_null_offset] &= ~(field_dec->m_field_null_mask);
85 }
86
87 if (field_dec->m_field_type == MYSQL_TYPE_BLOB ||
88 field_dec->m_field_type == MYSQL_TYPE_JSON) {
89 err = decode_blob(table, ptr, field_dec, reader, decode);
90 } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
91 err = decode_varchar(ptr, field_dec, reader, decode);
92 } else {
93 err = decode_fixed_length_field(ptr, field_dec, reader, decode);
94 }
95 }
96
97 return err;
98 }
99
100 /*
101 Convert blob from rocksdb storage format into Mysql Record format
102 @param table IN current table
103 @param field IN current field
104 @param reader IN rocksdb value slice reader
105 @param decode IN whether to decode current field
106 @return
107 0 OK
108 other HA_ERR error code (can be SE-specific)
109 */
decode_blob(TABLE * table,uchar * const buf,Rdb_field_encoder * field_dec,Rdb_string_reader * reader,bool decode)110 int Rdb_convert_to_record_value_decoder::decode_blob(
111 TABLE *table, uchar *const buf, Rdb_field_encoder *field_dec,
112 Rdb_string_reader *reader, bool decode) {
113 // Get the number of bytes needed to store length
114 const uint length_bytes =
115 field_dec->m_field_pack_length - portable_sizeof_char_ptr;
116
117 const char *data_len_str;
118 if (!(data_len_str = reader->read(length_bytes))) {
119 return HA_ERR_ROCKSDB_CORRUPT_DATA;
120 }
121
122 memcpy(buf, data_len_str, length_bytes);
123 uint32 data_len =
124 Field_blob::get_length(reinterpret_cast<const uchar *>(data_len_str),
125 length_bytes, table->s->db_low_byte_first);
126 const char *blob_ptr;
127 if (!(blob_ptr = reader->read(data_len))) {
128 return HA_ERR_ROCKSDB_CORRUPT_DATA;
129 }
130
131 if (decode) {
132 // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
133 // platforms)
134 memset(buf + length_bytes, 0, 8);
135 memcpy(buf + length_bytes, &blob_ptr, sizeof(uchar **));
136 }
137
138 return HA_EXIT_SUCCESS;
139 }
140
141 /*
142 Convert fixed length field from rocksdb storage format into Mysql Record
143 format
144 @param field IN current field
145 @param field_dec IN data structure conttain field encoding data
146 @param reader IN rocksdb value slice reader
147 @param decode IN whether to decode current field
148 @return
149 0 OK
150 other HA_ERR error code (can be SE-specific)
151 */
decode_fixed_length_field(uchar * const buf,Rdb_field_encoder * field_dec,Rdb_string_reader * const reader,bool decode)152 int Rdb_convert_to_record_value_decoder::decode_fixed_length_field(
153 uchar *const buf, Rdb_field_encoder *field_dec,
154 Rdb_string_reader *const reader, bool decode) {
155 uint len = field_dec->m_field_pack_length;
156 if (len > 0) {
157 const char *data_bytes;
158 if ((data_bytes = reader->read(len)) == nullptr) {
159 return HA_ERR_ROCKSDB_CORRUPT_DATA;
160 }
161
162 if (decode) {
163 memcpy(buf, data_bytes, len);
164 }
165 }
166
167 return HA_EXIT_SUCCESS;
168 }
169
170 /*
171 Convert varchar field from rocksdb storage format into Mysql Record format
172 @param field IN current field
173 @param field_dec IN data structure conttain field encoding data
174 @param reader IN rocksdb value slice reader
175 @param decode IN whether to decode current field
176 @return
177 0 OK
178 other HA_ERR error code (can be SE-specific)
179 */
decode_varchar(uchar * const buf,Rdb_field_encoder * field_dec,Rdb_string_reader * const reader,bool decode)180 int Rdb_convert_to_record_value_decoder::decode_varchar(
181 uchar *const buf, Rdb_field_encoder *field_dec,
182 Rdb_string_reader *const reader, bool decode) {
183 const char *data_len_str;
184 if (!(data_len_str = reader->read(field_dec->m_field_length_bytes))) {
185 return HA_ERR_ROCKSDB_CORRUPT_DATA;
186 }
187
188 uint data_len;
189 // field_dec->length_bytes is 1 or 2
190 if (field_dec->m_field_length_bytes == 1) {
191 data_len = (uchar)data_len_str[0];
192 } else {
193 assert(field_dec->m_field_length_bytes == 2);
194 data_len = uint2korr(data_len_str);
195 }
196
197 if (data_len > field_dec->m_field_length) {
198 // The data on disk is longer than table DDL allows?
199 return HA_ERR_ROCKSDB_CORRUPT_DATA;
200 }
201
202 if (!reader->read(data_len)) {
203 return HA_ERR_ROCKSDB_CORRUPT_DATA;
204 }
205
206 if (decode) {
207 memcpy(buf, data_len_str, field_dec->m_field_length_bytes + data_len);
208 }
209
210 return HA_EXIT_SUCCESS;
211 }
212
213 template <typename value_field_decoder>
Rdb_value_field_iterator(TABLE * table,Rdb_string_reader * value_slice_reader,const Rdb_converter * rdb_converter,uchar * const buf)214 Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator(
215 TABLE *table, Rdb_string_reader *value_slice_reader,
216 const Rdb_converter *rdb_converter, uchar *const buf)
217 : m_buf(buf) {
218 assert(table != nullptr);
219 assert(buf != nullptr);
220
221 m_table = table;
222 m_value_slice_reader = value_slice_reader;
223 auto fields = rdb_converter->get_decode_fields();
224 m_field_iter = fields->begin();
225 m_field_end = fields->end();
226 m_null_bytes = rdb_converter->get_null_bytes();
227 }
228
229 // Iterate each requested field and decode one by one
230 template <typename value_field_decoder>
next()231 int Rdb_value_field_iterator<value_field_decoder>::next() {
232 int err = HA_EXIT_SUCCESS;
233 while (m_field_iter != m_field_end) {
234 m_field_dec = m_field_iter->m_field_enc;
235 bool decode = m_field_iter->m_decode;
236 bool maybe_null = m_field_dec->maybe_null();
237 // This is_null value is bind to how stroage format store its value
238 m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] &
239 m_field_dec->m_null_mask) != 0);
240
241 // Skip the bytes we need to skip
242 int skip = m_field_iter->m_skip;
243 if (skip && !m_value_slice_reader->read(skip)) {
244 return HA_ERR_ROCKSDB_CORRUPT_DATA;
245 }
246
247 // Decode each field
248 err = value_field_decoder::decode(m_buf, m_table, m_field_dec,
249 m_value_slice_reader, decode, m_is_null);
250 if (err != HA_EXIT_SUCCESS) {
251 return err;
252 }
253
254 m_field_iter++;
255
256 // Only break for the field that are actually decoding rather than skipping
257 if (decode) {
258 break;
259 }
260 }
261 return err;
262 }
263
264 template <typename value_field_decoder>
end_of_fields() const265 bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const {
266 return m_field_iter == m_field_end;
267 }
268
269 template <typename value_field_decoder>
get_dst() const270 void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const {
271 assert(m_buf != nullptr);
272 return m_buf + m_offset;
273 }
274
275 template <typename value_field_decoder>
get_field_index() const276 int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const {
277 assert(m_field_dec != nullptr);
278 return m_field_dec->m_field_index;
279 }
280
281 template <typename value_field_decoder>
get_field_type() const282 enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type()
283 const {
284 assert(m_field_dec != nullptr);
285 return m_field_dec->m_field_type;
286 }
287
288 template <typename value_field_decoder>
is_null() const289 bool Rdb_value_field_iterator<value_field_decoder>::is_null() const {
290 return m_is_null;
291 }
292
293 /*
294 Initialize Rdb_converter with table data
295 @param thd IN Thread context
296 @param tbl_def IN MyRocks table definition
297 @param table IN Current open table
298 */
Rdb_converter(const THD * thd,const Rdb_tbl_def * tbl_def,TABLE * table)299 Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
300 TABLE *table)
301 : m_thd(thd), m_tbl_def(tbl_def), m_table(table) {
302 assert(thd != nullptr);
303 assert(tbl_def != nullptr);
304 assert(table != nullptr);
305
306 m_key_requested = false;
307 m_verify_row_debug_checksums = false;
308 m_maybe_unpack_info = false;
309 m_row_checksums_checked = 0;
310 m_null_bytes = nullptr;
311 setup_field_encoders();
312 m_lookup_bitmap = {nullptr, 0, 0, nullptr, nullptr};
313 }
314
~Rdb_converter()315 Rdb_converter::~Rdb_converter() {
316 my_free(m_encoder_arr);
317 m_encoder_arr = nullptr;
318 // These are needed to suppress valgrind errors in rocksdb.partition
319 m_storage_record.mem_free();
320 bitmap_free(&m_lookup_bitmap);
321 }
322
323 /*
324 Decide storage type for each encoder
325 */
get_storage_type(Rdb_field_encoder * const encoder,const uint kp)326 void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
327 const uint kp) {
328 auto pk_descr =
329 m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)];
330 // STORE_SOME uses unpack_info.
331 if (pk_descr->has_unpack_info(kp)) {
332 assert(pk_descr->can_unpack(kp));
333 encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
334 m_maybe_unpack_info = true;
335 } else if (pk_descr->can_unpack(kp)) {
336 encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
337 }
338 }
339
340 /*
341 @brief
342 Setup which fields will be unpacked when reading rows
343
344 @detail
345 Two special cases when we still unpack all fields:
346 - When client requires decode_all_fields, such as this table is being
347 updated (m_lock_rows==RDB_LOCK_WRITE).
348 - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
349 read all fields to find whether there is a row checksum at the end. We could
350 skip the fields instead of decoding them, but currently we do decoding.)
351
352 @seealso
353 Rdb_converter::setup_field_encoders()
354 Rdb_converter::convert_record_from_storage_format()
355 */
setup_field_decoders(const MY_BITMAP * field_map,uint active_index,bool keyread_only,bool decode_all_fields)356 void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
357 uint active_index, bool keyread_only,
358 bool decode_all_fields) {
359 m_key_requested = false;
360 m_decoders_vect.clear();
361 bitmap_free(&m_lookup_bitmap);
362 int last_useful = 0;
363 int skip_size = 0;
364
365 for (uint i = 0; i < m_table->s->fields; i++) {
366 bool field_requested =
367 decode_all_fields || m_verify_row_debug_checksums ||
368 bitmap_is_set(field_map, m_table->field[i]->field_index);
369
370 // We only need the decoder if the whole record is stored.
371 if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
372 // the field potentially needs unpacking
373 if (field_requested) {
374 // the field is in the read set
375 m_key_requested = true;
376 }
377 continue;
378 }
379
380 if (field_requested) {
381 // We will need to decode this field
382 m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
383 last_useful = m_decoders_vect.size();
384 skip_size = 0;
385 } else {
386 if (m_encoder_arr[i].uses_variable_len_encoding() ||
387 m_encoder_arr[i].maybe_null()) {
388 // For variable-length field, we need to read the data and skip it
389 m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
390 skip_size = 0;
391 } else {
392 // Fixed-width field can be skipped without looking at it.
393 // Add appropriate skip_size to the next field.
394 skip_size += m_encoder_arr[i].m_field_pack_length;
395 }
396 }
397 }
398
399 // It could be that the last few elements are varchars that just do
400 // skipping. Remove them.
401 m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
402 m_decoders_vect.end());
403
404 if (!keyread_only && active_index != m_table->s->primary_key) {
405 m_tbl_def->m_key_descr_arr[active_index]->get_lookup_bitmap(
406 m_table, &m_lookup_bitmap);
407 }
408 }
409
setup_field_encoders()410 void Rdb_converter::setup_field_encoders() {
411 uint null_bytes_length = 0;
412 uchar cur_null_mask = 0x1;
413
414 m_encoder_arr = static_cast<Rdb_field_encoder *>(
415 #ifdef HAVE_PSI_INTERFACE
416 my_malloc(rdb_handler_memory_key,
417 m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
418 #else
419 my_malloc(PSI_NOT_INSTRUMENTED,
420 m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
421 #endif
422 if (m_encoder_arr == nullptr) {
423 return;
424 }
425
426 for (uint i = 0; i < m_table->s->fields; i++) {
427 Field *const field = m_table->field[i];
428 m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
429
430 /*
431 Check if this field is
432 - a part of primary key, and
433 - it can be decoded back from its key image.
434 If both hold, we don't need to store this field in the value part of
435 RocksDB's key-value pair.
436
437 If hidden pk exists, we skip this check since the field will never be
438 part of the hidden pk.
439 */
440 if (!Rdb_key_def::table_has_hidden_pk(m_table)) {
441 KEY *const pk_info = &m_table->key_info[m_table->s->primary_key];
442 for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
443 // key_part->fieldnr is counted from 1
444 if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
445 get_storage_type(&m_encoder_arr[i], kp);
446 break;
447 }
448 }
449 }
450
451 /*
452 The difference between pack_length and pack_length_in_rec is fairly
453 subtle. The only difference is in Field_bit case where it borrows some
454 bits in null bytes in memory to store the 'uneven' high bits, therefore
455 the pack_length is the length of remaining bits while the
456 pack_length_in_rec is the full length of all bits when you store it on
457 disk. Only MyIsam and archive supports it, indicating by
458 HA_CAN_BIT_FIELD. We don't handle this case today at all (nor do we need
459 to), and we use pack_length everywhere, so just assert it and move on.
460 */
461 assert(field->pack_length() == field->pack_length_in_rec());
462
463 auto field_type = field->real_type();
464 m_encoder_arr[i].m_field_type = field_type;
465 m_encoder_arr[i].m_field_index = i;
466 m_encoder_arr[i].m_field_pack_length = field->pack_length();
467 m_encoder_arr[i].m_field_offset = field->ptr - m_table->record[0];
468
469 if (field_type == MYSQL_TYPE_VARCHAR) {
470 auto varchar = reinterpret_cast<const Field_varstring *>(field);
471 m_encoder_arr[i].m_field_length = varchar->field_length;
472 m_encoder_arr[i].m_field_length_bytes = varchar->length_bytes;
473 } else {
474 m_encoder_arr[i].m_field_length = -1;
475 m_encoder_arr[i].m_field_length_bytes = -1;
476 }
477
478 auto maybe_null = field->real_maybe_null();
479 if (maybe_null) {
480 m_encoder_arr[i].m_null_mask = cur_null_mask;
481 m_encoder_arr[i].m_null_offset = null_bytes_length;
482 m_encoder_arr[i].m_field_null_offset = field->null_offset();
483 m_encoder_arr[i].m_field_null_mask = field->null_bit;
484 if (cur_null_mask == 0x80) {
485 cur_null_mask = 0x1;
486 null_bytes_length++;
487 } else {
488 cur_null_mask = cur_null_mask << 1;
489 }
490 } else {
491 m_encoder_arr[i].m_null_offset = 0;
492 m_encoder_arr[i].m_null_mask = 0;
493 }
494 }
495
496 // Count the last, unfinished NULL-bits byte
497 if (cur_null_mask != 0x1) {
498 null_bytes_length++;
499 }
500
501 m_null_bytes_length_in_record = null_bytes_length;
502 }
503
504 /*
505 EntryPoint for Decode:
506 Decode key slice(if requested) and value slice using built-in field
507 decoders
508 @param key_def IN key definition to decode
509 @param dst OUT Mysql buffer to fill decoded content
510 @param key_slice IN RocksDB key slice to decode
511 @param value_slice IN RocksDB value slice to decode
512 @return
513 0 OK
514 other HA_ERR error code (can be SE-specific)
515 */
decode(const std::shared_ptr<Rdb_key_def> & key_def,uchar * dst,const rocksdb::Slice * key_slice,const rocksdb::Slice * value_slice,bool decode_value)516 int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def,
517 uchar *dst, // address to fill data
518 const rocksdb::Slice *key_slice,
519 const rocksdb::Slice *value_slice,
520 bool decode_value) {
521 // Currently only support decode primary key, Will add decode secondary later
522 assert(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
523 key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
524
525 const rocksdb::Slice *updated_key_slice = key_slice;
526 #ifndef NDEBUG
527 String last_rowkey;
528 last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin);
529 DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
530 { dbug_modify_key_varchar8(&last_rowkey); });
531 rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length());
532 updated_key_slice = &rowkey_slice;
533 #endif
534 return convert_record_from_storage_format(key_def, updated_key_slice,
535 value_slice, dst);
536 }
537
538 /*
539 Decode value slice header
540 @param reader IN value slice reader
541 @param pk_def IN key definition to decode
542 @param unpack_slice OUT unpack info slice
543 @return
544 0 OK
545 other HA_ERR error code (can be SE-specific)
546 */
decode_value_header(Rdb_string_reader * reader,const std::shared_ptr<Rdb_key_def> & pk_def,rocksdb::Slice * unpack_slice)547 int Rdb_converter::decode_value_header(
548 Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def,
549 rocksdb::Slice *unpack_slice) {
550 /* If it's a TTL record, skip the 8 byte TTL value */
551 if (pk_def->has_ttl()) {
552 const char *ttl_bytes;
553 if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) {
554 memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
555 } else {
556 return HA_ERR_ROCKSDB_CORRUPT_DATA;
557 }
558 }
559
560 /* Other fields are decoded from the value */
561 if (m_null_bytes_length_in_record &&
562 !(m_null_bytes = reader->read(m_null_bytes_length_in_record))) {
563 return HA_ERR_ROCKSDB_CORRUPT_DATA;
564 }
565
566 if (m_maybe_unpack_info) {
567 const char *unpack_info = reader->get_current_ptr();
568 if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
569 !reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
570 return HA_ERR_ROCKSDB_CORRUPT_DATA;
571 }
572
573 uint16 unpack_info_len =
574 rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
575 *unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
576
577 reader->read(unpack_info_len -
578 Rdb_key_def::get_unpack_header_size(unpack_info[0]));
579 }
580
581 return HA_EXIT_SUCCESS;
582 }
583
584 /*
585 Convert RocksDb key slice and value slice to Mysql format
586 @param key_def IN key definition to decode
587 @param key_slice IN RocksDB key slice
588 @param value_slice IN RocksDB value slice
589 @param dst OUT MySql format address
590 @return
591 0 OK
592 other HA_ERR error code (can be SE-specific)
593 */
convert_record_from_storage_format(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice * const key_slice,const rocksdb::Slice * const value_slice,uchar * const dst,bool decode_value)594 int Rdb_converter::convert_record_from_storage_format(
595 const std::shared_ptr<Rdb_key_def> &pk_def,
596 const rocksdb::Slice *const key_slice,
597 const rocksdb::Slice *const value_slice, uchar *const dst,
598 bool decode_value) {
599 bool skip_value = !decode_value || get_decode_fields()->size() == 0;
600 if (!m_key_requested && skip_value) {
601 return HA_EXIT_SUCCESS;
602 }
603
604 int err = HA_EXIT_SUCCESS;
605
606 Rdb_string_reader value_slice_reader(value_slice);
607 rocksdb::Slice unpack_slice;
608 err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice);
609 if (err != HA_EXIT_SUCCESS) {
610 return err;
611 }
612
613 /*
614 Decode PK fields from the key
615 */
616 if (m_key_requested) {
617 err = pk_def->unpack_record(m_table, dst, key_slice,
618 !unpack_slice.empty() ? &unpack_slice : nullptr,
619 false /* verify_checksum */);
620 if (err != HA_EXIT_SUCCESS) {
621 return err;
622 }
623 }
624
625 if (skip_value) {
626 // We are done
627 return HA_EXIT_SUCCESS;
628 }
629
630 Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder>
631 value_field_iterator(m_table, &value_slice_reader, this, dst);
632
633 // Decode value slices
634 while (!value_field_iterator.end_of_fields()) {
635 err = value_field_iterator.next();
636
637 if (err != HA_EXIT_SUCCESS) {
638 return err;
639 }
640 }
641
642 if (m_verify_row_debug_checksums) {
643 return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice,
644 value_slice);
645 }
646 return HA_EXIT_SUCCESS;
647 }
648
649 /*
650 Verify checksum for row
651 @param pk_def IN key def
652 @param reader IN RocksDB value slice reader
653 @param key IN RocksDB key slice
654 @param value IN RocksDB value slice
655 @return
656 0 OK
657 other HA_ERR error code (can be SE-specific)
658 */
verify_row_debug_checksum(const std::shared_ptr<Rdb_key_def> & pk_def,Rdb_string_reader * reader,const rocksdb::Slice * key,const rocksdb::Slice * value)659 int Rdb_converter::verify_row_debug_checksum(
660 const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader,
661 const rocksdb::Slice *key, const rocksdb::Slice *value) {
662 if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
663 reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
664 uint32_t stored_key_chksum =
665 rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
666 uint32_t stored_val_chksum =
667 rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
668
669 const ha_checksum computed_key_chksum =
670 my_core::my_checksum(0, rdb_slice_to_uchar_ptr(key), key->size());
671 const ha_checksum computed_val_chksum =
672 my_core::my_checksum(0, rdb_slice_to_uchar_ptr(value),
673 value->size() - RDB_CHECKSUM_CHUNK_SIZE);
674
675 DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;);
676
677 if (stored_key_chksum != computed_key_chksum) {
678 pk_def->report_checksum_mismatch(true, key->data(), key->size());
679 return HA_ERR_ROCKSDB_CORRUPT_DATA;
680 }
681
682 DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;);
683 if (stored_val_chksum != computed_val_chksum) {
684 pk_def->report_checksum_mismatch(false, value->data(), value->size());
685 return HA_ERR_ROCKSDB_CORRUPT_DATA;
686 }
687
688 m_row_checksums_checked++;
689 }
690 if (reader->remaining_bytes()) {
691 return HA_ERR_ROCKSDB_CORRUPT_DATA;
692 }
693 return HA_EXIT_SUCCESS;
694 }
695
696 /**
697 Convert record from table->record[0] form into a form that can be written
698 into rocksdb.
699
700 @param pk_def IN Current key def
701 @pk_unpack_info IN Unpack info generated during key pack
702 @is_update_row IN Whether it is update row
703 @store_row_debug_checksums IN Whether to store checksums
704 @param ttl_bytes IN/OUT Old ttl value from previous record and
705 ttl value during current encode
706 @is_ttl_bytes_updated OUT Whether ttl bytes is updated
707 @param value_slice OUT Data slice with record data.
708 */
encode_value_slice(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice & pk_packed_slice,Rdb_string_writer * pk_unpack_info,bool is_update_row,bool store_row_debug_checksums,char * ttl_bytes,bool * is_ttl_bytes_updated,rocksdb::Slice * const value_slice)709 int Rdb_converter::encode_value_slice(
710 const std::shared_ptr<Rdb_key_def> &pk_def,
711 const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info,
712 bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes,
713 bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) {
714 assert(pk_def != nullptr);
715 // Currently only primary key will store value slice
716 assert(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
717 pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
718 assert_IMP(m_maybe_unpack_info, pk_unpack_info);
719
720 bool has_ttl = pk_def->has_ttl();
721 bool has_ttl_column = !pk_def->m_ttl_column.empty();
722
723 m_storage_record.length(0);
724
725 if (has_ttl) {
726 /* If it's a TTL record, reserve space for 8 byte TTL value in front. */
727 m_storage_record.fill(
728 ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0);
729 // NOTE: is_ttl_bytes_updated is only used for update case
730 // During update, skip update sk key/values slice iff none of sk fields
731 // have changed and ttl bytes isn't changed. see
732 // ha_rocksdb::update_write_sk() for more info
733 *is_ttl_bytes_updated = false;
734 char *const data = const_cast<char *>(m_storage_record.ptr());
735 if (has_ttl_column) {
736 assert(pk_def->get_ttl_field_index() != UINT_MAX);
737 Field *const field = m_table->field[pk_def->get_ttl_field_index()];
738 assert(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD);
739 assert(field->real_type() == MYSQL_TYPE_LONGLONG);
740
741 uint64 ts = uint8korr(field->ptr);
742 #ifndef NDEBUG
743 ts += rdb_dbug_set_ttl_rec_ts();
744 #endif
745 rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
746 if (is_update_row) {
747 *is_ttl_bytes_updated =
748 memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
749 }
750 // Also store in m_ttl_bytes to propagate to update_write_sk
751 memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
752 } else {
753 /*
754 For implicitly generated TTL records we need to copy over the old
755 TTL value from the old record in the event of an update. It was stored
756 in m_ttl_bytes.
757
758 Otherwise, generate a timestamp using the current time.
759 */
760 if (is_update_row) {
761 memcpy(data, ttl_bytes, sizeof(uint64));
762 } else {
763 uint64 ts = static_cast<uint64>(std::time(nullptr));
764 #ifndef NDEBUG
765 ts += rdb_dbug_set_ttl_rec_ts();
766 #endif
767 rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
768 // Also store in m_ttl_bytes to propagate to update_write_sk
769 memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
770 }
771 }
772 } else {
773 /* All NULL bits are initially 0 */
774 m_storage_record.fill(m_null_bytes_length_in_record, 0);
775 }
776
777 // If a primary key may have non-empty unpack_info for certain values,
778 // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
779 // itself was prepared in Rdb_key_def::pack_record.
780 if (m_maybe_unpack_info) {
781 m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
782 pk_unpack_info->get_current_pos());
783 }
784 for (uint i = 0; i < m_table->s->fields; i++) {
785 Rdb_field_encoder &encoder = m_encoder_arr[i];
786 /* Don't pack decodable PK key parts */
787 if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) {
788 continue;
789 }
790
791 Field *const field = m_table->field[i];
792
793 if (encoder.maybe_null()) {
794 char *data = const_cast<char *>(m_storage_record.ptr());
795 if (has_ttl) {
796 data += ROCKSDB_SIZEOF_TTL_RECORD;
797 }
798
799 if (field->is_null()) {
800 data[encoder.m_null_offset] |= encoder.m_null_mask;
801 /* Don't write anything for NULL values */
802 continue;
803 }
804 }
805
806 if (encoder.m_field_type == MYSQL_TYPE_BLOB ||
807 encoder.m_field_type == MYSQL_TYPE_JSON) {
808 my_core::Field_blob *blob =
809 reinterpret_cast<my_core::Field_blob *>(field);
810 /* Get the number of bytes needed to store length*/
811 const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
812
813 /* Store the length of the value */
814 m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
815 length_bytes);
816
817 /* Store the blob value itself */
818 char *data_ptr;
819 memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
820 m_storage_record.append(data_ptr, blob->get_length());
821 } else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) {
822 Field_varstring *const field_var =
823 reinterpret_cast<Field_varstring *>(field);
824 uint data_len;
825 /* field_var->length_bytes is 1 or 2 */
826 if (field_var->length_bytes == 1) {
827 data_len = field_var->ptr[0];
828 } else {
829 assert(field_var->length_bytes == 2);
830 data_len = uint2korr(field_var->ptr);
831 }
832 m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
833 field_var->length_bytes + data_len);
834 } else {
835 /* Copy the field data */
836 const uint len = field->pack_length();
837 m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
838 }
839 }
840
841 if (store_row_debug_checksums) {
842 const ha_checksum key_crc32 = my_core::my_checksum(
843 0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
844 const ha_checksum val_crc32 =
845 my_core::my_checksum(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
846 m_storage_record.length());
847 uchar key_crc_buf[RDB_CHECKSUM_SIZE];
848 uchar val_crc_buf[RDB_CHECKSUM_SIZE];
849 rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
850 rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
851 m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
852 m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
853 m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
854 }
855
856 *value_slice =
857 rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
858
859 return HA_EXIT_SUCCESS;
860 }
861 } // namespace myrocks
862