1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
TokuDB is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22 ======= */
23
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
// Operation codes carried inside update messages.  The codes are written
// into the messages themselves, so an existing value must never change.
// An operation code currently occupies a single byte of the message, which
// limits us to 256 operations.  If more are ever needed, the last code (255)
// can be used to indicate that the operation code extends beyond 1 byte.
enum {
    // add or drop columns
    UPDATE_OP_COL_ADD_OR_DROP         = 0,

    // expand offsets / fixed length fields / blobs
    UPDATE_OP_EXPAND_VARIABLE_OFFSETS = 1,
    UPDATE_OP_EXPAND_INT              = 2,
    UPDATE_OP_EXPAND_UINT             = 3,
    UPDATE_OP_EXPAND_CHAR             = 4,
    UPDATE_OP_EXPAND_BINARY           = 5,
    UPDATE_OP_EXPAND_BLOB             = 6,

    // update and upsert messages, versions 1 and 2
    UPDATE_OP_UPDATE_1                = 10,
    UPDATE_OP_UPSERT_1                = 11,
    UPDATE_OP_UPDATE_2                = 12,
    UPDATE_OP_UPSERT_2                = 13,
};
45
// Field type codes that appear in the update messages.
enum {
    UPDATE_TYPE_UNKNOWN   = 0,
    UPDATE_TYPE_INT       = 1,
    UPDATE_TYPE_UINT      = 2,
    UPDATE_TYPE_CHAR      = 3,
    UPDATE_TYPE_BINARY    = 4,
    UPDATE_TYPE_VARCHAR   = 5,
    UPDATE_TYPE_VARBINARY = 6,
    UPDATE_TYPE_TEXT      = 7,
    UPDATE_TYPE_BLOB      = 8,
};
58
// short alias for the add/drop column operation code
#define UP_COL_ADD_OR_DROP UPDATE_OP_COL_ADD_OR_DROP

// add or drop column sub-operations
#define COL_DROP 0xaa
#define COL_ADD 0xbb

// add or drop column types
#define COL_FIXED 0xcc
#define COL_VAR 0xdd
#define COL_BLOB 0xee

// Size in bytes of the static part of an add/drop column row mutator (see
// the breakdown below).  The sum is parenthesized so that the macro keeps
// its value when it is used inside a larger expression, for example
// 2 * STATIC_ROW_MUTATOR_SIZE.
#define STATIC_ROW_MUTATOR_SIZE (1+8+2+8+8+8)

