1 /*
2 Copyright (c) 2015, Facebook, Inc.
3
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
17 #include <my_global.h>
18
19 /* This C++ file's header file */
20 #include "./rdb_converter.h"
21
22 /* Standard C++ header files */
23 #include <algorithm>
24 #include <map>
25 #include <string>
26 #include <vector>
27
28 /* MySQL header files */
29 #include "./field.h"
30 #include "./key.h"
31 #include "./m_ctype.h"
32 #include "./my_bit.h"
33 #include "./my_bitmap.h"
34 #include "./sql_table.h"
35
36
37 /* MyRocks header files */
38 #include "./ha_rocksdb.h"
39 #include "./ha_rocksdb_proto.h"
40 #include "./my_stacktrace.h"
41 #include "./rdb_cf_manager.h"
42 #include "./rdb_psi.h"
43 #include "./rdb_utils.h"
44
45
46 namespace myrocks {
47
dbug_modify_key_varchar8(String * on_disk_rec)48 void dbug_modify_key_varchar8(String *on_disk_rec) {
49 std::string res;
50 // The key starts with index number
51 res.append(on_disk_rec->ptr(), Rdb_key_def::INDEX_NUMBER_SIZE);
52
53 // Then, a mem-comparable form of a varchar(8) value.
54 res.append("ABCDE\0\0\0\xFC", 9);
55 on_disk_rec->length(0);
56 on_disk_rec->append(res.data(), res.size());
57 }
58
59 /*
60 Convert field from rocksdb storage format into Mysql Record format
61 @param buf OUT start memory to fill converted data
62 @param offset IN/OUT decoded data is stored in buf + offset
63 @param table IN current table
64 @param field IN current field
65 @param reader IN rocksdb value slice reader
66 @param decode IN whether to decode current field
67 @return
68 0 OK
69 other HA_ERR error code (can be SE-specific)
70 */
decode(uchar * const buf,uint * offset,TABLE * table,my_core::Field * field,Rdb_field_encoder * field_dec,Rdb_string_reader * reader,bool decode,bool is_null)71 int Rdb_convert_to_record_value_decoder::decode(uchar *const buf, uint *offset,
72 TABLE *table,
73 my_core::Field *field,
74 Rdb_field_encoder *field_dec,
75 Rdb_string_reader *reader,
76 bool decode, bool is_null) {
77 int err = HA_EXIT_SUCCESS;
78
79 uint field_offset = field->ptr - table->record[0];
80 *offset = field_offset;
81 uint null_offset = field->null_offset();
82 bool maybe_null = field->real_maybe_null();
83 field->move_field(buf + field_offset,
84 maybe_null ? buf + null_offset : nullptr, field->null_bit);
85
86 if (is_null) {
87 if (decode) {
88 // This sets the NULL-bit of this record
89 field->set_null();
90 /*
91 Besides that, set the field value to default value. CHECKSUM TABLE
92 depends on this.
93 */
94 memcpy(field->ptr, table->s->default_values + field_offset,
95 field->pack_length());
96 }
97 } else {
98 if (decode) {
99 // sets non-null bits for this record
100 field->set_notnull();
101 }
102
103 if (field_dec->m_field_type == MYSQL_TYPE_BLOB) {
104 err = decode_blob(table, field, reader, decode);
105 } else if (field_dec->m_field_type == MYSQL_TYPE_VARCHAR) {
106 err = decode_varchar(field, reader, decode);
107 } else {
108 err = decode_fixed_length_field(field, field_dec, reader, decode);
109 }
110 }
111
112 // Restore field->ptr and field->null_ptr
113 field->move_field(table->record[0] + field_offset,
114 maybe_null ? table->record[0] + null_offset : nullptr,
115 field->null_bit);
116
117 return err;
118 }
119
120 /*
121 Convert blob from rocksdb storage format into Mysql Record format
122 @param table IN current table
123 @param field IN current field
124 @param reader IN rocksdb value slice reader
125 @param decode IN whether to decode current field
126 @return
127 0 OK
128 other HA_ERR error code (can be SE-specific)
129 */
decode_blob(TABLE * table,Field * field,Rdb_string_reader * reader,bool decode)130 int Rdb_convert_to_record_value_decoder::decode_blob(TABLE *table, Field *field,
131 Rdb_string_reader *reader,
132 bool decode) {
133 my_core::Field_blob *blob = (my_core::Field_blob *)field;
134
135 // Get the number of bytes needed to store length
136 const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
137
138 const char *data_len_str;
139 if (!(data_len_str = reader->read(length_bytes))) {
140 return HA_ERR_ROCKSDB_CORRUPT_DATA;
141 }
142
143 memcpy(blob->ptr, data_len_str, length_bytes);
144 uint32 data_len =
145 blob->get_length(reinterpret_cast<const uchar *>(data_len_str),
146 length_bytes);
147 const char *blob_ptr;
148 if (!(blob_ptr = reader->read(data_len))) {
149 return HA_ERR_ROCKSDB_CORRUPT_DATA;
150 }
151
152 if (decode) {
153 // set 8-byte pointer to 0, like innodb does (relevant for 32-bit
154 // platforms)
155 memset(blob->ptr + length_bytes, 0, 8);
156 memcpy(blob->ptr + length_bytes, &blob_ptr, sizeof(uchar **));
157 }
158
159 return HA_EXIT_SUCCESS;
160 }
161
162 /*
163 Convert fixed length field from rocksdb storage format into Mysql Record
164 format
165 @param field IN current field
166 @param field_dec IN data structure conttain field encoding data
167 @param reader IN rocksdb value slice reader
168 @param decode IN whether to decode current field
169 @return
170 0 OK
171 other HA_ERR error code (can be SE-specific)
172 */
decode_fixed_length_field(my_core::Field * const field,Rdb_field_encoder * field_dec,Rdb_string_reader * const reader,bool decode)173 int Rdb_convert_to_record_value_decoder::decode_fixed_length_field(
174 my_core::Field *const field, Rdb_field_encoder *field_dec,
175 Rdb_string_reader *const reader, bool decode) {
176 uint len = field_dec->m_pack_length_in_rec;
177 if (len > 0) {
178 const char *data_bytes;
179 if ((data_bytes = reader->read(len)) == nullptr) {
180 return HA_ERR_ROCKSDB_CORRUPT_DATA;
181 }
182
183 if (decode) {
184 memcpy(field->ptr, data_bytes, len);
185 }
186 }
187
188 return HA_EXIT_SUCCESS;
189 }
190
191 /*
192 Convert varchar field from rocksdb storage format into Mysql Record format
193 @param field IN current field
194 @param field_dec IN data structure conttain field encoding data
195 @param reader IN rocksdb value slice reader
196 @param decode IN whether to decode current field
197 @return
198 0 OK
199 other HA_ERR error code (can be SE-specific)
200 */
decode_varchar(Field * field,Rdb_string_reader * const reader,bool decode)201 int Rdb_convert_to_record_value_decoder::decode_varchar(
202 Field *field, Rdb_string_reader *const reader, bool decode) {
203 my_core::Field_varstring *const field_var = (my_core::Field_varstring *)field;
204
205 const char *data_len_str;
206 if (!(data_len_str = reader->read(field_var->length_bytes))) {
207 return HA_ERR_ROCKSDB_CORRUPT_DATA;
208 }
209
210 uint data_len;
211 // field_var->length_bytes is 1 or 2
212 if (field_var->length_bytes == 1) {
213 data_len = (uchar)data_len_str[0];
214 } else {
215 DBUG_ASSERT(field_var->length_bytes == 2);
216 data_len = uint2korr(data_len_str);
217 }
218
219 if (data_len > field_var->field_length) {
220 // The data on disk is longer than table DDL allows?
221 return HA_ERR_ROCKSDB_CORRUPT_DATA;
222 }
223
224 if (!reader->read(data_len)) {
225 return HA_ERR_ROCKSDB_CORRUPT_DATA;
226 }
227
228 if (decode) {
229 memcpy(field_var->ptr, data_len_str, field_var->length_bytes + data_len);
230 }
231
232 return HA_EXIT_SUCCESS;
233 }
234
235 template <typename value_field_decoder>
Rdb_value_field_iterator(TABLE * table,Rdb_string_reader * value_slice_reader,const Rdb_converter * rdb_converter,uchar * const buf)236 Rdb_value_field_iterator<value_field_decoder>::Rdb_value_field_iterator(
237 TABLE *table, Rdb_string_reader *value_slice_reader,
238 const Rdb_converter *rdb_converter, uchar *const buf)
239 : m_buf(buf) {
240 DBUG_ASSERT(table != nullptr);
241 DBUG_ASSERT(buf != nullptr);
242
243 m_table = table;
244 m_value_slice_reader = value_slice_reader;
245 auto fields = rdb_converter->get_decode_fields();
246 m_field_iter = fields->begin();
247 m_field_end = fields->end();
248 m_null_bytes = rdb_converter->get_null_bytes();
249 m_offset = 0;
250 }
251
252 // Iterate each requested field and decode one by one
253 template <typename value_field_decoder>
next()254 int Rdb_value_field_iterator<value_field_decoder>::next() {
255 int err = HA_EXIT_SUCCESS;
256 while (m_field_iter != m_field_end) {
257 m_field_dec = m_field_iter->m_field_enc;
258 bool decode = m_field_iter->m_decode;
259 bool maybe_null = m_field_dec->maybe_null();
260 // This is_null value is bind to how stroage format store its value
261 m_is_null = maybe_null && ((m_null_bytes[m_field_dec->m_null_offset] &
262 m_field_dec->m_null_mask) != 0);
263
264 // Skip the bytes we need to skip
265 int skip = m_field_iter->m_skip;
266 if (skip && !m_value_slice_reader->read(skip)) {
267 return HA_ERR_ROCKSDB_CORRUPT_DATA;
268 }
269
270 m_field = m_table->field[m_field_dec->m_field_index];
271 // Decode each field
272 err = value_field_decoder::decode(m_buf, &m_offset, m_table, m_field,
273 m_field_dec, m_value_slice_reader, decode,
274 m_is_null);
275 if (err != HA_EXIT_SUCCESS) {
276 return err;
277 }
278 m_field_iter++;
279 // Only break for the field that are actually decoding rather than skipping
280 if (decode) {
281 break;
282 }
283 }
284 return err;
285 }
286
287 template <typename value_field_decoder>
end_of_fields() const288 bool Rdb_value_field_iterator<value_field_decoder>::end_of_fields() const {
289 return m_field_iter == m_field_end;
290 }
291
292 template <typename value_field_decoder>
get_field() const293 Field *Rdb_value_field_iterator<value_field_decoder>::get_field() const {
294 DBUG_ASSERT(m_field != nullptr);
295 return m_field;
296 }
297
298 template <typename value_field_decoder>
get_dst() const299 void *Rdb_value_field_iterator<value_field_decoder>::get_dst() const {
300 DBUG_ASSERT(m_buf != nullptr);
301 return m_buf + m_offset;
302 }
303
304 template <typename value_field_decoder>
get_field_index() const305 int Rdb_value_field_iterator<value_field_decoder>::get_field_index() const {
306 DBUG_ASSERT(m_field_dec != nullptr);
307 return m_field_dec->m_field_index;
308 }
309
310 template <typename value_field_decoder>
get_field_type() const311 enum_field_types Rdb_value_field_iterator<value_field_decoder>::get_field_type()
312 const {
313 DBUG_ASSERT(m_field_dec != nullptr);
314 return m_field_dec->m_field_type;
315 }
316
317 template <typename value_field_decoder>
is_null() const318 bool Rdb_value_field_iterator<value_field_decoder>::is_null() const {
319 DBUG_ASSERT(m_field != nullptr);
320 return m_is_null;
321 }
322
323 /*
324 Initialize Rdb_converter with table data
325 @param thd IN Thread context
326 @param tbl_def IN MyRocks table definition
327 @param table IN Current open table
328 */
Rdb_converter(const THD * thd,const Rdb_tbl_def * tbl_def,TABLE * table)329 Rdb_converter::Rdb_converter(const THD *thd, const Rdb_tbl_def *tbl_def,
330 TABLE *table)
331 : m_thd(thd), m_tbl_def(tbl_def), m_table(table) {
332 DBUG_ASSERT(thd != nullptr);
333 DBUG_ASSERT(tbl_def != nullptr);
334 DBUG_ASSERT(table != nullptr);
335
336 m_key_requested = false;
337 m_verify_row_debug_checksums = false;
338 m_maybe_unpack_info = false;
339 m_row_checksums_checked = 0;
340 m_null_bytes = nullptr;
341 setup_field_encoders();
342 }
343
~Rdb_converter()344 Rdb_converter::~Rdb_converter() {
345 my_free(m_encoder_arr);
346 m_encoder_arr = nullptr;
347 // These are needed to suppress valgrind errors in rocksdb.partition
348 m_storage_record.free();
349 }
350
351 /*
352 Decide storage type for each encoder
353 */
get_storage_type(Rdb_field_encoder * const encoder,const uint kp)354 void Rdb_converter::get_storage_type(Rdb_field_encoder *const encoder,
355 const uint kp) {
356 auto pk_descr =
357 m_tbl_def->m_key_descr_arr[ha_rocksdb::pk_index(m_table, m_tbl_def)];
358 // STORE_SOME uses unpack_info.
359 if (pk_descr->has_unpack_info(kp)) {
360 DBUG_ASSERT(pk_descr->can_unpack(kp));
361 encoder->m_storage_type = Rdb_field_encoder::STORE_SOME;
362 m_maybe_unpack_info = true;
363 } else if (pk_descr->can_unpack(kp)) {
364 encoder->m_storage_type = Rdb_field_encoder::STORE_NONE;
365 }
366 }
367
368 /*
369 @brief
370 Setup which fields will be unpacked when reading rows
371
372 @detail
373 Three special cases when we still unpack all fields:
374 - When client requires decode_all_fields, such as this table is being
375 updated (m_lock_rows==RDB_LOCK_WRITE).
376 - When @@rocksdb_verify_row_debug_checksums is ON (In this mode, we need to
377 read all fields to find whether there is a row checksum at the end. We could
378 skip the fields instead of decoding them, but currently we do decoding.)
379 - On index merge as bitmap is cleared during that operation
380
381 @seealso
382 Rdb_converter::setup_field_encoders()
383 Rdb_converter::convert_record_from_storage_format()
384 */
setup_field_decoders(const MY_BITMAP * field_map,bool decode_all_fields)385 void Rdb_converter::setup_field_decoders(const MY_BITMAP *field_map,
386 bool decode_all_fields) {
387 m_key_requested = false;
388 m_decoders_vect.clear();
389 int last_useful = 0;
390 int skip_size = 0;
391
392 for (uint i = 0; i < m_table->s->fields; i++) {
393 // bitmap is cleared on index merge, but it still needs to decode columns
394 bool field_requested =
395 decode_all_fields || m_verify_row_debug_checksums ||
396 bitmap_is_clear_all(field_map) ||
397 bitmap_is_set(field_map, m_table->field[i]->field_index);
398
399 // We only need the decoder if the whole record is stored.
400 if (m_encoder_arr[i].m_storage_type != Rdb_field_encoder::STORE_ALL) {
401 // the field potentially needs unpacking
402 if (field_requested) {
403 // the field is in the read set
404 m_key_requested = true;
405 }
406 continue;
407 }
408
409 if (field_requested) {
410 // We will need to decode this field
411 m_decoders_vect.push_back({&m_encoder_arr[i], true, skip_size});
412 last_useful = m_decoders_vect.size();
413 skip_size = 0;
414 } else {
415 if (m_encoder_arr[i].uses_variable_len_encoding() ||
416 m_encoder_arr[i].maybe_null()) {
417 // For variable-length field, we need to read the data and skip it
418 m_decoders_vect.push_back({&m_encoder_arr[i], false, skip_size});
419 skip_size = 0;
420 } else {
421 // Fixed-width field can be skipped without looking at it.
422 // Add appropriate skip_size to the next field.
423 skip_size += m_encoder_arr[i].m_pack_length_in_rec;
424 }
425 }
426 }
427
428 // It could be that the last few elements are varchars that just do
429 // skipping. Remove them.
430 m_decoders_vect.erase(m_decoders_vect.begin() + last_useful,
431 m_decoders_vect.end());
432 }
433
setup_field_encoders()434 void Rdb_converter::setup_field_encoders() {
435 uint null_bytes_length = 0;
436 uchar cur_null_mask = 0x1;
437
438 m_encoder_arr = static_cast<Rdb_field_encoder *>(
439 my_malloc(m_table->s->fields * sizeof(Rdb_field_encoder), MYF(0)));
440 if (m_encoder_arr == nullptr) {
441 return;
442 }
443
444 for (uint i = 0; i < m_table->s->fields; i++) {
445 Field *const field = m_table->field[i];
446 m_encoder_arr[i].m_storage_type = Rdb_field_encoder::STORE_ALL;
447
448 /*
449 Check if this field is
450 - a part of primary key, and
451 - it can be decoded back from its key image.
452 If both hold, we don't need to store this field in the value part of
453 RocksDB's key-value pair.
454
455 If hidden pk exists, we skip this check since the field will never be
456 part of the hidden pk.
457 */
458 if (!Rdb_key_def::table_has_hidden_pk(m_table)) {
459 KEY *const pk_info = &m_table->key_info[m_table->s->primary_key];
460 for (uint kp = 0; kp < pk_info->user_defined_key_parts; kp++) {
461 // key_part->fieldnr is counted from 1
462 if (field->field_index + 1 == pk_info->key_part[kp].fieldnr) {
463 get_storage_type(&m_encoder_arr[i], kp);
464 break;
465 }
466 }
467 }
468
469 m_encoder_arr[i].m_field_type = field->real_type();
470 m_encoder_arr[i].m_field_index = i;
471 m_encoder_arr[i].m_pack_length_in_rec = field->pack_length_in_rec();
472
473 if (field->real_maybe_null()) {
474 m_encoder_arr[i].m_null_mask = cur_null_mask;
475 m_encoder_arr[i].m_null_offset = null_bytes_length;
476 if (cur_null_mask == 0x80) {
477 cur_null_mask = 0x1;
478 null_bytes_length++;
479 } else {
480 cur_null_mask = cur_null_mask << 1;
481 }
482 } else {
483 m_encoder_arr[i].m_null_mask = 0;
484 }
485 }
486
487 // Count the last, unfinished NULL-bits byte
488 if (cur_null_mask != 0x1) {
489 null_bytes_length++;
490 }
491
492 m_null_bytes_length_in_record = null_bytes_length;
493 }
494
495 /*
496 EntryPoint for Decode:
497 Decode key slice(if requested) and value slice using built-in field
498 decoders
499 @param key_def IN key definition to decode
500 @param dst OUT Mysql buffer to fill decoded content
501 @param key_slice IN RocksDB key slice to decode
502 @param value_slice IN RocksDB value slice to decode
503 @return
504 0 OK
505 other HA_ERR error code (can be SE-specific)
506 */
decode(const std::shared_ptr<Rdb_key_def> & key_def,uchar * dst,const rocksdb::Slice * key_slice,const rocksdb::Slice * value_slice)507 int Rdb_converter::decode(const std::shared_ptr<Rdb_key_def> &key_def,
508 uchar *dst, // address to fill data
509 const rocksdb::Slice *key_slice,
510 const rocksdb::Slice *value_slice) {
511 // Currently only support decode primary key, Will add decode secondary later
512 DBUG_ASSERT(key_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
513 key_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
514
515 const rocksdb::Slice *updated_key_slice = key_slice;
516 #ifndef DBUG_OFF
517 String last_rowkey;
518 last_rowkey.copy(key_slice->data(), key_slice->size(), &my_charset_bin);
519 DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_read1",
520 { dbug_modify_key_varchar8(&last_rowkey); });
521 rocksdb::Slice rowkey_slice(last_rowkey.ptr(), last_rowkey.length());
522 updated_key_slice = &rowkey_slice;
523 #endif
524 return convert_record_from_storage_format(key_def, updated_key_slice,
525 value_slice, dst);
526 }
527
528 /*
529 Decode value slice header
530 @param reader IN value slice reader
531 @param pk_def IN key definition to decode
532 @param unpack_slice OUT unpack info slice
533 @return
534 0 OK
535 other HA_ERR error code (can be SE-specific)
536 */
decode_value_header(Rdb_string_reader * reader,const std::shared_ptr<Rdb_key_def> & pk_def,rocksdb::Slice * unpack_slice)537 int Rdb_converter::decode_value_header(
538 Rdb_string_reader *reader, const std::shared_ptr<Rdb_key_def> &pk_def,
539 rocksdb::Slice *unpack_slice) {
540 /* If it's a TTL record, skip the 8 byte TTL value */
541 if (pk_def->has_ttl()) {
542 const char *ttl_bytes;
543 if ((ttl_bytes = reader->read(ROCKSDB_SIZEOF_TTL_RECORD))) {
544 memcpy(m_ttl_bytes, ttl_bytes, ROCKSDB_SIZEOF_TTL_RECORD);
545 } else {
546 return HA_ERR_ROCKSDB_CORRUPT_DATA;
547 }
548 }
549
550 /* Other fields are decoded from the value */
551 if (m_null_bytes_length_in_record &&
552 !(m_null_bytes = reader->read(m_null_bytes_length_in_record))) {
553 return HA_ERR_ROCKSDB_CORRUPT_DATA;
554 }
555
556 if (m_maybe_unpack_info) {
557 const char *unpack_info = reader->get_current_ptr();
558 if (!unpack_info || !Rdb_key_def::is_unpack_data_tag(unpack_info[0]) ||
559 !reader->read(Rdb_key_def::get_unpack_header_size(unpack_info[0]))) {
560 return HA_ERR_ROCKSDB_CORRUPT_DATA;
561 }
562
563 uint16 unpack_info_len =
564 rdb_netbuf_to_uint16(reinterpret_cast<const uchar *>(unpack_info + 1));
565 *unpack_slice = rocksdb::Slice(unpack_info, unpack_info_len);
566
567 reader->read(unpack_info_len -
568 Rdb_key_def::get_unpack_header_size(unpack_info[0]));
569 }
570
571 return HA_EXIT_SUCCESS;
572 }
573
574 /*
575 Convert RocksDb key slice and value slice to Mysql format
576 @param key_def IN key definition to decode
577 @param key_slice IN RocksDB key slice
578 @param value_slice IN RocksDB value slice
579 @param dst OUT MySql format address
580 @return
581 0 OK
582 other HA_ERR error code (can be SE-specific)
583 */
convert_record_from_storage_format(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice * const key_slice,const rocksdb::Slice * const value_slice,uchar * const dst)584 int Rdb_converter::convert_record_from_storage_format(
585 const std::shared_ptr<Rdb_key_def> &pk_def,
586 const rocksdb::Slice *const key_slice,
587 const rocksdb::Slice *const value_slice, uchar *const dst) {
588 int err = HA_EXIT_SUCCESS;
589
590 Rdb_string_reader value_slice_reader(value_slice);
591 rocksdb::Slice unpack_slice;
592 err = decode_value_header(&value_slice_reader, pk_def, &unpack_slice);
593 if (err != HA_EXIT_SUCCESS) {
594 return err;
595 }
596
597 /*
598 Decode PK fields from the key
599 */
600 if (m_key_requested) {
601 err = pk_def->unpack_record(m_table, dst, key_slice,
602 !unpack_slice.empty() ? &unpack_slice : nullptr,
603 false /* verify_checksum */);
604 }
605 if (err != HA_EXIT_SUCCESS) {
606 return err;
607 }
608
609 Rdb_value_field_iterator<Rdb_convert_to_record_value_decoder>
610 value_field_iterator(m_table, &value_slice_reader, this, dst);
611
612 // Decode value slices
613 while (!value_field_iterator.end_of_fields()) {
614 err = value_field_iterator.next();
615
616 if (err != HA_EXIT_SUCCESS) {
617 return err;
618 }
619 }
620
621 if (m_verify_row_debug_checksums) {
622 return verify_row_debug_checksum(pk_def, &value_slice_reader, key_slice,
623 value_slice);
624 }
625 return HA_EXIT_SUCCESS;
626 }
627
628 /*
629 Verify checksum for row
630 @param pk_def IN key def
631 @param reader IN RocksDB value slice reader
632 @param key IN RocksDB key slice
633 @param value IN RocksDB value slice
634 @return
635 0 OK
636 other HA_ERR error code (can be SE-specific)
637 */
verify_row_debug_checksum(const std::shared_ptr<Rdb_key_def> & pk_def,Rdb_string_reader * reader,const rocksdb::Slice * key,const rocksdb::Slice * value)638 int Rdb_converter::verify_row_debug_checksum(
639 const std::shared_ptr<Rdb_key_def> &pk_def, Rdb_string_reader *reader,
640 const rocksdb::Slice *key, const rocksdb::Slice *value) {
641 if (reader->remaining_bytes() == RDB_CHECKSUM_CHUNK_SIZE &&
642 reader->read(1)[0] == RDB_CHECKSUM_DATA_TAG) {
643 uint32_t stored_key_chksum =
644 rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
645 uint32_t stored_val_chksum =
646 rdb_netbuf_to_uint32((const uchar *)reader->read(RDB_CHECKSUM_SIZE));
647
648 const uint32_t computed_key_chksum =
649 my_core::crc32(0, rdb_slice_to_uchar_ptr(key), key->size());
650 const uint32_t computed_val_chksum =
651 my_core::crc32(0, rdb_slice_to_uchar_ptr(value),
652 value->size() - RDB_CHECKSUM_CHUNK_SIZE);
653
654 DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum1", stored_key_chksum++;);
655
656 if (stored_key_chksum != computed_key_chksum) {
657 pk_def->report_checksum_mismatch(true, key->data(), key->size());
658 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
659 }
660
661 DBUG_EXECUTE_IF("myrocks_simulate_bad_pk_checksum2", stored_val_chksum++;);
662 if (stored_val_chksum != computed_val_chksum) {
663 pk_def->report_checksum_mismatch(false, value->data(), value->size());
664 return HA_ERR_ROCKSDB_CHECKSUM_MISMATCH;
665 }
666
667 m_row_checksums_checked++;
668 }
669 if (reader->remaining_bytes()) {
670 return HA_ERR_ROCKSDB_CORRUPT_DATA;
671 }
672 return HA_EXIT_SUCCESS;
673 }
674
675 /**
676 Convert record from table->record[0] form into a form that can be written
677 into rocksdb.
678
679 @param pk_def IN Current key def
680 @pk_unpack_info IN Unpack info generated during key pack
681 @is_update_row IN Whether it is update row
682 @store_row_debug_checksums IN Whether to store checksums
683 @param ttl_bytes IN/OUT Old ttl value from previous record and
684 ttl value during current encode
685 @is_ttl_bytes_updated OUT Whether ttl bytes is updated
686 @param value_slice OUT Data slice with record data.
687 */
encode_value_slice(const std::shared_ptr<Rdb_key_def> & pk_def,const rocksdb::Slice & pk_packed_slice,Rdb_string_writer * pk_unpack_info,bool is_update_row,bool store_row_debug_checksums,char * ttl_bytes,bool * is_ttl_bytes_updated,rocksdb::Slice * const value_slice)688 int Rdb_converter::encode_value_slice(
689 const std::shared_ptr<Rdb_key_def> &pk_def,
690 const rocksdb::Slice &pk_packed_slice, Rdb_string_writer *pk_unpack_info,
691 bool is_update_row, bool store_row_debug_checksums, char *ttl_bytes,
692 bool *is_ttl_bytes_updated, rocksdb::Slice *const value_slice) {
693 DBUG_ASSERT(pk_def != nullptr);
694 // Currently only primary key will store value slice
695 DBUG_ASSERT(pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_PRIMARY ||
696 pk_def->m_index_type == Rdb_key_def::INDEX_TYPE_HIDDEN_PRIMARY);
697 DBUG_ASSERT_IMP(m_maybe_unpack_info, pk_unpack_info);
698
699 bool has_ttl = pk_def->has_ttl();
700 bool has_ttl_column = !pk_def->m_ttl_column.empty();
701
702 m_storage_record.length(0);
703
704 if (has_ttl) {
705 /* If it's a TTL record, reserve space for 8 byte TTL value in front. */
706 m_storage_record.fill(
707 ROCKSDB_SIZEOF_TTL_RECORD + m_null_bytes_length_in_record, 0);
708 // NOTE: is_ttl_bytes_updated is only used for update case
709 // During update, skip update sk key/values slice iff none of sk fields
710 // have changed and ttl bytes isn't changed. see
711 // ha_rocksdb::update_write_sk() for more info
712 *is_ttl_bytes_updated = false;
713 char *const data = const_cast<char *>(m_storage_record.ptr());
714 if (has_ttl_column) {
715 DBUG_ASSERT(pk_def->get_ttl_field_index() != UINT_MAX);
716 Field *const field = m_table->field[pk_def->get_ttl_field_index()];
717 DBUG_ASSERT(field->pack_length_in_rec() == ROCKSDB_SIZEOF_TTL_RECORD);
718 DBUG_ASSERT(field->real_type() == MYSQL_TYPE_LONGLONG);
719
720 uint64 ts = uint8korr(field->ptr);
721 #ifndef DBUG_OFF
722 ts += rdb_dbug_set_ttl_rec_ts();
723 #endif
724 rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
725 if (is_update_row) {
726 *is_ttl_bytes_updated =
727 memcmp(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
728 }
729 // Also store in m_ttl_bytes to propagate to update_write_sk
730 memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
731 } else {
732 /*
733 For implicitly generated TTL records we need to copy over the old
734 TTL value from the old record in the event of an update. It was stored
735 in m_ttl_bytes.
736
737 Otherwise, generate a timestamp using the current time.
738 */
739 if (is_update_row) {
740 memcpy(data, ttl_bytes, sizeof(uint64));
741 } else {
742 uint64 ts = static_cast<uint64>(std::time(nullptr));
743 #ifndef DBUG_OFF
744 ts += rdb_dbug_set_ttl_rec_ts();
745 #endif
746 rdb_netbuf_store_uint64(reinterpret_cast<uchar *>(data), ts);
747 // Also store in m_ttl_bytes to propagate to update_write_sk
748 memcpy(ttl_bytes, data, ROCKSDB_SIZEOF_TTL_RECORD);
749 }
750 }
751 } else {
752 /* All NULL bits are initially 0 */
753 m_storage_record.fill(m_null_bytes_length_in_record, 0);
754 }
755
756 // If a primary key may have non-empty unpack_info for certain values,
757 // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
758 // itself was prepared in Rdb_key_def::pack_record.
759 if (m_maybe_unpack_info) {
760 m_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
761 pk_unpack_info->get_current_pos());
762 }
763 for (uint i = 0; i < m_table->s->fields; i++) {
764 Rdb_field_encoder &encoder = m_encoder_arr[i];
765 /* Don't pack decodable PK key parts */
766 if (encoder.m_storage_type != Rdb_field_encoder::STORE_ALL) {
767 continue;
768 }
769
770 Field *const field = m_table->field[i];
771 if (encoder.maybe_null()) {
772 char *data = const_cast<char *>(m_storage_record.ptr());
773 if (has_ttl) {
774 data += ROCKSDB_SIZEOF_TTL_RECORD;
775 }
776
777 if (field->is_null()) {
778 data[encoder.m_null_offset] |= encoder.m_null_mask;
779 /* Don't write anything for NULL values */
780 continue;
781 }
782 }
783
784 if (encoder.m_field_type == MYSQL_TYPE_BLOB) {
785 my_core::Field_blob *blob =
786 reinterpret_cast<my_core::Field_blob *>(field);
787 /* Get the number of bytes needed to store length*/
788 const uint length_bytes = blob->pack_length() - portable_sizeof_char_ptr;
789
790 /* Store the length of the value */
791 m_storage_record.append(reinterpret_cast<char *>(blob->ptr),
792 length_bytes);
793
794 /* Store the blob value itself */
795 char *data_ptr;
796 memcpy(&data_ptr, blob->ptr + length_bytes, sizeof(uchar **));
797 m_storage_record.append(data_ptr, blob->get_length());
798 } else if (encoder.m_field_type == MYSQL_TYPE_VARCHAR) {
799 Field_varstring *const field_var =
800 reinterpret_cast<Field_varstring *>(field);
801 uint data_len;
802 /* field_var->length_bytes is 1 or 2 */
803 if (field_var->length_bytes == 1) {
804 data_len = field_var->ptr[0];
805 } else {
806 DBUG_ASSERT(field_var->length_bytes == 2);
807 data_len = uint2korr(field_var->ptr);
808 }
809 m_storage_record.append(reinterpret_cast<char *>(field_var->ptr),
810 field_var->length_bytes + data_len);
811 } else {
812 /* Copy the field data */
813 const uint len = field->pack_length_in_rec();
814 m_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
815 }
816 }
817
818 if (store_row_debug_checksums) {
819 const uint32_t key_crc32 = my_core::crc32(
820 0, rdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
821 const uint32_t val_crc32 =
822 my_core::crc32(0, rdb_mysql_str_to_uchar_str(&m_storage_record),
823 m_storage_record.length());
824 uchar key_crc_buf[RDB_CHECKSUM_SIZE];
825 uchar val_crc_buf[RDB_CHECKSUM_SIZE];
826 rdb_netbuf_store_uint32(key_crc_buf, key_crc32);
827 rdb_netbuf_store_uint32(val_crc_buf, val_crc32);
828 m_storage_record.append((const char *)&RDB_CHECKSUM_DATA_TAG, 1);
829 m_storage_record.append((const char *)key_crc_buf, RDB_CHECKSUM_SIZE);
830 m_storage_record.append((const char *)val_crc_buf, RDB_CHECKSUM_SIZE);
831 }
832
833 *value_slice =
834 rocksdb::Slice(m_storage_record.ptr(), m_storage_record.length());
835
836 return HA_EXIT_SUCCESS;
837 }
838 } // namespace myrocks
839