// how much space do I need for the mutators?
// static stuff first:
//     1 - operation == UP_COL_ADD_OR_DROP
//     8 - old and new number of null bytes (4 bytes each)
//     2 - old and new number of offset bytes (1 byte each)
//     8 - old and new fixed_field_size (4 bytes each)
//     8 - old and new length of offsets (4 bytes each)
//     8 - old and new starting null bit position (4 bytes each)
//     TOTAL: 35 == STATIC_ROW_MUTATOR_SIZE
81
82 // dynamic stuff:
83 // 4 - number of columns
84 // for each column:
85 // 1 - add or drop
86 // 1 - is nullable
87 // 4 - if nullable, position
88 // 1 - if add, whether default is null or not
89 // 1 - if fixed, var, or not
90 // for fixed, entire default
91 // for var, 4 bytes length, then entire default
92 // for blob, nothing
93 // So, an upperbound is 4 + num_fields(12) + all default stuff
94
95 // static blob stuff:
96 // 4 - num blobs
97 // 1 byte for each num blobs in old table
98 // So, an upperbound is 4 + kc_info->num_blobs
99
100 // dynamic blob stuff:
101 // for each blob added:
102 // 1 - state if we are adding or dropping
103 // 4 - blob index
104 // if add, 1 len bytes
105 // at most, 4 0's
106 // So, upperbound is num_blobs(1+4+1+4) = num_columns*10
107
108 // The expand varchar offsets message is used to expand the size of an offset
109 // from 1 to 2 bytes. Not VLQ coded.
110 // uint8 operation = UPDATE_OP_EXPAND_VARIABLE_OFFSETS
111 // uint32 number of offsets
112 // uint32 starting offset of the variable length field offsets
113
114 // Expand the size of a fixed length column message. Not VLQ coded.
115 // The field type is encoded in the operation code.
// uint8 operation = UPDATE_OP_EXPAND_INT/UINT
117 // uint32 offset offset of the field
118 // uint32 old length the old length of the field's value
119 // uint32 new length the new length of the field's value
120
121 // uint8 operation = UPDATE_OP_EXPAND_CHAR/BINARY
122 // uint32 offset offset of the field
123 // uint32 old length the old length of the field's value
124 // uint32 new length the new length of the field's value
125 // uint8 pad char
126
127 // Expand blobs message. VLQ coded.
128 // uint8 operation = UPDATE_OP_EXPAND_BLOB
129 // uint32 start variable offset
130 // uint32 variable offset bytes
131 // uint32 bytes per offset
132 // uint32 num blobs = N
133 // uint8 old lengths[N]
134 // uint8 new lengths[N]
135
136 // Update and Upsert version 1 messages. Not VLQ coded. Not used anymore, but
137 // may be in the fractal tree from a previous build.
138 //
139 // Field descriptor:
140 // Operations:
141 // update operation 4 == { '=', '+', '-' }
142 // x = k
143 // x = x + k
144 // x = x - k
145 // field type 4 see field types above
146 // unused 4 unused
// field null num 4 bit 31 is 1 if the field is nullable and the
148 // remaining bits contain the null bit number
149 // field offset 4 for fixed fields, this is the offset from
// beginning of the row of the field
151 // value:
152 // value length 4 == N, length of the value
153 // value N value to add or subtract
154 //
155 // Update_1 message:
156 // Operation 1 == UPDATE_OP_UPDATE_1
157 // fixed field offset 4 offset of the beginning of the fixed fields
158 // var field offset 4 offset of the variable length offsets
159 // var_offset_bytes 1 length of offsets (Note: not big enough)
160 // bytes_per_offset 4 number of bytes per offset
161 // Number of update ops 4 == N
162 // Update ops [N]
163 //
164 // Upsert_1 message:
165 // Operation 1 == UPDATE_OP_UPSERT_1
166 // Insert row:
167 // length 4 == N
168 // data N
169 // fixed field offset 4 offset of the beginning of the fixed fields
170 // var field offset 4 offset of the variable length offsets
171 // var_offset_bytes 1 length of offsets (Note: not big enough)
172 // bytes_per_offset 4 number of bytes per offset
173 // Number of update ops 4 == N
174 // Update ops [N]
175
// Update and Upsert version 2 messages. VLQ coded.
177 // Update version 2
178 // uint8 operation = UPDATE_OP_UPDATE_2
179 // uint32 number of update ops = N
180 // uint8 update ops [ N ]
181 //
182 // Upsert version 2
183 // uint8 operation = UPDATE_OP_UPSERT_2
184 // uint32 insert length = N
185 // uint8 insert data [ N ]
186 // uint32 number of update ops = M
187 // update ops [ M ]
188 //
189 // Variable fields info
190 // uint32 update operation = 'v'
191 // uint32 start offset
192 // uint32 num varchars
193 // uint32 bytes per offset
194 //
195 // Blobs info
196 // uint32 update operation = 'b'
197 // uint32 num blobs = N
198 // uint8 blob lengths [ N ]
199 //
200 // Update operation on fixed length fields
201 // uint32 update operation = '=', '+', '-'
202 // uint32 field type
203 // uint32 null num 0 => not nullable, otherwise encoded as field_null_num + 1
204 // uint32 offset
205 // uint32 value length = N
206 // uint8 value [ N ]
207 //
208 // Update operation on varchar fields
209 // uint32 update operation = '='
210 // uint32 field type
211 // uint32 null num
212 // uint32 var index
213 // uint32 value length = N
214 // uint8 value [ N ]
215 //
216 // Update operation on blob fields
217 // uint32 update operation = '='
218 // uint32 field type
219 // uint32 null num
220 // uint32 blob index
221 // uint32 value length = N
222 // uint8 value [ N ]
223
224 #include "tokudb_buffer.h"
225 #include "tokudb_math.h"
226
//
// Tests one bit of the bitmap that starts at data.
//   data - first byte of the bitmap
//   pos  - bit number; bit 0 is the least significant bit of data[0],
//          bit 8 is the least significant bit of data[1], and so on
// Returns true when that bit is 1, false when it is 0.
//
static inline bool is_overall_null_position_set(uchar* data, uint32_t pos) {
    const uint32_t byte_index = pos >> 3;
    const uint32_t bit_in_byte = pos & 7;
    return ((data[byte_index] >> bit_in_byte) & 1) != 0;
}
236
//
// Writes one bit of the bitmap that starts at data.
//   data    - first byte of the bitmap
//   pos     - bit number; bit 0 is the least significant bit of data[0]
//   is_null - true stores a 1 in the bit, false stores a 0
// All other bits of the bitmap are left untouched.
//
static inline void set_overall_null_position(
    uchar* data,
    uint32_t pos,
    bool is_null) {

    uchar* const target = data + (pos / 8);
    const uchar mask = (uchar)(1 << (pos % 8));
    *target = is_null ? (uchar)(*target | mask) : (uchar)(*target & ~mask);
}
255
copy_null_bits(uint32_t start_old_pos,uint32_t start_new_pos,uint32_t num_bits,uchar * old_null_bytes,uchar * new_null_bytes)256 static inline void copy_null_bits(
257 uint32_t start_old_pos,
258 uint32_t start_new_pos,
259 uint32_t num_bits,
260 uchar* old_null_bytes,
261 uchar* new_null_bytes) {
262 for (uint32_t i = 0; i < num_bits; i++) {
263 uint32_t curr_old_pos = i + start_old_pos;
264 uint32_t curr_new_pos = i + start_new_pos;
265 // copy over old null bytes
266 if (is_overall_null_position_set(old_null_bytes,curr_old_pos)) {
267 set_overall_null_position(new_null_bytes,curr_new_pos,true);
268 }
269 else {
270 set_overall_null_position(new_null_bytes,curr_new_pos,false);
271 }
272 }
273 }
274
copy_var_fields(uint32_t start_old_num_var_field,uint32_t num_var_fields,uchar * old_var_field_offset_ptr,uchar old_num_offset_bytes,uchar * start_new_var_field_data_ptr,uchar * start_new_var_field_offset_ptr,uchar * new_var_field_data_ptr,uchar * old_var_field_data_ptr,uint32_t new_num_offset_bytes,uint32_t * num_data_bytes_written,uint32_t * num_offset_bytes_written)275 static inline void copy_var_fields(
276 //index of var fields that we should start writing
277 uint32_t start_old_num_var_field,
278 // number of var fields to copy
279 uint32_t num_var_fields,
280 //static ptr to where offset bytes begin in old row
281 uchar* old_var_field_offset_ptr,
282 //number of offset bytes used in old row
283 uchar old_num_offset_bytes,
284 // where the new var data should be written
285 uchar* start_new_var_field_data_ptr,
286 // where the new var offsets should be written
287 uchar* start_new_var_field_offset_ptr,
288 // pointer to beginning of var fields in new row
289 uchar* new_var_field_data_ptr,
290 // pointer to beginning of var fields in old row
291 uchar* old_var_field_data_ptr,
292 // number of offset bytes used in new row
293 uint32_t new_num_offset_bytes,
294 uint32_t* num_data_bytes_written,
295 uint32_t* num_offset_bytes_written) {
296
297 uchar* curr_new_var_field_data_ptr = start_new_var_field_data_ptr;
298 uchar* curr_new_var_field_offset_ptr = start_new_var_field_offset_ptr;
299 for (uint32_t i = 0; i < num_var_fields; i++) {
300 uint32_t field_len;
301 uint32_t start_read_offset;
302 uint32_t curr_old = i + start_old_num_var_field;
303 uchar* data_to_copy = NULL;
304 // get the length and pointer to data that needs to be copied
305 get_var_field_info(
306 &field_len,
307 &start_read_offset,
308 curr_old,
309 old_var_field_offset_ptr,
310 old_num_offset_bytes);
311 data_to_copy = old_var_field_data_ptr + start_read_offset;
312 // now need to copy field_len bytes starting from data_to_copy
313 curr_new_var_field_data_ptr = write_var_field(
314 curr_new_var_field_offset_ptr,
315 curr_new_var_field_data_ptr,
316 new_var_field_data_ptr,
317 data_to_copy,
318 field_len,
319 new_num_offset_bytes);
320 curr_new_var_field_offset_ptr += new_num_offset_bytes;
321 }
322 *num_data_bytes_written =
323 (uint32_t)(curr_new_var_field_data_ptr - start_new_var_field_data_ptr);
324 *num_offset_bytes_written =
325 (uint32_t)(curr_new_var_field_offset_ptr -
326 start_new_var_field_offset_ptr);
327 }
328
copy_toku_blob(uchar * to_ptr,uchar * from_ptr,uint32_t len_bytes,bool skip)329 static inline uint32_t copy_toku_blob(
330 uchar* to_ptr,
331 uchar* from_ptr,
332 uint32_t len_bytes,
333 bool skip) {
334
335 uint32_t length = 0;
336 if (!skip) {
337 memcpy(to_ptr, from_ptr, len_bytes);
338 }
339 length = get_blob_field_len(from_ptr,len_bytes);
340 if (!skip) {
341 memcpy(to_ptr + len_bytes, from_ptr + len_bytes, length);
342 }
343 return (length + len_bytes);
344 }
345
tokudb_hcad_update_fun(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)346 static int tokudb_hcad_update_fun(const DBT* old_val,
347 const DBT* extra,
348 void (*set_val)(const DBT* new_val,
349 void* set_extra),
350 void* set_extra) {
351 uint32_t max_num_bytes;
352 uint32_t num_columns;
353 DBT new_val;
354 uint32_t num_bytes_left;
355 uint32_t num_var_fields_to_copy;
356 uint32_t num_data_bytes_written = 0;
357 uint32_t num_offset_bytes_written = 0;
358 int error;
359 memset(&new_val, 0, sizeof(DBT));
360 uchar operation;
361 uchar* new_val_data = NULL;
362 uchar* extra_pos = NULL;
363 uchar* extra_pos_start = NULL;
364 //
365 // info for pointers into rows
366 //
367 uint32_t old_num_null_bytes;
368 uint32_t new_num_null_bytes;
369 uchar old_num_offset_bytes;
370 uchar new_num_offset_bytes;
371 uint32_t old_fixed_field_size;
372 uint32_t new_fixed_field_size;
373 uint32_t old_len_of_offsets;
374 uint32_t new_len_of_offsets;
375
376 uchar* old_fixed_field_ptr = NULL;
377 uchar* new_fixed_field_ptr = NULL;
378 uint32_t curr_old_fixed_offset;
379 uint32_t curr_new_fixed_offset;
380
381 uchar* old_null_bytes = NULL;
382 uchar* new_null_bytes = NULL;
383 uint32_t curr_old_null_pos;
384 uint32_t curr_new_null_pos;
385 uint32_t old_null_bits_left;
386 uint32_t new_null_bits_left;
387 uint32_t overall_null_bits_left;
388
389 uint32_t old_num_var_fields;
390 // uint32_t new_num_var_fields;
391 uint32_t curr_old_num_var_field;
392 uint32_t curr_new_num_var_field;
393 uchar* old_var_field_offset_ptr = NULL;
394 uchar* new_var_field_offset_ptr = NULL;
395 uchar* curr_new_var_field_offset_ptr = NULL;
396 uchar* old_var_field_data_ptr = NULL;
397 uchar* new_var_field_data_ptr = NULL;
398 uchar* curr_new_var_field_data_ptr = NULL;
399
400 uint32_t start_blob_offset;
401 uchar* start_blob_ptr;
402 uint32_t num_blob_bytes;
403
404 // came across a delete, nothing to update
405 if (old_val == NULL) {
406 error = 0;
407 goto cleanup;
408 }
409
410 extra_pos_start = (uchar *)extra->data;
411 extra_pos = (uchar *)extra->data;
412
413 operation = extra_pos[0];
414 extra_pos++;
415 assert_always(operation == UP_COL_ADD_OR_DROP);
416
417 memcpy(&old_num_null_bytes, extra_pos, sizeof(uint32_t));
418 extra_pos += sizeof(uint32_t);
419 memcpy(&new_num_null_bytes, extra_pos, sizeof(uint32_t));
420 extra_pos += sizeof(uint32_t);
421
422 old_num_offset_bytes = extra_pos[0];
423 extra_pos++;
424 new_num_offset_bytes = extra_pos[0];
425 extra_pos++;
426
427 memcpy(&old_fixed_field_size, extra_pos, sizeof(uint32_t));
428 extra_pos += sizeof(uint32_t);
429 memcpy(&new_fixed_field_size, extra_pos, sizeof(uint32_t));
430 extra_pos += sizeof(uint32_t);
431
432 memcpy(&old_len_of_offsets, extra_pos, sizeof(uint32_t));
433 extra_pos += sizeof(uint32_t);
434 memcpy(&new_len_of_offsets, extra_pos, sizeof(uint32_t));
435 extra_pos += sizeof(uint32_t);
436
437 max_num_bytes =
438 old_val->size + extra->size + new_len_of_offsets + new_fixed_field_size;
439 new_val_data = (uchar *)tokudb::memory::malloc(
440 max_num_bytes,
441 MYF(MY_FAE));
442 if (new_val_data == NULL) {
443 error = ENOMEM;
444 goto cleanup;
445 }
446
447 old_fixed_field_ptr = (uchar *) old_val->data;
448 old_fixed_field_ptr += old_num_null_bytes;
449 new_fixed_field_ptr = new_val_data + new_num_null_bytes;
450 curr_old_fixed_offset = 0;
451 curr_new_fixed_offset = 0;
452
453 old_num_var_fields = old_len_of_offsets/old_num_offset_bytes;
454 // new_num_var_fields = new_len_of_offsets/new_num_offset_bytes;
455 // following fields will change as we write the variable data
456 old_var_field_offset_ptr = old_fixed_field_ptr + old_fixed_field_size;
457 new_var_field_offset_ptr = new_fixed_field_ptr + new_fixed_field_size;
458 old_var_field_data_ptr = old_var_field_offset_ptr + old_len_of_offsets;
459 new_var_field_data_ptr = new_var_field_offset_ptr + new_len_of_offsets;
460 curr_new_var_field_offset_ptr = new_var_field_offset_ptr;
461 curr_new_var_field_data_ptr = new_var_field_data_ptr;
462 curr_old_num_var_field = 0;
463 curr_new_num_var_field = 0;
464
465 old_null_bytes = (uchar *)old_val->data;
466 new_null_bytes = new_val_data;
467
468 memcpy(&curr_old_null_pos, extra_pos, sizeof(uint32_t));
469 extra_pos += sizeof(uint32_t);
470 memcpy(&curr_new_null_pos, extra_pos, sizeof(uint32_t));
471 extra_pos += sizeof(uint32_t);
472
473 memcpy(&num_columns, extra_pos, sizeof(num_columns));
474 extra_pos += sizeof(num_columns);
475
476 memset(new_null_bytes, 0, new_num_null_bytes); // shut valgrind up
477
478 //
479 // now go through and apply the change into new_val_data
480 //
481 for (uint32_t i = 0; i < num_columns; i++) {
482 uchar op_type = extra_pos[0];
483 bool is_null_default = false;
484 extra_pos++;
485
486 assert_always(op_type == COL_DROP || op_type == COL_ADD);
487 bool nullable = (extra_pos[0] != 0);
488 extra_pos++;
489 if (nullable) {
490 uint32_t null_bit_position;
491 memcpy(&null_bit_position, extra_pos, sizeof(uint32_t));
492 extra_pos += sizeof(uint32_t);
493 uint32_t num_bits;
494 if (op_type == COL_DROP) {
495 assert_always(curr_old_null_pos <= null_bit_position);
496 num_bits = null_bit_position - curr_old_null_pos;
497 } else {
498 assert_always(curr_new_null_pos <= null_bit_position);
499 num_bits = null_bit_position - curr_new_null_pos;
500 }
501 copy_null_bits(
502 curr_old_null_pos,
503 curr_new_null_pos,
504 num_bits,
505 old_null_bytes,
506 new_null_bytes);
507 // update the positions
508 curr_new_null_pos += num_bits;
509 curr_old_null_pos += num_bits;
510 if (op_type == COL_DROP) {
511 curr_old_null_pos++; // account for dropped column
512 } else {
513 is_null_default = (extra_pos[0] != 0);
514 extra_pos++;
515 set_overall_null_position(
516 new_null_bytes,
517 null_bit_position,
518 is_null_default);
519 curr_new_null_pos++; //account for added column
520 }
521 }
522 uchar col_type = extra_pos[0];
523 extra_pos++;
524 if (col_type == COL_FIXED) {
525 uint32_t col_offset;
526 uint32_t col_size;
527 uint32_t num_bytes_to_copy;
528 memcpy(&col_offset, extra_pos, sizeof(uint32_t));
529 extra_pos += sizeof(uint32_t);
530 memcpy(&col_size, extra_pos, sizeof(uint32_t));
531 extra_pos += sizeof(uint32_t);
532
533 if (op_type == COL_DROP) {
534 num_bytes_to_copy = col_offset - curr_old_fixed_offset;
535 } else {
536 num_bytes_to_copy = col_offset - curr_new_fixed_offset;
537 }
538 memcpy(
539 new_fixed_field_ptr + curr_new_fixed_offset,
540 old_fixed_field_ptr + curr_old_fixed_offset,
541 num_bytes_to_copy);
542 curr_old_fixed_offset += num_bytes_to_copy;
543 curr_new_fixed_offset += num_bytes_to_copy;
544 if (op_type == COL_DROP) {
545 // move old_fixed_offset val to skip OVER column that is
546 // being dropped
547 curr_old_fixed_offset += col_size;
548 } else {
549 if (is_null_default) {
550 // copy zeroes
551 memset(
552 new_fixed_field_ptr + curr_new_fixed_offset,
553 0,
554 col_size);
555 } else {
556 // copy data from extra_pos into new row
557 memcpy(
558 new_fixed_field_ptr + curr_new_fixed_offset,
559 extra_pos,
560 col_size);
561 extra_pos += col_size;
562 }
563 curr_new_fixed_offset += col_size;
564 }
565
566 } else if (col_type == COL_VAR) {
567 uint32_t var_col_index;
568 memcpy(&var_col_index, extra_pos, sizeof(uint32_t));
569 extra_pos += sizeof(uint32_t);
570 if (op_type == COL_DROP) {
571 num_var_fields_to_copy = var_col_index - curr_old_num_var_field;
572 } else {
573 num_var_fields_to_copy = var_col_index - curr_new_num_var_field;
574 }
575 copy_var_fields(
576 curr_old_num_var_field,
577 num_var_fields_to_copy,
578 old_var_field_offset_ptr,
579 old_num_offset_bytes,
580 curr_new_var_field_data_ptr,
581 curr_new_var_field_offset_ptr,
582 // pointer to beginning of var fields in new row
583 new_var_field_data_ptr,
584 // pointer to beginning of var fields in old row
585 old_var_field_data_ptr,
586 // number of offset bytes used in new row
587 new_num_offset_bytes,
588 &num_data_bytes_written,
589 &num_offset_bytes_written);
590 curr_new_var_field_data_ptr += num_data_bytes_written;
591 curr_new_var_field_offset_ptr += num_offset_bytes_written;
592 curr_new_num_var_field += num_var_fields_to_copy;
593 curr_old_num_var_field += num_var_fields_to_copy;
594 if (op_type == COL_DROP) {
595 curr_old_num_var_field++; // skip over dropped field
596 } else {
597 if (is_null_default) {
598 curr_new_var_field_data_ptr = write_var_field(
599 curr_new_var_field_offset_ptr,
600 curr_new_var_field_data_ptr,
601 new_var_field_data_ptr,
602 NULL, //copying no data
603 0, //copying 0 bytes
604 new_num_offset_bytes);
605 curr_new_var_field_offset_ptr += new_num_offset_bytes;
606 } else {
607 uint32_t data_length;
608 memcpy(&data_length, extra_pos, sizeof(data_length));
609 extra_pos += sizeof(data_length);
610 curr_new_var_field_data_ptr = write_var_field(
611 curr_new_var_field_offset_ptr,
612 curr_new_var_field_data_ptr,
613 new_var_field_data_ptr,
614 extra_pos, //copying data from mutator
615 data_length, //copying data_length bytes
616 new_num_offset_bytes);
617 extra_pos += data_length;
618 curr_new_var_field_offset_ptr += new_num_offset_bytes;
619 }
620 curr_new_num_var_field++; //account for added column
621 }
622 } else if (col_type == COL_BLOB) {
623 // handle blob data later
624 continue;
625 } else {
626 assert_unreachable();
627 }
628 }
629 // finish copying the null stuff
630 old_null_bits_left = 8*old_num_null_bytes - curr_old_null_pos;
631 new_null_bits_left = 8*new_num_null_bytes - curr_new_null_pos;
632 overall_null_bits_left = old_null_bits_left;
633 set_if_smaller(overall_null_bits_left, new_null_bits_left);
634 copy_null_bits(
635 curr_old_null_pos,
636 curr_new_null_pos,
637 overall_null_bits_left,
638 old_null_bytes,
639 new_null_bytes);
640 // finish copying fixed field stuff
641 num_bytes_left = old_fixed_field_size - curr_old_fixed_offset;
642 memcpy(
643 new_fixed_field_ptr + curr_new_fixed_offset,
644 old_fixed_field_ptr + curr_old_fixed_offset,
645 num_bytes_left);
646 curr_old_fixed_offset += num_bytes_left;
647 curr_new_fixed_offset += num_bytes_left;
648 // sanity check
649 assert_always(curr_new_fixed_offset == new_fixed_field_size);
650
651 // finish copying var field stuff
652 num_var_fields_to_copy = old_num_var_fields - curr_old_num_var_field;
653 copy_var_fields(
654 curr_old_num_var_field,
655 num_var_fields_to_copy,
656 old_var_field_offset_ptr,
657 old_num_offset_bytes,
658 curr_new_var_field_data_ptr,
659 curr_new_var_field_offset_ptr,
660 // pointer to beginning of var fields in new row
661 new_var_field_data_ptr,
662 // pointer to beginning of var fields in old row
663 old_var_field_data_ptr,
664 // number of offset bytes used in new row
665 new_num_offset_bytes,
666 &num_data_bytes_written,
667 &num_offset_bytes_written);
668 curr_new_var_field_offset_ptr += num_offset_bytes_written;
669 curr_new_var_field_data_ptr += num_data_bytes_written;
670 // sanity check
671 assert_always(curr_new_var_field_offset_ptr == new_var_field_data_ptr);
672
673 // start handling blobs
674 get_blob_field_info(
675 &start_blob_offset,
676 old_len_of_offsets,
677 old_var_field_data_ptr,
678 old_num_offset_bytes);
679 start_blob_ptr = old_var_field_data_ptr + start_blob_offset;
680 // if nothing else in extra, then there are no blobs to add or drop, so
681 // can copy blobs straight
682 if ((extra_pos - extra_pos_start) == extra->size) {
683 num_blob_bytes = old_val->size - (start_blob_ptr - old_null_bytes);
684 memcpy(curr_new_var_field_data_ptr, start_blob_ptr, num_blob_bytes);
685 curr_new_var_field_data_ptr += num_blob_bytes;
686 } else {
687 // else, there is blob information to process
688 uchar* len_bytes = NULL;
689 uint32_t curr_old_blob = 0;
690 uint32_t curr_new_blob = 0;
691 uint32_t num_old_blobs = 0;
692 uchar* curr_old_blob_ptr = start_blob_ptr;
693 memcpy(&num_old_blobs, extra_pos, sizeof(num_old_blobs));
694 extra_pos += sizeof(num_old_blobs);
695 len_bytes = extra_pos;
696 extra_pos += num_old_blobs;
697 // copy over blob fields one by one
698 while ((extra_pos - extra_pos_start) < extra->size) {
699 uchar op_type = extra_pos[0];
700 extra_pos++;
701 uint32_t num_blobs_to_copy = 0;
702 uint32_t blob_index;
703 memcpy(&blob_index, extra_pos, sizeof(blob_index));
704 extra_pos += sizeof(blob_index);
705 assert_always (op_type == COL_DROP || op_type == COL_ADD);
706 if (op_type == COL_DROP) {
707 num_blobs_to_copy = blob_index - curr_old_blob;
708 } else {
709 num_blobs_to_copy = blob_index - curr_new_blob;
710 }
711 for (uint32_t i = 0; i < num_blobs_to_copy; i++) {
712 uint32_t num_bytes_written = copy_toku_blob(
713 curr_new_var_field_data_ptr,
714 curr_old_blob_ptr,
715 len_bytes[curr_old_blob + i],
716 false);
717 curr_old_blob_ptr += num_bytes_written;
718 curr_new_var_field_data_ptr += num_bytes_written;
719 }
720 curr_old_blob += num_blobs_to_copy;
721 curr_new_blob += num_blobs_to_copy;
722 if (op_type == COL_DROP) {
723 // skip over blob in row
724 uint32_t num_bytes = copy_toku_blob(
725 NULL,
726 curr_old_blob_ptr,
727 len_bytes[curr_old_blob],
728 true);
729 curr_old_blob++;
730 curr_old_blob_ptr += num_bytes;
731 } else {
732 // copy new data
733 uint32_t new_len_bytes = extra_pos[0];
734 extra_pos++;
735 uint32_t num_bytes = copy_toku_blob(
736 curr_new_var_field_data_ptr,
737 extra_pos,
738 new_len_bytes,
739 false);
740 curr_new_blob++;
741 curr_new_var_field_data_ptr += num_bytes;
742 extra_pos += num_bytes;
743 }
744 }
745 num_blob_bytes = old_val->size - (curr_old_blob_ptr - old_null_bytes);
746 memcpy(curr_new_var_field_data_ptr, curr_old_blob_ptr, num_blob_bytes);
747 curr_new_var_field_data_ptr += num_blob_bytes;
748 }
749 new_val.data = new_val_data;
750 new_val.size = curr_new_var_field_data_ptr - new_val_data;
751 set_val(&new_val, set_extra);
752
753 error = 0;
754 cleanup:
755 tokudb::memory::free(new_val_data);
756 return error;
757 }
758
759 // Expand the variable offset array in the old row given the update mesage
760 // in the extra.
tokudb_expand_variable_offsets(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)761 static int tokudb_expand_variable_offsets(const DBT* old_val,
762 const DBT* extra,
763 void (*set_val)(const DBT* new_val,
764 void* set_extra),
765 void* set_extra) {
766 int error = 0;
767 tokudb::buffer extra_val(extra->data, 0, extra->size);
768
769 // decode the operation
770 uint8_t operation;
771 extra_val.consume(&operation, sizeof operation);
772 assert_always(operation == UPDATE_OP_EXPAND_VARIABLE_OFFSETS);
773
774 // decode number of offsets
775 uint32_t number_of_offsets;
776 extra_val.consume(&number_of_offsets, sizeof number_of_offsets);
777
778 // decode the offset start
779 uint32_t offset_start;
780 extra_val.consume(&offset_start, sizeof offset_start);
781
782 assert_always(extra_val.size() == extra_val.limit());
783
784 DBT new_val; memset(&new_val, 0, sizeof new_val);
785
786 if (old_val != NULL) {
787 assert_always(offset_start + number_of_offsets <= old_val->size);
788
789 // compute the new val from the old val
790 uchar* old_val_ptr = (uchar*)old_val->data;
791
792 // allocate space for the new val's data
793 uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
794 number_of_offsets + old_val->size,
795 MYF(MY_FAE));
796 if (!new_val_ptr) {
797 error = ENOMEM;
798 goto cleanup;
799 }
800 new_val.data = new_val_ptr;
801
802 // copy up to the start of the varchar offset
803 memcpy(new_val_ptr, old_val_ptr, offset_start);
804 new_val_ptr += offset_start;
805 old_val_ptr += offset_start;
806
807 // expand each offset from 1 to 2 bytes
808 for (uint32_t i = 0; i < number_of_offsets; i++) {
809 uint16_t new_offset = *old_val_ptr;
810 int2store(new_val_ptr, new_offset);
811 new_val_ptr += 2;
812 old_val_ptr += 1;
813 }
814
815 // copy the rest of the row
816 size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
817 memcpy(new_val_ptr, old_val_ptr, n);
818 new_val_ptr += n;
819 old_val_ptr += n;
820 new_val.size = new_val_ptr - (uchar *)new_val.data;
821
822 assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
823 assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
824
825 // set the new val
826 set_val(&new_val, set_extra);
827 }
828
829 error = 0;
830
831 cleanup:
832 tokudb::memory::free(new_val.data);
833 return error;
834 }
835
836 // Expand an int field in a old row given the expand message in the extra.
tokudb_expand_int_field(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)837 static int tokudb_expand_int_field(const DBT* old_val,
838 const DBT* extra,
839 void (*set_val)(const DBT* new_val,
840 void* set_extra),
841 void* set_extra) {
842 int error = 0;
843 tokudb::buffer extra_val(extra->data, 0, extra->size);
844
845 uint8_t operation;
846 extra_val.consume(&operation, sizeof operation);
847 assert_always(
848 operation == UPDATE_OP_EXPAND_INT ||
849 operation == UPDATE_OP_EXPAND_UINT);
850 uint32_t the_offset;
851 extra_val.consume(&the_offset, sizeof the_offset);
852 uint32_t old_length;
853 extra_val.consume(&old_length, sizeof old_length);
854 uint32_t new_length;
855 extra_val.consume(&new_length, sizeof new_length);
856 assert_always(extra_val.size() == extra_val.limit());
857
858 assert_always(new_length >= old_length); // expand only
859
860 DBT new_val; memset(&new_val, 0, sizeof new_val);
861
862 if (old_val != NULL) {
863 // old field within the old val
864 assert_always(the_offset + old_length <= old_val->size);
865
866 // compute the new val from the old val
867 uchar* old_val_ptr = (uchar*)old_val->data;
868
869 // allocate space for the new val's data
870 uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
871 old_val->size + (new_length - old_length),
872 MYF(MY_FAE));
873 if (!new_val_ptr) {
874 error = ENOMEM;
875 goto cleanup;
876 }
877 new_val.data = new_val_ptr;
878
879 // copy up to the old offset
880 memcpy(new_val_ptr, old_val_ptr, the_offset);
881 new_val_ptr += the_offset;
882 old_val_ptr += the_offset;
883
884 switch (operation) {
885 case UPDATE_OP_EXPAND_INT:
886 // fill the entire new value with ones or zeros depending on the
887 // sign bit the encoding is little endian
888 memset(
889 new_val_ptr,
890 (old_val_ptr[old_length-1] & 0x80) ? 0xff : 0x00,
891 new_length);
892 // overlay the low bytes of the new value with the old value
893 memcpy(new_val_ptr, old_val_ptr, old_length);
894 new_val_ptr += new_length;
895 old_val_ptr += old_length;
896 break;
897 case UPDATE_OP_EXPAND_UINT:
898 // fill the entire new value with zeros
899 memset(new_val_ptr, 0, new_length);
900 // overlay the low bytes of the new value with the old value
901 memcpy(new_val_ptr, old_val_ptr, old_length);
902 new_val_ptr += new_length;
903 old_val_ptr += old_length;
904 break;
905 default:
906 assert_unreachable();
907 }
908
909 // copy the rest
910 size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
911 memcpy(new_val_ptr, old_val_ptr, n);
912 new_val_ptr += n;
913 old_val_ptr += n;
914 new_val.size = new_val_ptr - (uchar *)new_val.data;
915
916 assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
917 assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
918
919 // set the new val
920 set_val(&new_val, set_extra);
921 }
922
923 error = 0;
924
925 cleanup:
926 tokudb::memory::free(new_val.data);
927 return error;
928 }
929
930 // Expand a char field in a old row given the expand message in the extra.
tokudb_expand_char_field(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)931 static int tokudb_expand_char_field(const DBT* old_val,
932 const DBT* extra,
933 void (*set_val)(const DBT* new_val,
934 void* set_extra),
935 void* set_extra) {
936 int error = 0;
937 tokudb::buffer extra_val(extra->data, 0, extra->size);
938
939 uint8_t operation;
940 extra_val.consume(&operation, sizeof operation);
941 assert_always(
942 operation == UPDATE_OP_EXPAND_CHAR ||
943 operation == UPDATE_OP_EXPAND_BINARY);
944 uint32_t the_offset;
945 extra_val.consume(&the_offset, sizeof the_offset);
946 uint32_t old_length;
947 extra_val.consume(&old_length, sizeof old_length);
948 uint32_t new_length;
949 extra_val.consume(&new_length, sizeof new_length);
950 uchar pad_char;
951 extra_val.consume(&pad_char, sizeof pad_char);
952 assert_always(extra_val.size() == extra_val.limit());
953
954 assert_always(new_length >= old_length); // expand only
955
956 DBT new_val; memset(&new_val, 0, sizeof new_val);
957
958 if (old_val != NULL) {
959 // old field within the old val
960 assert_always(the_offset + old_length <= old_val->size);
961
962 // compute the new val from the old val
963 uchar* old_val_ptr = (uchar*)old_val->data;
964
965 // allocate space for the new val's data
966 uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
967 old_val->size + (new_length - old_length),
968 MYF(MY_FAE));
969 if (!new_val_ptr) {
970 error = ENOMEM;
971 goto cleanup;
972 }
973 new_val.data = new_val_ptr;
974
975 // copy up to the old offset
976 memcpy(new_val_ptr, old_val_ptr, the_offset);
977 new_val_ptr += the_offset;
978 old_val_ptr += the_offset;
979
980 switch (operation) {
981 case UPDATE_OP_EXPAND_CHAR:
982 case UPDATE_OP_EXPAND_BINARY:
983 // fill the entire new value with the pad char
984 memset(new_val_ptr, pad_char, new_length);
985 // overlay the low bytes of the new value with the old value
986 memcpy(new_val_ptr, old_val_ptr, old_length);
987 new_val_ptr += new_length;
988 old_val_ptr += old_length;
989 break;
990 default:
991 assert_unreachable();
992 }
993
994 // copy the rest
995 size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
996 memcpy(new_val_ptr, old_val_ptr, n);
997 new_val_ptr += n;
998 old_val_ptr += n;
999 new_val.size = new_val_ptr - (uchar *)new_val.data;
1000
1001 assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
1002 assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
1003
1004 // set the new val
1005 set_val(&new_val, set_extra);
1006 }
1007
1008 error = 0;
1009
1010 cleanup:
1011 tokudb::memory::free(new_val.data);
1012 return error;
1013 }
1014
1015 namespace tokudb {
1016
1017 class var_fields {
1018 public:
var_fields()1019 inline var_fields() {
1020 }
init_var_fields(uint32_t var_offset,uint32_t offset_bytes,uint32_t bytes_per_offset,tokudb::buffer * val_buffer)1021 inline void init_var_fields(
1022 uint32_t var_offset,
1023 uint32_t offset_bytes,
1024 uint32_t bytes_per_offset,
1025 tokudb::buffer* val_buffer) {
1026
1027 assert_always(
1028 bytes_per_offset == 0 ||
1029 bytes_per_offset == 1 ||
1030 bytes_per_offset == 2);
1031 m_var_offset = var_offset;
1032 m_val_offset = m_var_offset + offset_bytes;
1033 m_bytes_per_offset = bytes_per_offset;
1034 if (bytes_per_offset > 0) {
1035 m_num_fields = offset_bytes / bytes_per_offset;
1036 } else {
1037 assert_always(offset_bytes == 0);
1038 m_num_fields = 0;
1039 }
1040 m_val_buffer = val_buffer;
1041 }
1042 uint32_t value_offset(uint32_t var_index);
1043 uint32_t value_length(uint32_t var_index);
1044 void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s);
1045 uint32_t end_offset();
1046 void replace(
1047 uint32_t var_index,
1048 void* new_val_ptr,
1049 uint32_t new_val_length);
1050 private:
1051 uint32_t read_offset(uint32_t var_index);
1052 void write_offset(uint32_t var_index, uint32_t v);
1053 private:
1054 uint32_t m_var_offset;
1055 uint32_t m_val_offset;
1056 uint32_t m_bytes_per_offset;
1057 uint32_t m_num_fields;
1058 tokudb::buffer* m_val_buffer;
1059 };
1060
1061 // Return the ith variable length offset
read_offset(uint32_t var_index)1062 uint32_t var_fields::read_offset(uint32_t var_index) {
1063 uint32_t offset = 0;
1064 m_val_buffer->read(
1065 &offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset);
1066 return offset;
1067 }
1068
1069 // Write the ith variable length offset with a new offset.
write_offset(uint32_t var_index,uint32_t new_offset)1070 void var_fields::write_offset(uint32_t var_index, uint32_t new_offset) {
1071 m_val_buffer->write(
1072 &new_offset,
1073 m_bytes_per_offset,
1074 m_var_offset + var_index * m_bytes_per_offset);
1075 }
1076
1077 // Return the offset of the ith variable length field
value_offset(uint32_t var_index)1078 uint32_t var_fields::value_offset(uint32_t var_index) {
1079 assert_always(var_index < m_num_fields);
1080 if (var_index == 0)
1081 return m_val_offset;
1082 else
1083 return m_val_offset + read_offset(var_index-1);
1084 }
1085
1086 // Return the length of the ith variable length field
value_length(uint32_t var_index)1087 uint32_t var_fields::value_length(uint32_t var_index) {
1088 assert_always(var_index < m_num_fields);
1089 if (var_index == 0)
1090 return read_offset(0);
1091 else
1092 return read_offset(var_index) - read_offset(var_index-1);
1093 }
1094
1095 // The length of the ith variable length fields changed.
1096 // Update all of the subsequent offsets.
update_offsets(uint32_t var_index,uint32_t old_s,uint32_t new_s)1097 void var_fields::update_offsets(
1098 uint32_t var_index,
1099 uint32_t old_s,
1100 uint32_t new_s) {
1101
1102 assert_always(var_index < m_num_fields);
1103 if (old_s == new_s)
1104 return;
1105 for (uint i = var_index; i < m_num_fields; i++) {
1106 uint32_t v = read_offset(i);
1107 if (new_s > old_s)
1108 write_offset(i, v + (new_s - old_s));
1109 else
1110 write_offset(i, v - (old_s - new_s));
1111 }
1112 }
1113
end_offset()1114 uint32_t var_fields::end_offset() {
1115 if (m_num_fields == 0)
1116 return m_val_offset;
1117 else
1118 return m_val_offset + read_offset(m_num_fields-1);
1119 }
1120
replace(uint32_t var_index,void * new_val_ptr,uint32_t new_val_length)1121 void var_fields::replace(
1122 uint32_t var_index,
1123 void* new_val_ptr,
1124 uint32_t new_val_length) {
1125
1126 // replace the new val with the extra val
1127 uint32_t the_offset = value_offset(var_index);
1128 uint32_t old_s = value_length(var_index);
1129 uint32_t new_s = new_val_length;
1130 m_val_buffer->replace(the_offset, old_s, new_val_ptr, new_s);
1131
1132 // update the var offsets
1133 update_offsets(var_index, old_s, new_s);
1134 }
1135
1136 class blob_fields {
1137 public:
blob_fields()1138 blob_fields() {
1139 }
init_blob_fields(uint32_t num_blobs,const uint8_t * blob_lengths,tokudb::buffer * val_buffer)1140 void init_blob_fields(
1141 uint32_t num_blobs,
1142 const uint8_t* blob_lengths,
1143 tokudb::buffer* val_buffer) {
1144 m_num_blobs = num_blobs;
1145 m_blob_lengths = blob_lengths;
1146 m_val_buffer = val_buffer;
1147 }
start_blobs(uint32_t offset)1148 void start_blobs(uint32_t offset) {
1149 m_blob_offset = offset;
1150 }
1151 void replace(uint32_t blob_index, uint32_t length, void *p);
1152
1153 void expand_length(
1154 uint32_t blob_index,
1155 uint8_t old_length_length,
1156 uint8_t new_length_length);
1157 private:
1158 uint32_t read_length(uint32_t offset, size_t size);
1159 void write_length(uint32_t offset, size_t size, uint32_t new_length);
1160 uint32_t blob_offset(uint32_t blob_index);
1161 private:
1162 uint32_t m_blob_offset;
1163 uint32_t m_num_blobs;
1164 const uint8_t *m_blob_lengths;
1165 tokudb::buffer *m_val_buffer;
1166 };
1167
read_length(uint32_t offset,size_t blob_length)1168 uint32_t blob_fields::read_length(uint32_t offset, size_t blob_length) {
1169 uint32_t length = 0;
1170 m_val_buffer->read(&length, blob_length, offset);
1171 return length;
1172 }
1173
write_length(uint32_t offset,size_t size,uint32_t new_length)1174 void blob_fields::write_length(
1175 uint32_t offset,
1176 size_t size,
1177 uint32_t new_length) {
1178 m_val_buffer->write(&new_length, size, offset);
1179 }
1180
blob_offset(uint32_t blob_index)1181 uint32_t blob_fields::blob_offset(uint32_t blob_index) {
1182 assert_always(blob_index < m_num_blobs);
1183 uint32_t offset = m_blob_offset;
1184 for (uint i = 0; i < blob_index; i++) {
1185 uint32_t blob_length = m_blob_lengths[i];
1186 uint32_t length = read_length(offset, blob_length);
1187 offset += blob_length + length;
1188 }
1189 return offset;
1190 }
1191
replace(uint32_t blob_index,uint32_t new_length,void * new_value)1192 void blob_fields::replace(
1193 uint32_t blob_index,
1194 uint32_t new_length,
1195 void* new_value) {
1196
1197 assert_always(blob_index < m_num_blobs);
1198
1199 // compute the ith blob offset
1200 uint32_t offset = blob_offset(blob_index);
1201 uint8_t blob_length = m_blob_lengths[blob_index];
1202
1203 // read the old length
1204 uint32_t old_length = read_length(offset, blob_length);
1205
1206 // replace the data
1207 m_val_buffer->replace(
1208 offset + blob_length,
1209 old_length,
1210 new_value,
1211 new_length);
1212
1213 // write the new length
1214 write_length(offset, blob_length, new_length);
1215 }
1216
expand_length(uint32_t blob_index,uint8_t old_length_length,uint8_t new_length_length)1217 void blob_fields::expand_length(
1218 uint32_t blob_index,
1219 uint8_t old_length_length,
1220 uint8_t new_length_length) {
1221
1222 assert_always(blob_index < m_num_blobs);
1223 assert_always(old_length_length == m_blob_lengths[blob_index]);
1224
1225 // compute the ith blob offset
1226 uint32_t offset = blob_offset(blob_index);
1227
1228 // read the blob length
1229 uint32_t blob_length = read_length(offset, old_length_length);
1230
1231 // expand the length
1232 m_val_buffer->replace(
1233 offset,
1234 old_length_length,
1235 &blob_length,
1236 new_length_length);
1237 }
1238
1239 class value_map {
1240 public:
value_map(tokudb::buffer * val_buffer)1241 value_map(tokudb::buffer *val_buffer) : m_val_buffer(val_buffer) {
1242 }
1243
init_var_fields(uint32_t var_offset,uint32_t offset_bytes,uint32_t bytes_per_offset)1244 void init_var_fields(
1245 uint32_t var_offset,
1246 uint32_t offset_bytes,
1247 uint32_t bytes_per_offset) {
1248
1249 m_var_fields.init_var_fields(
1250 var_offset,
1251 offset_bytes,
1252 bytes_per_offset,
1253 m_val_buffer);
1254 }
1255
init_blob_fields(uint32_t num_blobs,const uint8_t * blob_lengths)1256 void init_blob_fields(uint32_t num_blobs, const uint8_t *blob_lengths) {
1257 m_blob_fields.init_blob_fields(num_blobs, blob_lengths, m_val_buffer);
1258 }
1259
1260 // Replace the value of a fixed length field
replace_fixed(uint32_t the_offset,uint32_t field_null_num,void * new_val_ptr,uint32_t new_val_length)1261 void replace_fixed(
1262 uint32_t the_offset,
1263 uint32_t field_null_num,
1264 void* new_val_ptr,
1265 uint32_t new_val_length) {
1266
1267 m_val_buffer->replace(
1268 the_offset,
1269 new_val_length,
1270 new_val_ptr,
1271 new_val_length);
1272 maybe_clear_null(field_null_num);
1273 }
1274
1275 // Replace the value of a variable length field
replace_varchar(uint32_t var_index,uint32_t field_null_num,void * new_val_ptr,uint32_t new_val_length)1276 void replace_varchar(
1277 uint32_t var_index,
1278 uint32_t field_null_num,
1279 void* new_val_ptr,
1280 uint32_t new_val_length) {
1281
1282 m_var_fields.replace(var_index, new_val_ptr, new_val_length);
1283 maybe_clear_null(field_null_num);
1284 }
1285
1286 // Replace the value of a blob field
replace_blob(uint32_t blob_index,uint32_t field_null_num,void * new_val_ptr,uint32_t new_val_length)1287 void replace_blob(
1288 uint32_t blob_index,
1289 uint32_t field_null_num,
1290 void* new_val_ptr,
1291 uint32_t new_val_length) {
1292
1293 m_blob_fields.start_blobs(m_var_fields.end_offset());
1294 m_blob_fields.replace(blob_index, new_val_length, new_val_ptr);
1295 maybe_clear_null(field_null_num);
1296 }
1297
1298 void expand_blob_lengths(
1299 uint32_t num_blob,
1300 const uint8_t* old_length,
1301 const uint8_t* new_length);
1302
1303 void int_op(
1304 uint32_t operation,
1305 uint32_t the_offset,
1306 uint32_t length,
1307 uint32_t field_null_num,
1308 tokudb::buffer& old_val,
1309 void* extra_val);
1310
1311 void uint_op(
1312 uint32_t operation,
1313 uint32_t the_offset,
1314 uint32_t length,
1315 uint32_t field_null_num,
1316 tokudb::buffer& old_val,
1317 void* extra_val);
1318
1319 private:
is_null(uint32_t null_num,uchar * null_bytes)1320 bool is_null(uint32_t null_num, uchar *null_bytes) {
1321 bool field_is_null = false;
1322 if (null_num) {
1323 if (null_num & (1<<31))
1324 null_num &= ~(1<<31);
1325 else
1326 null_num -= 1;
1327 field_is_null = is_overall_null_position_set(null_bytes, null_num);
1328 }
1329 return field_is_null;
1330 }
1331
maybe_clear_null(uint32_t null_num)1332 void maybe_clear_null(uint32_t null_num) {
1333 if (null_num) {
1334 if (null_num & (1<<31))
1335 null_num &= ~(1<<31);
1336 else
1337 null_num -= 1;
1338 set_overall_null_position(
1339 (uchar*)m_val_buffer->data(),
1340 null_num,
1341 false);
1342 }
1343 }
1344
1345 private:
1346 var_fields m_var_fields;
1347 blob_fields m_blob_fields;
1348 tokudb::buffer *m_val_buffer;
1349 };
1350
1351 // Update an int field: signed newval@offset = old_val@offset OP extra_val
int_op(uint32_t operation,uint32_t the_offset,uint32_t length,uint32_t field_null_num,tokudb::buffer & old_val,void * extra_val)1352 void value_map::int_op(
1353 uint32_t operation,
1354 uint32_t the_offset,
1355 uint32_t length,
1356 uint32_t field_null_num,
1357 tokudb::buffer &old_val,
1358 void* extra_val) {
1359
1360 assert_always(the_offset + length <= m_val_buffer->size());
1361 assert_always(the_offset + length <= old_val.size());
1362 assert_always(
1363 length == 1 || length == 2 || length == 3 ||
1364 length == 4 || length == 8);
1365
1366 uchar *old_val_ptr = (uchar *) old_val.data();
1367 bool field_is_null = is_null(field_null_num, old_val_ptr);
1368 int64_t v = 0;
1369 memcpy(&v, old_val_ptr + the_offset, length);
1370 v = tokudb::int_sign_extend(v, 8*length);
1371 int64_t extra_v = 0;
1372 memcpy(&extra_v, extra_val, length);
1373 extra_v = tokudb::int_sign_extend(extra_v, 8*length);
1374 switch (operation) {
1375 case '+':
1376 if (!field_is_null) {
1377 bool over;
1378 v = tokudb::int_add(v, extra_v, 8*length, &over);
1379 if (over) {
1380 if (extra_v > 0)
1381 v = tokudb::int_high_endpoint(8*length);
1382 else
1383 v = tokudb::int_low_endpoint(8*length);
1384 }
1385 m_val_buffer->replace(the_offset, length, &v, length);
1386 }
1387 break;
1388 case '-':
1389 if (!field_is_null) {
1390 bool over;
1391 v = tokudb::int_sub(v, extra_v, 8*length, &over);
1392 if (over) {
1393 if (extra_v > 0)
1394 v = tokudb::int_low_endpoint(8*length);
1395 else
1396 v = tokudb::int_high_endpoint(8*length);
1397 }
1398 m_val_buffer->replace(the_offset, length, &v, length);
1399 }
1400 break;
1401 default:
1402 assert_unreachable();
1403 }
1404 }
1405
1406 // Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val
uint_op(uint32_t operation,uint32_t the_offset,uint32_t length,uint32_t field_null_num,tokudb::buffer & old_val,void * extra_val)1407 void value_map::uint_op(
1408 uint32_t operation,
1409 uint32_t the_offset,
1410 uint32_t length,
1411 uint32_t field_null_num,
1412 tokudb::buffer& old_val,
1413 void* extra_val) {
1414
1415 assert_always(the_offset + length <= m_val_buffer->size());
1416 assert_always(the_offset + length <= old_val.size());
1417 assert_always(
1418 length == 1 || length == 2 || length == 3 ||
1419 length == 4 || length == 8);
1420
1421 uchar *old_val_ptr = (uchar *) old_val.data();
1422 bool field_is_null = is_null(field_null_num, old_val_ptr);
1423 uint64_t v = 0;
1424 memcpy(&v, old_val_ptr + the_offset, length);
1425 uint64_t extra_v = 0;
1426 memcpy(&extra_v, extra_val, length);
1427 switch (operation) {
1428 case '+':
1429 if (!field_is_null) {
1430 bool over;
1431 v = tokudb::uint_add(v, extra_v, 8*length, &over);
1432 if (over) {
1433 v = tokudb::uint_high_endpoint(8*length);
1434 }
1435 m_val_buffer->replace(the_offset, length, &v, length);
1436 }
1437 break;
1438 case '-':
1439 if (!field_is_null) {
1440 bool over;
1441 v = tokudb::uint_sub(v, extra_v, 8*length, &over);
1442 if (over) {
1443 v = tokudb::uint_low_endpoint(8*length);
1444 }
1445 m_val_buffer->replace(the_offset, length, &v, length);
1446 }
1447 break;
1448 default:
1449 assert_unreachable();
1450 }
1451 }
1452
expand_blob_lengths(uint32_t num_blob,const uint8_t * old_length,const uint8_t * new_length)1453 void value_map::expand_blob_lengths(
1454 uint32_t num_blob,
1455 const uint8_t* old_length,
1456 const uint8_t* new_length) {
1457
1458 uint8_t current_length[num_blob];
1459 memcpy(current_length, old_length, num_blob);
1460 for (uint32_t i = 0; i < num_blob; i++) {
1461 if (new_length[i] > current_length[i]) {
1462 m_blob_fields.init_blob_fields(
1463 num_blob,
1464 current_length,
1465 m_val_buffer);
1466 m_blob_fields.start_blobs(m_var_fields.end_offset());
1467 m_blob_fields.expand_length(i, current_length[i], new_length[i]);
1468 current_length[i] = new_length[i];
1469 }
1470 }
1471 }
1472
1473 }
1474
consume_uint32(tokudb::buffer & b)1475 static uint32_t consume_uint32(tokudb::buffer &b) {
1476 uint32_t n;
1477 size_t s = b.consume_ui<uint32_t>(&n);
1478 assert_always(s > 0);
1479 return n;
1480 }
1481
consume_uint8_array(tokudb::buffer & b,uint32_t array_size)1482 static uint8_t *consume_uint8_array(tokudb::buffer &b, uint32_t array_size) {
1483 uint8_t *p = (uint8_t *) b.consume_ptr(array_size);
1484 assert_always(p);
1485 return p;
1486 }
1487
tokudb_expand_blobs(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1488 static int tokudb_expand_blobs(const DBT* old_val_dbt,
1489 const DBT* extra,
1490 void (*set_val)(const DBT* new_val_dbt,
1491 void* set_extra),
1492 void* set_extra) {
1493 tokudb::buffer extra_val(extra->data, 0, extra->size);
1494
1495 uint8_t operation;
1496 extra_val.consume(&operation, sizeof operation);
1497 assert_always(operation == UPDATE_OP_EXPAND_BLOB);
1498
1499 if (old_val_dbt != NULL) {
1500 // new val = old val
1501 tokudb::buffer new_val;
1502 new_val.append(old_val_dbt->data, old_val_dbt->size);
1503
1504 tokudb::value_map vd(&new_val);
1505
1506 // decode variable field info
1507 uint32_t var_field_offset = consume_uint32(extra_val);
1508 uint32_t var_offset_bytes = consume_uint32(extra_val);
1509 uint32_t bytes_per_offset = consume_uint32(extra_val);
1510 vd.init_var_fields(
1511 var_field_offset,
1512 var_offset_bytes,
1513 bytes_per_offset);
1514
1515 // decode blob info
1516 uint32_t num_blob = consume_uint32(extra_val);
1517 const uint8_t* old_blob_length =
1518 consume_uint8_array(extra_val, num_blob);
1519 const uint8_t* new_blob_length =
1520 consume_uint8_array(extra_val, num_blob);
1521 assert_always(extra_val.size() == extra_val.limit());
1522
1523 // expand blob lengths
1524 vd.expand_blob_lengths(num_blob, old_blob_length, new_blob_length);
1525
1526 // set the new val
1527 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1528 new_val_dbt.data = new_val.data();
1529 new_val_dbt.size = new_val.size();
1530 set_val(&new_val_dbt, set_extra);
1531 }
1532 return 0;
1533 }
1534
1535 // Decode and apply a sequence of update operations defined in the extra to
1536 // the old value and put the result in the new value.
apply_1_updates(tokudb::value_map & vd,tokudb::buffer & old_val,tokudb::buffer & extra_val)1537 static void apply_1_updates(tokudb::value_map& vd,
1538 tokudb::buffer& old_val,
1539 tokudb::buffer& extra_val) {
1540 uint32_t num_updates;
1541 extra_val.consume(&num_updates, sizeof num_updates);
1542 for ( ; num_updates > 0; num_updates--) {
1543 // get the update operation
1544 uint32_t update_operation;
1545 extra_val.consume(&update_operation, sizeof update_operation);
1546 uint32_t field_type;
1547 extra_val.consume(&field_type, sizeof field_type);
1548 uint32_t unused;
1549 extra_val.consume(&unused, sizeof unused);
1550 uint32_t field_null_num;
1551 extra_val.consume(&field_null_num, sizeof field_null_num);
1552 uint32_t the_offset;
1553 extra_val.consume(&the_offset, sizeof the_offset);
1554 uint32_t extra_val_length;
1555 extra_val.consume(&extra_val_length, sizeof extra_val_length);
1556 void *extra_val_ptr = extra_val.consume_ptr(extra_val_length);
1557
1558 // apply the update
1559 switch (field_type) {
1560 case UPDATE_TYPE_INT:
1561 if (update_operation == '=')
1562 vd.replace_fixed(
1563 the_offset,
1564 field_null_num,
1565 extra_val_ptr,
1566 extra_val_length);
1567 else
1568 vd.int_op(
1569 update_operation,
1570 the_offset,
1571 extra_val_length,
1572 field_null_num,
1573 old_val,
1574 extra_val_ptr);
1575 break;
1576 case UPDATE_TYPE_UINT:
1577 if (update_operation == '=')
1578 vd.replace_fixed(
1579 the_offset,
1580 field_null_num,
1581 extra_val_ptr,
1582 extra_val_length);
1583 else
1584 vd.uint_op(
1585 update_operation,
1586 the_offset,
1587 extra_val_length,
1588 field_null_num,
1589 old_val,
1590 extra_val_ptr);
1591 break;
1592 case UPDATE_TYPE_CHAR:
1593 case UPDATE_TYPE_BINARY:
1594 if (update_operation == '=')
1595 vd.replace_fixed(
1596 the_offset,
1597 field_null_num,
1598 extra_val_ptr,
1599 extra_val_length);
1600 else
1601 assert_unreachable();
1602 break;
1603 default:
1604 assert_unreachable();
1605 break;
1606 }
1607 }
1608 assert_always(extra_val.size() == extra_val.limit());
1609 }
1610
1611 // Simple update handler. Decode the update message, apply the update operations
1612 // to the old value, and set the new value.
tokudb_update_1_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1613 static int tokudb_update_1_fun(const DBT* old_val_dbt,
1614 const DBT* extra,
1615 void (*set_val)(const DBT* new_val_dbt,
1616 void* set_extra),
1617 void* set_extra) {
1618 tokudb::buffer extra_val(extra->data, 0, extra->size);
1619
1620 uint8_t operation;
1621 extra_val.consume(&operation, sizeof operation);
1622 assert_always(operation == UPDATE_OP_UPDATE_1);
1623
1624 if (old_val_dbt != NULL) {
1625 // get the simple descriptor
1626 uint32_t m_fixed_field_offset;
1627 extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
1628 uint32_t m_var_field_offset;
1629 extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
1630 uint32_t m_var_offset_bytes;
1631 extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
1632 uint32_t m_bytes_per_offset;
1633 extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
1634
1635 tokudb::buffer old_val(
1636 old_val_dbt->data,
1637 old_val_dbt->size,
1638 old_val_dbt->size);
1639
1640 // new val = old val
1641 tokudb::buffer new_val;
1642 new_val.append(old_val_dbt->data, old_val_dbt->size);
1643
1644 tokudb::value_map vd(&new_val);
1645 vd.init_var_fields(
1646 m_var_field_offset,
1647 m_var_offset_bytes,
1648 m_bytes_per_offset);
1649
1650 // apply updates to new val
1651 apply_1_updates(vd, old_val, extra_val);
1652
1653 // set the new val
1654 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1655 new_val_dbt.data = new_val.data();
1656 new_val_dbt.size = new_val.size();
1657 set_val(&new_val_dbt, set_extra);
1658 }
1659
1660 return 0;
1661 }
1662
1663 // Simple upsert handler. Decode the upsert message. If the key does not exist,
1664 // then insert a new value from the extra.
1665 // Otherwise, apply the update operations to the old value, and then set the
1666 // new value.
tokudb_upsert_1_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1667 static int tokudb_upsert_1_fun(const DBT* old_val_dbt,
1668 const DBT* extra,
1669 void (*set_val)(const DBT* new_val_dbt,
1670 void* set_extra),
1671 void* set_extra) {
1672 tokudb::buffer extra_val(extra->data, 0, extra->size);
1673
1674 uint8_t operation;
1675 extra_val.consume(&operation, sizeof operation);
1676 assert_always(operation == UPDATE_OP_UPSERT_1);
1677
1678 uint32_t insert_length;
1679 extra_val.consume(&insert_length, sizeof insert_length);
1680 void *insert_row = extra_val.consume_ptr(insert_length);
1681
1682 if (old_val_dbt == NULL) {
1683 // insert a new row
1684 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1685 new_val_dbt.size = insert_length;
1686 new_val_dbt.data = insert_row;
1687 set_val(&new_val_dbt, set_extra);
1688 } else {
1689 // decode the simple descriptor
1690 uint32_t m_fixed_field_offset;
1691 extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
1692 uint32_t m_var_field_offset;
1693 extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
1694 uint32_t m_var_offset_bytes;
1695 extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
1696 uint32_t m_bytes_per_offset;
1697 extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
1698
1699 tokudb::buffer old_val(
1700 old_val_dbt->data,
1701 old_val_dbt->size,
1702 old_val_dbt->size);
1703
1704 // new val = old val
1705 tokudb::buffer new_val;
1706 new_val.append(old_val_dbt->data, old_val_dbt->size);
1707
1708 tokudb::value_map vd(&new_val);
1709 vd.init_var_fields(
1710 m_var_field_offset,
1711 m_var_offset_bytes,
1712 m_bytes_per_offset);
1713
1714 // apply updates to new val
1715 apply_1_updates(vd, old_val, extra_val);
1716
1717 // set the new val
1718 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1719 new_val_dbt.data = new_val.data();
1720 new_val_dbt.size = new_val.size();
1721 set_val(&new_val_dbt, set_extra);
1722 }
1723
1724 return 0;
1725 }
1726
1727 // Decode and apply a sequence of update operations defined in the extra to the
1728 // old value and put the result in the new value.
apply_2_updates(tokudb::value_map & vd,tokudb::buffer & old_val,tokudb::buffer & extra_val)1729 static void apply_2_updates(tokudb::value_map& vd,
1730 tokudb::buffer& old_val,
1731 tokudb::buffer& extra_val) {
1732 uint32_t num_updates = consume_uint32(extra_val);
1733 for (uint32_t i = 0; i < num_updates; i++) {
1734 uint32_t update_operation = consume_uint32(extra_val);
1735 if (update_operation == 'v') {
1736 uint32_t var_field_offset = consume_uint32(extra_val);
1737 uint32_t var_offset_bytes = consume_uint32(extra_val);
1738 uint32_t bytes_per_offset = consume_uint32(extra_val);
1739 vd.init_var_fields(
1740 var_field_offset,
1741 var_offset_bytes,
1742 bytes_per_offset);
1743 } else if (update_operation == 'b') {
1744 uint32_t num_blobs = consume_uint32(extra_val);
1745 const uint8_t* blob_lengths =
1746 consume_uint8_array(extra_val, num_blobs);
1747 vd.init_blob_fields(num_blobs, blob_lengths);
1748 } else {
1749 uint32_t field_type = consume_uint32(extra_val);
1750 uint32_t field_null_num = consume_uint32(extra_val);
1751 uint32_t the_offset = consume_uint32(extra_val);
1752 uint32_t extra_val_length = consume_uint32(extra_val);
1753 void* extra_val_ptr = extra_val.consume_ptr(extra_val_length);
1754 assert_always(extra_val_ptr);
1755
1756 switch (field_type) {
1757 case UPDATE_TYPE_INT:
1758 if (update_operation == '=')
1759 vd.replace_fixed(
1760 the_offset,
1761 field_null_num,
1762 extra_val_ptr,
1763 extra_val_length);
1764 else
1765 vd.int_op(
1766 update_operation,
1767 the_offset,
1768 extra_val_length,
1769 field_null_num,
1770 old_val,
1771 extra_val_ptr);
1772 break;
1773 case UPDATE_TYPE_UINT:
1774 if (update_operation == '=')
1775 vd.replace_fixed(
1776 the_offset,
1777 field_null_num,
1778 extra_val_ptr,
1779 extra_val_length);
1780 else
1781 vd.uint_op(
1782 update_operation,
1783 the_offset,
1784 extra_val_length,
1785 field_null_num,
1786 old_val,
1787 extra_val_ptr);
1788 break;
1789 case UPDATE_TYPE_CHAR:
1790 case UPDATE_TYPE_BINARY:
1791 if (update_operation == '=')
1792 vd.replace_fixed(
1793 the_offset,
1794 field_null_num,
1795 extra_val_ptr,
1796 extra_val_length);
1797 else
1798 assert_unreachable();
1799 break;
1800 case UPDATE_TYPE_VARBINARY:
1801 case UPDATE_TYPE_VARCHAR:
1802 if (update_operation == '=')
1803 vd.replace_varchar(
1804 the_offset,
1805 field_null_num,
1806 extra_val_ptr,
1807 extra_val_length);
1808 else
1809 assert_unreachable();
1810 break;
1811 case UPDATE_TYPE_TEXT:
1812 case UPDATE_TYPE_BLOB:
1813 if (update_operation == '=')
1814 vd.replace_blob(
1815 the_offset,
1816 field_null_num,
1817 extra_val_ptr,
1818 extra_val_length);
1819 else
1820 assert_unreachable();
1821 break;
1822 default:
1823 assert_unreachable();
1824 }
1825 }
1826 }
1827 assert_always(extra_val.size() == extra_val.limit());
1828 }
1829
1830 // Simple update handler. Decode the update message, apply the update
1831 // operations to the old value, and set the new value.
tokudb_update_2_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1832 static int tokudb_update_2_fun(const DBT* old_val_dbt,
1833 const DBT* extra,
1834 void (*set_val)(const DBT* new_val_dbt,
1835 void* set_extra),
1836 void* set_extra) {
1837 tokudb::buffer extra_val(extra->data, 0, extra->size);
1838
1839 uint8_t op;
1840 extra_val.consume(&op, sizeof op);
1841 assert_always(op == UPDATE_OP_UPDATE_2);
1842
1843 if (old_val_dbt != NULL) {
1844 tokudb::buffer old_val(
1845 old_val_dbt->data,
1846 old_val_dbt->size,
1847 old_val_dbt->size);
1848
1849 // new val = old val
1850 tokudb::buffer new_val;
1851 new_val.append(old_val_dbt->data, old_val_dbt->size);
1852
1853 tokudb::value_map vd(&new_val);
1854
1855 // apply updates to new val
1856 apply_2_updates(vd, old_val, extra_val);
1857
1858 // set the new val
1859 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1860 new_val_dbt.data = new_val.data();
1861 new_val_dbt.size = new_val.size();
1862 set_val(&new_val_dbt, set_extra);
1863 }
1864
1865 return 0;
1866 }
1867
1868 // Simple upsert handler. Decode the upsert message. If the key does not exist,
1869 // then insert a new value from the extra.
1870 // Otherwise, apply the update operations to the old value, and then set the
1871 // new value.
tokudb_upsert_2_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1872 static int tokudb_upsert_2_fun(const DBT* old_val_dbt,
1873 const DBT* extra,
1874 void (*set_val)(const DBT* new_val_dbt,
1875 void* set_extra),
1876 void* set_extra) {
1877 tokudb::buffer extra_val(extra->data, 0, extra->size);
1878
1879 uint8_t op;
1880 extra_val.consume(&op, sizeof op);
1881 assert_always(op == UPDATE_OP_UPSERT_2);
1882
1883 uint32_t insert_length = consume_uint32(extra_val);
1884 assert_always(insert_length < extra_val.limit());
1885 void* insert_row = extra_val.consume_ptr(insert_length);
1886 assert_always(insert_row);
1887
1888 if (old_val_dbt == NULL) {
1889 // insert a new row
1890 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1891 new_val_dbt.size = insert_length;
1892 new_val_dbt.data = insert_row;
1893 set_val(&new_val_dbt, set_extra);
1894 } else {
1895 tokudb::buffer old_val(
1896 old_val_dbt->data,
1897 old_val_dbt->size,
1898 old_val_dbt->size);
1899
1900 // new val = old val
1901 tokudb::buffer new_val;
1902 new_val.append(old_val_dbt->data, old_val_dbt->size);
1903
1904 tokudb::value_map vd(&new_val);
1905
1906 // apply updates to new val
1907 apply_2_updates(vd, old_val, extra_val);
1908
1909 // set the new val
1910 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1911 new_val_dbt.data = new_val.data();
1912 new_val_dbt.size = new_val.size();
1913 set_val(&new_val_dbt, set_extra);
1914 }
1915
1916 return 0;
1917 }
1918
1919 // This function is the update callback function that is registered with the
1920 // YDB environment. It uses the first byte in the update message to identify
1921 // the update message type and call the handler for that message.
tokudb_update_fun(TOKUDB_UNUSED (DB * db),TOKUDB_UNUSED (const DBT * key),const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)1922 int tokudb_update_fun(TOKUDB_UNUSED(DB* db),
1923 TOKUDB_UNUSED(const DBT* key),
1924 const DBT* old_val,
1925 const DBT* extra,
1926 void (*set_val)(const DBT* new_val, void* set_extra),
1927 void* set_extra) {
1928 assert_always(extra->size > 0);
1929 uint8_t* extra_pos = (uchar*)extra->data;
1930 uint8_t operation = extra_pos[0];
1931 int error;
1932 switch (operation) {
1933 case UPDATE_OP_COL_ADD_OR_DROP:
1934 error = tokudb_hcad_update_fun(old_val, extra, set_val, set_extra);
1935 break;
1936 case UPDATE_OP_EXPAND_VARIABLE_OFFSETS:
1937 error =
1938 tokudb_expand_variable_offsets(old_val, extra, set_val, set_extra);
1939 break;
1940 case UPDATE_OP_EXPAND_INT:
1941 case UPDATE_OP_EXPAND_UINT:
1942 error = tokudb_expand_int_field(old_val, extra, set_val, set_extra);
1943 break;
1944 case UPDATE_OP_EXPAND_CHAR:
1945 case UPDATE_OP_EXPAND_BINARY:
1946 error = tokudb_expand_char_field(old_val, extra, set_val, set_extra);
1947 break;
1948 case UPDATE_OP_EXPAND_BLOB:
1949 error = tokudb_expand_blobs(old_val, extra, set_val, set_extra);
1950 break;
1951 case UPDATE_OP_UPDATE_1:
1952 error = tokudb_update_1_fun(old_val, extra, set_val, set_extra);
1953 break;
1954 case UPDATE_OP_UPSERT_1:
1955 error = tokudb_upsert_1_fun(old_val, extra, set_val, set_extra);
1956 break;
1957 case UPDATE_OP_UPDATE_2:
1958 error = tokudb_update_2_fun(old_val, extra, set_val, set_extra);
1959 break;
1960 case UPDATE_OP_UPSERT_2:
1961 error = tokudb_upsert_2_fun(old_val, extra, set_val, set_extra);
1962 break;
1963 default:
1964 assert_unreachable();
1965 }
1966 return error;
1967 }
1968