1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
    TokuDB is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     TokuDB is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ======= */
23 
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25 
26 // Update operation codes.  These codes get stuffed into update messages, so they can not change.
27 // The operations are currently stored in a single byte in the update message, so only 256 operations
28 // are supported.  When we need more, we can use the last (255) code to indicate that the operation code
29 // is expanded beyond 1 byte.
enum {
    // add or drop column message; the only operation accepted by
    // tokudb_hcad_update_fun
    UPDATE_OP_COL_ADD_OR_DROP = 0,

    // expand the variable length field offsets from 1 to 2 bytes
    UPDATE_OP_EXPAND_VARIABLE_OFFSETS = 1,
    // expand a fixed length column; the field type is encoded in the
    // operation code itself
    UPDATE_OP_EXPAND_INT = 2,
    UPDATE_OP_EXPAND_UINT = 3,
    UPDATE_OP_EXPAND_CHAR = 4,
    UPDATE_OP_EXPAND_BINARY = 5,
    // expand the length bytes of blob fields
    UPDATE_OP_EXPAND_BLOB = 6,

    // version 1 update/upsert messages: no longer generated, but may still be
    // present in a fractal tree written by a previous build
    UPDATE_OP_UPDATE_1 = 10,
    UPDATE_OP_UPSERT_1 = 11,
    // version 2 update/upsert messages (VLQ coded)
    UPDATE_OP_UPDATE_2 = 12,
    UPDATE_OP_UPSERT_2 = 13,
};
45 
46 // Field types used in the update messages
// These values are what the "field type" slot of an update operation holds
// (see the update/upsert message layouts described below).
enum {
    UPDATE_TYPE_UNKNOWN = 0,
    // fixed length types
    UPDATE_TYPE_INT = 1,
    UPDATE_TYPE_UINT = 2,
    UPDATE_TYPE_CHAR = 3,
    UPDATE_TYPE_BINARY = 4,
    // variable length types
    UPDATE_TYPE_VARCHAR = 5,
    UPDATE_TYPE_VARBINARY = 6,
    // blob types
    UPDATE_TYPE_TEXT = 7,
    UPDATE_TYPE_BLOB = 8,
};
58 
// short alias for the add/drop column operation code
#define UP_COL_ADD_OR_DROP UPDATE_OP_COL_ADD_OR_DROP

// add or drop column sub-operations: the first byte of every per-column
// entry (and of every blob entry) in an add/drop column message
#define COL_DROP 0xaa
#define COL_ADD 0xbb

// add or drop column types: the byte in a per-column entry that tells how the
// column is stored in the row
#define COL_FIXED 0xcc
#define COL_VAR 0xdd
#define COL_BLOB 0xee

// Size in bytes of the fixed-size header of an add/drop column message:
//   1 (operation) + 8 (old/new null bytes) + 2 (old/new offset bytes) +
//   8 (old/new fixed field size) + 8 (old/new length of offsets) +
//   8 (old/new starting null bit position)
// The sum is parenthesized so that the macro expands to a single value when
// it is used inside a larger expression (e.g. multiplied or subtracted).
#define STATIC_ROW_MUTATOR_SIZE (1+8+2+8+8+8)
71 
72 // how much space do I need for the mutators?
73 // static stuff first:
// 1 - operation == UP_COL_ADD_OR_DROP
// 8 - old null, new null
// 2 - old num_offset, new num_offset
// 8 - old fixed_field size, new fixed_field_size
// 8 - old and new length of offsets
// 8 - old and new starting null bit position
// TOTAL: 35
81 
82 // dynamic stuff:
83 // 4 - number of columns
84 // for each column:
85 // 1 - add or drop
86 // 1 - is nullable
87 // 4 - if nullable, position
88 // 1 - if add, whether default is null or not
89 // 1 - if fixed, var, or not
90 //  for fixed, entire default
91 //  for var, 4 bytes length, then entire default
92 //  for blob, nothing
93 // So, an upperbound is 4 + num_fields(12) + all default stuff
94 
95 // static blob stuff:
96 // 4 - num blobs
97 // 1 byte for each num blobs in old table
98 // So, an upperbound is 4 + kc_info->num_blobs
99 
100 // dynamic blob stuff:
101 // for each blob added:
102 // 1 - state if we are adding or dropping
103 // 4 - blob index
104 // if add, 1 len bytes
105 //  at most, 4 0's
106 // So, upperbound is num_blobs(1+4+1+4) = num_columns*10
107 
108 // The expand varchar offsets message is used to expand the size of an offset
109 // from 1 to 2 bytes.  Not VLQ coded.
110 //     uint8  operation          = UPDATE_OP_EXPAND_VARIABLE_OFFSETS
111 //     uint32 number of offsets
112 //     uint32 starting offset of the variable length field offsets
113 
114 // Expand the size of a fixed length column message. Not VLQ coded.
115 // The field type is encoded in the operation code.
116 //     uint8  operation          = UPDATE_OP_EXPAND_INT/UINT/CHAR/BINARY
117 //     uint32 offset             offset of the field
118 //     uint32 old length         the old length of the field's value
119 //     uint32 new length         the new length of the field's value
120 
121 //     uint8  operation          = UPDATE_OP_EXPAND_CHAR/BINARY
122 //     uint32 offset             offset of the field
123 //     uint32 old length         the old length of the field's value
124 //     uint32 new length         the new length of the field's value
125 //     uint8  pad char
126 
127 // Expand blobs message. VLQ coded.
128 //     uint8  operation = UPDATE_OP_EXPAND_BLOB
129 //     uint32 start variable offset
130 //     uint32 variable offset bytes
131 //     uint32 bytes per offset
132 //     uint32 num blobs = N
133 //     uint8  old lengths[N]
134 //     uint8  new lengths[N]
135 
136 // Update and Upsert version 1 messages. Not VLQ coded. Not used anymore, but
137 // may be in the fractal tree from a previous build.
138 //
139 // Field descriptor:
140 // Operations:
141 //     update operation   4 == { '=', '+', '-' }
142 //         x = k
143 //         x = x + k
144 //         x = x - k
145 //     field type         4 see field types above
146 //     unused             4 unused
//     field null num     4 bit 31 is 1 if the field is nullable and the
//                          remaining bits contain the null bit number
//     field offset       4 for fixed fields, this is the offset from
//                          beginning of the row of the field
151 //     value:
152 //         value length   4 == N, length of the value
153 //         value          N value to add or subtract
154 //
155 // Update_1 message:
156 //     Operation          1 == UPDATE_OP_UPDATE_1
157 //     fixed field offset 4 offset of the beginning of the fixed fields
158 //     var field offset   4 offset of the variable length offsets
159 //     var_offset_bytes   1 length of offsets (Note: not big enough)
160 //     bytes_per_offset   4 number of bytes per offset
161 //     Number of update ops 4 == N
162 //     Update ops [N]
163 //
164 // Upsert_1 message:
165 //     Operation          1 == UPDATE_OP_UPSERT_1
166 //     Insert row:
167 //         length         4 == N
168 //         data           N
169 //     fixed field offset 4 offset of the beginning of the fixed fields
170 //     var field offset   4 offset of the variable length offsets
171 //     var_offset_bytes   1 length of offsets (Note: not big enough)
172 //     bytes_per_offset   4 number of bytes per offset
173 //     Number of update ops 4 == N
174 //     Update ops [N]
175 
// Update and Upsert version 2 messages. VLQ coded.
177 // Update version 2
178 //     uint8  operation = UPDATE_OP_UPDATE_2
179 //     uint32 number of update ops = N
180 //     uint8  update ops [ N ]
181 //
182 // Upsert version 2
183 //     uint8 operation = UPDATE_OP_UPSERT_2
184 //     uint32 insert length = N
185 //     uint8 insert data [ N ]
186 //     uint32 number of update ops = M
187 //     update ops [ M ]
188 //
189 // Variable fields info
190 //     uint32 update operation = 'v'
191 //     uint32 start offset
192 //     uint32 num varchars
193 //     uint32 bytes per offset
194 //
195 // Blobs info
196 //     uint32 update operation = 'b'
197 //     uint32 num blobs = N
198 //     uint8  blob lengths [ N ]
199 //
200 // Update operation on fixed length fields
201 //     uint32 update operation = '=', '+', '-'
202 //     uint32 field type
203 //     uint32 null num 0 => not nullable, otherwise encoded as field_null_num + 1
204 //     uint32 offset
205 //     uint32 value length = N
206 //     uint8  value [ N ]
207 //
208 // Update operation on varchar fields
209 //     uint32 update operation = '='
210 //     uint32 field type
211 //     uint32 null num
212 //     uint32 var index
213 //     uint32 value length = N
214 //     uint8  value [ N ]
215 //
216 // Update operation on blob fields
217 //     uint32 update operation = '='
218 //     uint32 field type
219 //     uint32 null num
220 //     uint32 blob index
221 //     uint32 value length = N
222 //     uint8  value [ N ]
223 
224 #include "tokudb_buffer.h"
225 #include "tokudb_math.h"
226 
//
// Reports whether bit number pos of the byte array data is set.
// Bits are numbered from the least significant bit of data[0] upwards, so
// bit pos lives in byte pos/8 at bit pos%8.
//
static inline bool is_overall_null_position_set(uchar* data, uint32_t pos) {
    const uint32_t byte_index = pos >> 3;
    const uchar mask = (uchar)(1 << (pos & 7));
    return (data[byte_index] & mask) != 0;
}
236 
//
// Writes bit number pos of the byte array data: the bit becomes 1 when
// is_null is true and 0 otherwise. All other bits of the byte are preserved.
//
static inline void set_overall_null_position(
    uchar* data,
    uint32_t pos,
    bool is_null) {

    uchar* target = data + (pos >> 3);
    const uchar mask = (uchar)(1 << (pos & 7));
    *target = is_null ? (uchar)(*target | mask) : (uchar)(*target & ~mask);
}
255 
copy_null_bits(uint32_t start_old_pos,uint32_t start_new_pos,uint32_t num_bits,uchar * old_null_bytes,uchar * new_null_bytes)256 static inline void copy_null_bits(
257     uint32_t start_old_pos,
258     uint32_t start_new_pos,
259     uint32_t num_bits,
260     uchar* old_null_bytes,
261     uchar* new_null_bytes) {
262     for (uint32_t i = 0; i < num_bits; i++) {
263         uint32_t curr_old_pos = i + start_old_pos;
264         uint32_t curr_new_pos = i + start_new_pos;
265         // copy over old null bytes
266         if (is_overall_null_position_set(old_null_bytes,curr_old_pos)) {
267             set_overall_null_position(new_null_bytes,curr_new_pos,true);
268         }
269         else {
270             set_overall_null_position(new_null_bytes,curr_new_pos,false);
271         }
272     }
273 }
274 
copy_var_fields(uint32_t start_old_num_var_field,uint32_t num_var_fields,uchar * old_var_field_offset_ptr,uchar old_num_offset_bytes,uchar * start_new_var_field_data_ptr,uchar * start_new_var_field_offset_ptr,uchar * new_var_field_data_ptr,uchar * old_var_field_data_ptr,uint32_t new_num_offset_bytes,uint32_t * num_data_bytes_written,uint32_t * num_offset_bytes_written)275 static inline void copy_var_fields(
276     //index of var fields that we should start writing
277     uint32_t start_old_num_var_field,
278     // number of var fields to copy
279     uint32_t num_var_fields,
280     //static ptr to where offset bytes begin in old row
281     uchar* old_var_field_offset_ptr,
282     //number of offset bytes used in old row
283     uchar old_num_offset_bytes,
284     // where the new var data should be written
285     uchar* start_new_var_field_data_ptr,
286     // where the new var offsets should be written
287     uchar* start_new_var_field_offset_ptr,
288     // pointer to beginning of var fields in new row
289     uchar* new_var_field_data_ptr,
290     // pointer to beginning of var fields in old row
291     uchar* old_var_field_data_ptr,
292     // number of offset bytes used in new row
293     uint32_t new_num_offset_bytes,
294     uint32_t* num_data_bytes_written,
295     uint32_t* num_offset_bytes_written) {
296 
297     uchar* curr_new_var_field_data_ptr = start_new_var_field_data_ptr;
298     uchar* curr_new_var_field_offset_ptr = start_new_var_field_offset_ptr;
299     for (uint32_t i = 0; i < num_var_fields; i++) {
300         uint32_t field_len;
301         uint32_t start_read_offset;
302         uint32_t curr_old = i + start_old_num_var_field;
303         uchar* data_to_copy = NULL;
304         // get the length and pointer to data that needs to be copied
305         get_var_field_info(
306             &field_len,
307             &start_read_offset,
308             curr_old,
309             old_var_field_offset_ptr,
310             old_num_offset_bytes);
311         data_to_copy = old_var_field_data_ptr + start_read_offset;
312         // now need to copy field_len bytes starting from data_to_copy
313         curr_new_var_field_data_ptr = write_var_field(
314             curr_new_var_field_offset_ptr,
315             curr_new_var_field_data_ptr,
316             new_var_field_data_ptr,
317             data_to_copy,
318             field_len,
319             new_num_offset_bytes);
320         curr_new_var_field_offset_ptr += new_num_offset_bytes;
321     }
322     *num_data_bytes_written =
323         (uint32_t)(curr_new_var_field_data_ptr - start_new_var_field_data_ptr);
324     *num_offset_bytes_written =
325         (uint32_t)(curr_new_var_field_offset_ptr -
326         start_new_var_field_offset_ptr);
327 }
328 
copy_toku_blob(uchar * to_ptr,uchar * from_ptr,uint32_t len_bytes,bool skip)329 static inline uint32_t copy_toku_blob(
330     uchar* to_ptr,
331     uchar* from_ptr,
332     uint32_t len_bytes,
333     bool skip) {
334 
335     uint32_t length = 0;
336     if (!skip) {
337         memcpy(to_ptr, from_ptr, len_bytes);
338     }
339     length = get_blob_field_len(from_ptr,len_bytes);
340     if (!skip) {
341         memcpy(to_ptr + len_bytes, from_ptr + len_bytes, length);
342     }
343     return (length + len_bytes);
344 }
345 
// Applies an add/drop column message (UP_COL_ADD_OR_DROP) to one row.
//
// old_val   the existing row, or NULL; when NULL nothing is done and 0 is
//           returned without calling set_val
// extra     the serialized message, decoded sequentially from extra->data:
//             uint8   operation (must be UP_COL_ADD_OR_DROP)
//             uint32  old, new number of null bytes
//             uint8   old, new number of bytes per var field offset
//             uint32  old, new fixed field size
//             uint32  old, new length of the var field offset array
//             uint32  old, new starting null bit position
//             uint32  number of columns, then one entry per column
//             optional trailing blob section (see the blob handling below)
// set_val   callback that receives the rebuilt row
// set_extra opaque argument forwarded to set_val
//
// The new row is assembled in a temporary buffer in the order null bytes,
// fixed fields, var field offsets, var field data, blobs. The buffer is
// freed before returning, so set_val presumably copies what it needs
// (NOTE(review): confirm against the set_val implementation).
//
// Returns 0 on success, or ENOMEM if the temporary buffer allocation
// returns NULL.
static int tokudb_hcad_update_fun(const DBT* old_val,
                                  const DBT* extra,
                                  void (*set_val)(const DBT* new_val,
                                                  void* set_extra),
                                  void* set_extra) {
    uint32_t max_num_bytes;
    uint32_t num_columns;
    DBT new_val;
    uint32_t num_bytes_left;
    uint32_t num_var_fields_to_copy;
    uint32_t num_data_bytes_written = 0;
    uint32_t num_offset_bytes_written = 0;
    int error;
    memset(&new_val, 0, sizeof(DBT));
    uchar operation;
    // stays NULL until the allocation below, so the early exits can safely
    // reach the free in cleanup
    uchar* new_val_data = NULL;
    // read cursor into the message, and the start of the message
    uchar* extra_pos = NULL;
    uchar* extra_pos_start = NULL;
    //
    // info for pointers into rows
    //
    uint32_t old_num_null_bytes;
    uint32_t new_num_null_bytes;
    uchar old_num_offset_bytes;
    uchar new_num_offset_bytes;
    uint32_t old_fixed_field_size;
    uint32_t new_fixed_field_size;
    uint32_t old_len_of_offsets;
    uint32_t new_len_of_offsets;

    // fixed field areas and the number of bytes already consumed/produced
    uchar* old_fixed_field_ptr = NULL;
    uchar* new_fixed_field_ptr = NULL;
    uint32_t curr_old_fixed_offset;
    uint32_t curr_new_fixed_offset;

    // null byte areas and the next null bit to read/write
    uchar* old_null_bytes = NULL;
    uchar* new_null_bytes = NULL;
    uint32_t curr_old_null_pos;
    uint32_t curr_new_null_pos;
    uint32_t old_null_bits_left;
    uint32_t new_null_bits_left;
    uint32_t overall_null_bits_left;

    // var field areas, the index of the next var field to read/write, and
    // the write cursors into the new row
    uint32_t old_num_var_fields;
    // uint32_t new_num_var_fields;
    uint32_t curr_old_num_var_field;
    uint32_t curr_new_num_var_field;
    uchar* old_var_field_offset_ptr = NULL;
    uchar* new_var_field_offset_ptr = NULL;
    uchar* curr_new_var_field_offset_ptr = NULL;
    uchar* old_var_field_data_ptr = NULL;
    uchar* new_var_field_data_ptr = NULL;
    uchar* curr_new_var_field_data_ptr = NULL;

    // location of the blobs in the old row
    uint32_t start_blob_offset;
    uchar* start_blob_ptr;
    uint32_t num_blob_bytes;

    // came across a delete, nothing to update
    if (old_val == NULL) {
        error = 0;
        goto cleanup;
    }

    extra_pos_start = (uchar *)extra->data;
    extra_pos = (uchar *)extra->data;

    // decode the fixed size header; memcpy is used for the multi-byte values,
    // so no alignment of the message is assumed
    operation = extra_pos[0];
    extra_pos++;
    assert_always(operation == UP_COL_ADD_OR_DROP);

    memcpy(&old_num_null_bytes, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_num_null_bytes, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    old_num_offset_bytes = extra_pos[0];
    extra_pos++;
    new_num_offset_bytes = extra_pos[0];
    extra_pos++;

    memcpy(&old_fixed_field_size, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_fixed_field_size, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    memcpy(&old_len_of_offsets, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_len_of_offsets, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    // size of the scratch buffer for the new row; judging by the name this is
    // meant as an upper bound on the new row size
    max_num_bytes =
        old_val->size + extra->size + new_len_of_offsets + new_fixed_field_size;
    new_val_data = (uchar *)tokudb::memory::malloc(
        max_num_bytes,
        MYF(MY_FAE));
    if (new_val_data == NULL) {
        error = ENOMEM;
        goto cleanup;
    }

    // the fixed fields start right after the null bytes in both rows
    old_fixed_field_ptr = (uchar *) old_val->data;
    old_fixed_field_ptr += old_num_null_bytes;
    new_fixed_field_ptr = new_val_data + new_num_null_bytes;
    curr_old_fixed_offset = 0;
    curr_new_fixed_offset = 0;

    // NOTE(review): divides by old_num_offset_bytes taken from the message;
    // assumes it is never 0 - confirm against the code that builds the message
    old_num_var_fields = old_len_of_offsets/old_num_offset_bytes;
    // new_num_var_fields = new_len_of_offsets/new_num_offset_bytes;
    // row layout: fixed fields, then the var field offset array, then the
    // var field data
    // following fields will change as we write the variable data
    old_var_field_offset_ptr = old_fixed_field_ptr + old_fixed_field_size;
    new_var_field_offset_ptr = new_fixed_field_ptr + new_fixed_field_size;
    old_var_field_data_ptr = old_var_field_offset_ptr + old_len_of_offsets;
    new_var_field_data_ptr = new_var_field_offset_ptr + new_len_of_offsets;
    curr_new_var_field_offset_ptr = new_var_field_offset_ptr;
    curr_new_var_field_data_ptr = new_var_field_data_ptr;
    curr_old_num_var_field = 0;
    curr_new_num_var_field = 0;

    // the null bytes sit at the very beginning of both rows
    old_null_bytes = (uchar *)old_val->data;
    new_null_bytes = new_val_data;

    // last part of the header: starting null bit positions and column count
    memcpy(&curr_old_null_pos, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&curr_new_null_pos, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    memcpy(&num_columns, extra_pos, sizeof(num_columns));
    extra_pos += sizeof(num_columns);

    memset(new_null_bytes, 0, new_num_null_bytes); // shut valgrind up

    //
    // now go through and apply the change into new_val_data
    //
    // each column entry is decoded as:
    //   uint8  COL_DROP or COL_ADD
    //   uint8  nullable flag
    //   if nullable: uint32 null bit position, and for COL_ADD a uint8 that
    //                says whether the default value is NULL
    //   uint8  COL_FIXED, COL_VAR or COL_BLOB, followed by type specific data
    for (uint32_t i = 0; i < num_columns; i++) {
        uchar op_type = extra_pos[0];
        bool is_null_default = false;
        extra_pos++;

        assert_always(op_type == COL_DROP || op_type == COL_ADD);
        bool nullable = (extra_pos[0] != 0);
        extra_pos++;
        if (nullable) {
            uint32_t null_bit_position;
            memcpy(&null_bit_position, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            // number of untouched null bits in front of this column: the
            // position refers to the old row for a drop and to the new row
            // for an add
            uint32_t num_bits;
            if (op_type == COL_DROP) {
                assert_always(curr_old_null_pos <= null_bit_position);
                num_bits = null_bit_position - curr_old_null_pos;
            } else {
                assert_always(curr_new_null_pos <= null_bit_position);
                num_bits = null_bit_position - curr_new_null_pos;
            }
            copy_null_bits(
                curr_old_null_pos,
                curr_new_null_pos,
                num_bits,
                old_null_bytes,
                new_null_bytes);
            // update the positions
            curr_new_null_pos += num_bits;
            curr_old_null_pos += num_bits;
            if (op_type == COL_DROP) {
                curr_old_null_pos++; // account for dropped column
            } else {
                is_null_default = (extra_pos[0] != 0);
                extra_pos++;
                set_overall_null_position(
                    new_null_bytes,
                    null_bit_position,
                    is_null_default);
                curr_new_null_pos++; //account for added column
            }
        }
        uchar col_type = extra_pos[0];
        extra_pos++;
        if (col_type == COL_FIXED) {
            // fixed column data: uint32 offset, uint32 size, and for an added
            // column with a non-NULL default, size bytes of default value
            uint32_t col_offset;
            uint32_t col_size;
            uint32_t num_bytes_to_copy;
            memcpy(&col_offset, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            memcpy(&col_size, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);

            // copy the untouched fixed bytes that precede this column
            if (op_type == COL_DROP) {
                num_bytes_to_copy = col_offset - curr_old_fixed_offset;
            } else {
                num_bytes_to_copy = col_offset - curr_new_fixed_offset;
            }
            memcpy(
                new_fixed_field_ptr + curr_new_fixed_offset,
                old_fixed_field_ptr + curr_old_fixed_offset,
                num_bytes_to_copy);
            curr_old_fixed_offset += num_bytes_to_copy;
            curr_new_fixed_offset += num_bytes_to_copy;
            if (op_type == COL_DROP) {
                // move old_fixed_offset val to skip OVER column that is
                // being dropped
                curr_old_fixed_offset += col_size;
            } else {
                if (is_null_default) {
                    // copy zeroes
                    memset(
                        new_fixed_field_ptr + curr_new_fixed_offset,
                        0,
                        col_size);
                } else {
                    // copy data from extra_pos into new row
                    memcpy(
                        new_fixed_field_ptr + curr_new_fixed_offset,
                        extra_pos,
                        col_size);
                    extra_pos += col_size;
                }
                curr_new_fixed_offset += col_size;
            }

        } else if (col_type == COL_VAR) {
            // var column data: uint32 var field index, and for an added
            // column with a non-NULL default, uint32 length plus that many
            // bytes of default value
            uint32_t var_col_index;
            memcpy(&var_col_index, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            // copy the untouched var fields that precede this column
            if (op_type == COL_DROP) {
                num_var_fields_to_copy = var_col_index - curr_old_num_var_field;
            } else {
                num_var_fields_to_copy = var_col_index - curr_new_num_var_field;
            }
            copy_var_fields(
                curr_old_num_var_field,
                num_var_fields_to_copy,
                old_var_field_offset_ptr,
                old_num_offset_bytes,
                curr_new_var_field_data_ptr,
                curr_new_var_field_offset_ptr,
                // pointer to beginning of var fields in new row
                new_var_field_data_ptr,
                // pointer to beginning of var fields in old row
                old_var_field_data_ptr,
                // number of offset bytes used in new row
                new_num_offset_bytes,
                &num_data_bytes_written,
                &num_offset_bytes_written);
            curr_new_var_field_data_ptr += num_data_bytes_written;
            curr_new_var_field_offset_ptr += num_offset_bytes_written;
            curr_new_num_var_field += num_var_fields_to_copy;
            curr_old_num_var_field += num_var_fields_to_copy;
            if (op_type == COL_DROP) {
                curr_old_num_var_field++; // skip over dropped field
            } else {
                if (is_null_default) {
                    // NULL default: write an empty var field
                    curr_new_var_field_data_ptr = write_var_field(
                        curr_new_var_field_offset_ptr,
                        curr_new_var_field_data_ptr,
                        new_var_field_data_ptr,
                        NULL, //copying no data
                        0, //copying 0 bytes
                        new_num_offset_bytes);
                    curr_new_var_field_offset_ptr += new_num_offset_bytes;
                } else {
                    uint32_t data_length;
                    memcpy(&data_length, extra_pos, sizeof(data_length));
                    extra_pos += sizeof(data_length);
                    curr_new_var_field_data_ptr = write_var_field(
                        curr_new_var_field_offset_ptr,
                        curr_new_var_field_data_ptr,
                        new_var_field_data_ptr,
                        extra_pos, //copying data from mutator
                        data_length, //copying data_length bytes
                        new_num_offset_bytes);
                    extra_pos += data_length;
                    curr_new_var_field_offset_ptr += new_num_offset_bytes;
                }
                curr_new_num_var_field++; //account for added column
            }
        } else if (col_type == COL_BLOB) {
            // handle blob data later
            continue;
        } else {
            assert_unreachable();
        }
    }
    // finish copying the null stuff
    // copy as many remaining null bits as both the old and the new null
    // bytes can still hold
    old_null_bits_left = 8*old_num_null_bytes - curr_old_null_pos;
    new_null_bits_left = 8*new_num_null_bytes - curr_new_null_pos;
    overall_null_bits_left = old_null_bits_left;
    set_if_smaller(overall_null_bits_left, new_null_bits_left);
    copy_null_bits(
        curr_old_null_pos,
        curr_new_null_pos,
        overall_null_bits_left,
        old_null_bytes,
        new_null_bytes);
    // finish copying fixed field stuff
    num_bytes_left = old_fixed_field_size - curr_old_fixed_offset;
    memcpy(
        new_fixed_field_ptr + curr_new_fixed_offset,
        old_fixed_field_ptr + curr_old_fixed_offset,
        num_bytes_left);
    curr_old_fixed_offset += num_bytes_left;
    curr_new_fixed_offset += num_bytes_left;
    // sanity check
    assert_always(curr_new_fixed_offset == new_fixed_field_size);

    // finish copying var field stuff
    num_var_fields_to_copy = old_num_var_fields - curr_old_num_var_field;
    copy_var_fields(
        curr_old_num_var_field,
        num_var_fields_to_copy,
        old_var_field_offset_ptr,
        old_num_offset_bytes,
        curr_new_var_field_data_ptr,
        curr_new_var_field_offset_ptr,
        // pointer to beginning of var fields in new row
        new_var_field_data_ptr,
        // pointer to beginning of var fields in old row
        old_var_field_data_ptr,
        // number of offset bytes used in new row
        new_num_offset_bytes,
        &num_data_bytes_written,
        &num_offset_bytes_written);
    curr_new_var_field_offset_ptr += num_offset_bytes_written;
    curr_new_var_field_data_ptr += num_data_bytes_written;
    // sanity check
    // the offset array of the new row must be exactly full, i.e. the offset
    // cursor must have reached the start of the var field data
    assert_always(curr_new_var_field_offset_ptr == new_var_field_data_ptr);

    // start handling blobs
    // find where the blobs begin in the old row, relative to the start of
    // its var field data
    get_blob_field_info(
        &start_blob_offset,
        old_len_of_offsets,
        old_var_field_data_ptr,
        old_num_offset_bytes);
    start_blob_ptr = old_var_field_data_ptr + start_blob_offset;
    // if nothing else in extra, then there are no blobs to add or drop, so
    // can copy blobs straight
    if ((extra_pos - extra_pos_start) == extra->size) {
        // everything from the first blob to the end of the old row
        num_blob_bytes = old_val->size - (start_blob_ptr - old_null_bytes);
        memcpy(curr_new_var_field_data_ptr, start_blob_ptr, num_blob_bytes);
        curr_new_var_field_data_ptr += num_blob_bytes;
    } else {
        // else, there is blob information to process
        // blob section of the message:
        //   uint32 number of blobs in the old row
        //   uint8  number of length bytes of each old blob
        //   then, until the end of the message, one entry per changed blob:
        //     uint8  COL_DROP or COL_ADD
        //     uint32 blob index
        //     for COL_ADD: uint8 number of length bytes, followed by the blob
        //                  itself (length bytes plus data)
        uchar* len_bytes = NULL;
        uint32_t curr_old_blob = 0;
        uint32_t curr_new_blob = 0;
        uint32_t num_old_blobs = 0;
        uchar* curr_old_blob_ptr = start_blob_ptr;
        memcpy(&num_old_blobs, extra_pos, sizeof(num_old_blobs));
        extra_pos += sizeof(num_old_blobs);
        len_bytes = extra_pos;
        extra_pos += num_old_blobs;
        // copy over blob fields one by one
        while ((extra_pos - extra_pos_start) < extra->size) {
            uchar op_type = extra_pos[0];
            extra_pos++;
            uint32_t num_blobs_to_copy = 0;
            uint32_t blob_index;
            memcpy(&blob_index, extra_pos, sizeof(blob_index));
            extra_pos += sizeof(blob_index);
            assert_always (op_type == COL_DROP || op_type == COL_ADD);
            // untouched blobs in front of this one: the index refers to the
            // old row for a drop and to the new row for an add
            if (op_type == COL_DROP) {
                num_blobs_to_copy = blob_index - curr_old_blob;
            } else {
                num_blobs_to_copy = blob_index - curr_new_blob;
            }
            for (uint32_t i = 0; i < num_blobs_to_copy; i++) {
                uint32_t num_bytes_written = copy_toku_blob(
                    curr_new_var_field_data_ptr,
                    curr_old_blob_ptr,
                    len_bytes[curr_old_blob + i],
                    false);
                curr_old_blob_ptr += num_bytes_written;
                curr_new_var_field_data_ptr += num_bytes_written;
            }
            curr_old_blob += num_blobs_to_copy;
            curr_new_blob += num_blobs_to_copy;
            if (op_type == COL_DROP) {
                // skip over blob in row
                // (skip == true: only the blob's size is computed, nothing
                // is written)
                uint32_t num_bytes = copy_toku_blob(
                    NULL,
                    curr_old_blob_ptr,
                    len_bytes[curr_old_blob],
                    true);
                curr_old_blob++;
                curr_old_blob_ptr += num_bytes;
            } else {
                // copy new data
                uint32_t new_len_bytes = extra_pos[0];
                extra_pos++;
                uint32_t num_bytes = copy_toku_blob(
                    curr_new_var_field_data_ptr,
                    extra_pos,
                    new_len_bytes,
                    false);
                curr_new_blob++;
                curr_new_var_field_data_ptr += num_bytes;
                extra_pos += num_bytes;
            }
        }
        // copy whatever is left of the old row after the last changed blob
        num_blob_bytes = old_val->size - (curr_old_blob_ptr - old_null_bytes);
        memcpy(curr_new_var_field_data_ptr, curr_old_blob_ptr, num_blob_bytes);
        curr_new_var_field_data_ptr += num_blob_bytes;
    }
    // the write cursor now marks the end of the new row
    new_val.data = new_val_data;
    new_val.size = curr_new_var_field_data_ptr - new_val_data;
    set_val(&new_val, set_extra);

    error = 0;
cleanup:
    // new_val_data is NULL on the early exit paths, otherwise it is the
    // scratch buffer that was just handed to set_val
    tokudb::memory::free(new_val_data);
    return error;
}
758 
// Expand the variable offset array in the old row given the update message
760 // in the extra.
tokudb_expand_variable_offsets(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)761 static int tokudb_expand_variable_offsets(const DBT* old_val,
762                                           const DBT* extra,
763                                           void (*set_val)(const DBT* new_val,
764                                                           void* set_extra),
765                                           void* set_extra) {
766     int error = 0;
767     tokudb::buffer extra_val(extra->data, 0, extra->size);
768 
769     // decode the operation
770     uint8_t operation;
771     extra_val.consume(&operation, sizeof operation);
772     assert_always(operation == UPDATE_OP_EXPAND_VARIABLE_OFFSETS);
773 
774     // decode number of offsets
775     uint32_t number_of_offsets;
776     extra_val.consume(&number_of_offsets, sizeof number_of_offsets);
777 
778     // decode the offset start
779     uint32_t offset_start;
780     extra_val.consume(&offset_start, sizeof offset_start);
781 
782     assert_always(extra_val.size() == extra_val.limit());
783 
784     DBT new_val; memset(&new_val, 0, sizeof new_val);
785 
786     if (old_val != NULL) {
787         assert_always(offset_start + number_of_offsets <= old_val->size);
788 
789         // compute the new val from the old val
790         uchar* old_val_ptr = (uchar*)old_val->data;
791 
792         // allocate space for the new val's data
793         uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
794             number_of_offsets + old_val->size,
795             MYF(MY_FAE));
796         if (!new_val_ptr) {
797             error = ENOMEM;
798             goto cleanup;
799         }
800         new_val.data = new_val_ptr;
801 
802         // copy up to the start of the varchar offset
803         memcpy(new_val_ptr, old_val_ptr, offset_start);
804         new_val_ptr += offset_start;
805         old_val_ptr += offset_start;
806 
807         // expand each offset from 1 to 2 bytes
808         for (uint32_t i = 0; i < number_of_offsets; i++) {
809             uint16_t new_offset = *old_val_ptr;
810             int2store(new_val_ptr, new_offset);
811             new_val_ptr += 2;
812             old_val_ptr += 1;
813         }
814 
815         // copy the rest of the row
816         size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
817         memcpy(new_val_ptr, old_val_ptr, n);
818         new_val_ptr += n;
819         old_val_ptr += n;
820         new_val.size = new_val_ptr - (uchar *)new_val.data;
821 
822         assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
823         assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
824 
825         // set the new val
826         set_val(&new_val, set_extra);
827     }
828 
829     error = 0;
830 
831 cleanup:
832     tokudb::memory::free(new_val.data);
833     return error;
834 }
835 
836 // Expand an int field in a old row given the expand message in the extra.
tokudb_expand_int_field(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)837 static int tokudb_expand_int_field(const DBT* old_val,
838                                    const DBT* extra,
839                                    void (*set_val)(const DBT* new_val,
840                                                    void* set_extra),
841                                    void* set_extra) {
842     int error = 0;
843     tokudb::buffer extra_val(extra->data, 0, extra->size);
844 
845     uint8_t operation;
846     extra_val.consume(&operation, sizeof operation);
847     assert_always(
848         operation == UPDATE_OP_EXPAND_INT ||
849         operation == UPDATE_OP_EXPAND_UINT);
850     uint32_t the_offset;
851     extra_val.consume(&the_offset, sizeof the_offset);
852     uint32_t old_length;
853     extra_val.consume(&old_length, sizeof old_length);
854     uint32_t new_length;
855     extra_val.consume(&new_length, sizeof new_length);
856     assert_always(extra_val.size() == extra_val.limit());
857 
858     assert_always(new_length >= old_length); // expand only
859 
860     DBT new_val; memset(&new_val, 0, sizeof new_val);
861 
862     if (old_val != NULL) {
863         // old field within the old val
864         assert_always(the_offset + old_length <= old_val->size);
865 
866         // compute the new val from the old val
867         uchar* old_val_ptr = (uchar*)old_val->data;
868 
869         // allocate space for the new val's data
870         uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
871             old_val->size + (new_length - old_length),
872             MYF(MY_FAE));
873         if (!new_val_ptr) {
874             error = ENOMEM;
875             goto cleanup;
876         }
877         new_val.data = new_val_ptr;
878 
879         // copy up to the old offset
880         memcpy(new_val_ptr, old_val_ptr, the_offset);
881         new_val_ptr += the_offset;
882         old_val_ptr += the_offset;
883 
884         switch (operation) {
885         case UPDATE_OP_EXPAND_INT:
886             // fill the entire new value with ones or zeros depending on the
887             // sign bit the encoding is little endian
888             memset(
889                 new_val_ptr,
890                 (old_val_ptr[old_length-1] & 0x80) ? 0xff : 0x00,
891                 new_length);
892             // overlay the low bytes of the new value with the old value
893             memcpy(new_val_ptr, old_val_ptr, old_length);
894             new_val_ptr += new_length;
895             old_val_ptr += old_length;
896             break;
897         case UPDATE_OP_EXPAND_UINT:
898             // fill the entire new value with zeros
899             memset(new_val_ptr, 0, new_length);
900             // overlay the low bytes of the new value with the old value
901             memcpy(new_val_ptr, old_val_ptr, old_length);
902             new_val_ptr += new_length;
903             old_val_ptr += old_length;
904             break;
905         default:
906             assert_unreachable();
907         }
908 
909         // copy the rest
910         size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
911         memcpy(new_val_ptr, old_val_ptr, n);
912         new_val_ptr += n;
913         old_val_ptr += n;
914         new_val.size = new_val_ptr - (uchar *)new_val.data;
915 
916         assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
917         assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
918 
919         // set the new val
920         set_val(&new_val, set_extra);
921     }
922 
923     error = 0;
924 
925 cleanup:
926     tokudb::memory::free(new_val.data);
927     return error;
928 }
929 
930 // Expand a char field in a old row given the expand message in the extra.
tokudb_expand_char_field(const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)931 static int tokudb_expand_char_field(const DBT* old_val,
932                                     const DBT* extra,
933                                     void (*set_val)(const DBT* new_val,
934                                                     void* set_extra),
935                                     void* set_extra) {
936     int error = 0;
937     tokudb::buffer extra_val(extra->data, 0, extra->size);
938 
939     uint8_t operation;
940     extra_val.consume(&operation, sizeof operation);
941     assert_always(
942         operation == UPDATE_OP_EXPAND_CHAR ||
943         operation == UPDATE_OP_EXPAND_BINARY);
944     uint32_t the_offset;
945     extra_val.consume(&the_offset, sizeof the_offset);
946     uint32_t old_length;
947     extra_val.consume(&old_length, sizeof old_length);
948     uint32_t new_length;
949     extra_val.consume(&new_length, sizeof new_length);
950     uchar pad_char;
951     extra_val.consume(&pad_char, sizeof pad_char);
952     assert_always(extra_val.size() == extra_val.limit());
953 
954     assert_always(new_length >= old_length); // expand only
955 
956     DBT new_val; memset(&new_val, 0, sizeof new_val);
957 
958     if (old_val != NULL) {
959         // old field within the old val
960         assert_always(the_offset + old_length <= old_val->size);
961 
962         // compute the new val from the old val
963         uchar* old_val_ptr = (uchar*)old_val->data;
964 
965         // allocate space for the new val's data
966         uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
967             old_val->size + (new_length - old_length),
968             MYF(MY_FAE));
969         if (!new_val_ptr) {
970             error = ENOMEM;
971             goto cleanup;
972         }
973         new_val.data = new_val_ptr;
974 
975         // copy up to the old offset
976         memcpy(new_val_ptr, old_val_ptr, the_offset);
977         new_val_ptr += the_offset;
978         old_val_ptr += the_offset;
979 
980         switch (operation) {
981         case UPDATE_OP_EXPAND_CHAR:
982         case UPDATE_OP_EXPAND_BINARY:
983             // fill the entire new value with the pad char
984             memset(new_val_ptr, pad_char, new_length);
985             // overlay the low bytes of the new value with the old value
986             memcpy(new_val_ptr, old_val_ptr, old_length);
987             new_val_ptr += new_length;
988             old_val_ptr += old_length;
989             break;
990         default:
991             assert_unreachable();
992         }
993 
994         // copy the rest
995         size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
996         memcpy(new_val_ptr, old_val_ptr, n);
997         new_val_ptr += n;
998         old_val_ptr += n;
999         new_val.size = new_val_ptr - (uchar *)new_val.data;
1000 
1001         assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
1002         assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
1003 
1004         // set the new val
1005         set_val(&new_val, set_extra);
1006     }
1007 
1008     error = 0;
1009 
1010 cleanup:
1011     tokudb::memory::free(new_val.data);
1012     return error;
1013 }
1014 
1015 namespace tokudb {
1016 
1017 class var_fields {
1018 public:
var_fields()1019     inline var_fields() {
1020     }
init_var_fields(uint32_t var_offset,uint32_t offset_bytes,uint32_t bytes_per_offset,tokudb::buffer * val_buffer)1021     inline void init_var_fields(
1022         uint32_t var_offset,
1023         uint32_t offset_bytes,
1024         uint32_t bytes_per_offset,
1025         tokudb::buffer* val_buffer) {
1026 
1027         assert_always(
1028             bytes_per_offset == 0 ||
1029             bytes_per_offset == 1 ||
1030             bytes_per_offset == 2);
1031         m_var_offset = var_offset;
1032         m_val_offset = m_var_offset + offset_bytes;
1033         m_bytes_per_offset = bytes_per_offset;
1034         if (bytes_per_offset > 0) {
1035             m_num_fields = offset_bytes / bytes_per_offset;
1036         } else {
1037             assert_always(offset_bytes == 0);
1038             m_num_fields = 0;
1039         }
1040         m_val_buffer = val_buffer;
1041     }
1042     uint32_t value_offset(uint32_t var_index);
1043     uint32_t value_length(uint32_t var_index);
1044     void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s);
1045     uint32_t end_offset();
1046     void replace(
1047         uint32_t var_index,
1048         void* new_val_ptr,
1049         uint32_t new_val_length);
1050 private:
1051     uint32_t read_offset(uint32_t var_index);
1052     void write_offset(uint32_t var_index, uint32_t v);
1053 private:
1054     uint32_t m_var_offset;
1055     uint32_t m_val_offset;
1056     uint32_t m_bytes_per_offset;
1057     uint32_t m_num_fields;
1058     tokudb::buffer* m_val_buffer;
1059 };
1060 
1061 // Return the ith variable length offset
read_offset(uint32_t var_index)1062 uint32_t var_fields::read_offset(uint32_t var_index) {
1063     uint32_t offset = 0;
1064     m_val_buffer->read(
1065     &offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset);
1066     return offset;
1067 }
1068 
1069 // Write the ith variable length offset with a new offset.
write_offset(uint32_t var_index,uint32_t new_offset)1070 void var_fields::write_offset(uint32_t var_index, uint32_t new_offset) {
1071     m_val_buffer->write(
1072         &new_offset,
1073         m_bytes_per_offset,
1074         m_var_offset + var_index * m_bytes_per_offset);
1075 }
1076 
1077 // Return the offset of the ith variable length field
value_offset(uint32_t var_index)1078 uint32_t var_fields::value_offset(uint32_t var_index) {
1079     assert_always(var_index < m_num_fields);
1080     if (var_index == 0)
1081         return m_val_offset;
1082     else
1083         return m_val_offset + read_offset(var_index-1);
1084 }
1085 
1086 // Return the length of the ith variable length field
value_length(uint32_t var_index)1087 uint32_t var_fields::value_length(uint32_t var_index) {
1088     assert_always(var_index < m_num_fields);
1089     if (var_index == 0)
1090         return read_offset(0);
1091     else
1092         return read_offset(var_index) - read_offset(var_index-1);
1093 }
1094 
1095 // The length of the ith variable length fields changed.
1096 // Update all of the subsequent offsets.
update_offsets(uint32_t var_index,uint32_t old_s,uint32_t new_s)1097 void var_fields::update_offsets(
1098     uint32_t var_index,
1099     uint32_t old_s,
1100     uint32_t new_s) {
1101 
1102     assert_always(var_index < m_num_fields);
1103     if (old_s == new_s)
1104         return;
1105     for (uint i = var_index; i < m_num_fields; i++) {
1106         uint32_t v = read_offset(i);
1107         if (new_s > old_s)
1108             write_offset(i, v + (new_s - old_s));
1109         else
1110             write_offset(i, v - (old_s - new_s));
1111     }
1112 }
1113 
end_offset()1114 uint32_t var_fields::end_offset() {
1115     if (m_num_fields == 0)
1116         return m_val_offset;
1117     else
1118         return m_val_offset + read_offset(m_num_fields-1);
1119 }
1120 
replace(uint32_t var_index,void * new_val_ptr,uint32_t new_val_length)1121 void var_fields::replace(
1122     uint32_t var_index,
1123     void* new_val_ptr,
1124     uint32_t new_val_length) {
1125 
1126     // replace the new val with the extra val
1127     uint32_t the_offset = value_offset(var_index);
1128     uint32_t old_s = value_length(var_index);
1129     uint32_t new_s = new_val_length;
1130     m_val_buffer->replace(the_offset, old_s, new_val_ptr, new_s);
1131 
1132     // update the var offsets
1133     update_offsets(var_index, old_s, new_s);
1134 }
1135 
1136 class blob_fields {
1137 public:
blob_fields()1138     blob_fields() {
1139     }
init_blob_fields(uint32_t num_blobs,const uint8_t * blob_lengths,tokudb::buffer * val_buffer)1140     void init_blob_fields(
1141         uint32_t num_blobs,
1142         const uint8_t* blob_lengths,
1143         tokudb::buffer* val_buffer) {
1144         m_num_blobs = num_blobs;
1145         m_blob_lengths = blob_lengths;
1146         m_val_buffer = val_buffer;
1147     }
start_blobs(uint32_t offset)1148     void start_blobs(uint32_t offset) {
1149         m_blob_offset = offset;
1150     }
1151     void replace(uint32_t blob_index, uint32_t length, void *p);
1152 
1153     void expand_length(
1154         uint32_t blob_index,
1155         uint8_t old_length_length,
1156         uint8_t new_length_length);
1157 private:
1158     uint32_t read_length(uint32_t offset, size_t size);
1159     void write_length(uint32_t offset, size_t size, uint32_t new_length);
1160     uint32_t blob_offset(uint32_t blob_index);
1161 private:
1162     uint32_t m_blob_offset;
1163     uint32_t m_num_blobs;
1164     const uint8_t *m_blob_lengths;
1165     tokudb::buffer *m_val_buffer;
1166 };
1167 
read_length(uint32_t offset,size_t blob_length)1168 uint32_t blob_fields::read_length(uint32_t offset, size_t blob_length) {
1169     uint32_t length = 0;
1170     m_val_buffer->read(&length, blob_length, offset);
1171     return length;
1172 }
1173 
write_length(uint32_t offset,size_t size,uint32_t new_length)1174 void blob_fields::write_length(
1175     uint32_t offset,
1176     size_t size,
1177     uint32_t new_length) {
1178     m_val_buffer->write(&new_length, size, offset);
1179 }
1180 
blob_offset(uint32_t blob_index)1181 uint32_t blob_fields::blob_offset(uint32_t blob_index) {
1182     assert_always(blob_index < m_num_blobs);
1183     uint32_t offset = m_blob_offset;
1184     for (uint i = 0; i < blob_index; i++) {
1185         uint32_t blob_length = m_blob_lengths[i];
1186         uint32_t length = read_length(offset, blob_length);
1187         offset += blob_length + length;
1188     }
1189     return offset;
1190 }
1191 
replace(uint32_t blob_index,uint32_t new_length,void * new_value)1192 void blob_fields::replace(
1193     uint32_t blob_index,
1194     uint32_t new_length,
1195     void* new_value) {
1196 
1197     assert_always(blob_index < m_num_blobs);
1198 
1199     // compute the ith blob offset
1200     uint32_t offset = blob_offset(blob_index);
1201     uint8_t blob_length = m_blob_lengths[blob_index];
1202 
1203     // read the old length
1204     uint32_t old_length = read_length(offset, blob_length);
1205 
1206     // replace the data
1207     m_val_buffer->replace(
1208         offset + blob_length,
1209         old_length,
1210         new_value,
1211         new_length);
1212 
1213     // write the new length
1214     write_length(offset, blob_length, new_length);
1215 }
1216 
expand_length(uint32_t blob_index,uint8_t old_length_length,uint8_t new_length_length)1217 void blob_fields::expand_length(
1218     uint32_t blob_index,
1219     uint8_t old_length_length,
1220     uint8_t new_length_length) {
1221 
1222     assert_always(blob_index < m_num_blobs);
1223     assert_always(old_length_length == m_blob_lengths[blob_index]);
1224 
1225     // compute the ith blob offset
1226     uint32_t offset = blob_offset(blob_index);
1227 
1228     // read the blob length
1229     uint32_t blob_length = read_length(offset, old_length_length);
1230 
1231     // expand the length
1232     m_val_buffer->replace(
1233         offset,
1234         old_length_length,
1235         &blob_length,
1236         new_length_length);
1237 }
1238 
1239 class value_map {
1240 public:
value_map(tokudb::buffer * val_buffer)1241     value_map(tokudb::buffer *val_buffer) : m_val_buffer(val_buffer) {
1242     }
1243 
init_var_fields(uint32_t var_offset,uint32_t offset_bytes,uint32_t bytes_per_offset)1244     void init_var_fields(
1245         uint32_t var_offset,
1246         uint32_t offset_bytes,
1247         uint32_t bytes_per_offset) {
1248 
1249         m_var_fields.init_var_fields(
1250             var_offset,
1251             offset_bytes,
1252             bytes_per_offset,
1253             m_val_buffer);
1254     }
1255 
init_blob_fields(uint32_t num_blobs,const uint8_t * blob_lengths)1256     void init_blob_fields(uint32_t num_blobs, const uint8_t *blob_lengths) {
1257         m_blob_fields.init_blob_fields(num_blobs, blob_lengths, m_val_buffer);
1258     }
1259 
1260     // Replace the value of a fixed length field
replace_fixed(uint32_t the_offset,uint32_t field_null_num,void * new_val_ptr,uint32_t new_val_length)1261     void replace_fixed(
1262         uint32_t the_offset,
1263         uint32_t field_null_num,
1264         void* new_val_ptr,
1265         uint32_t new_val_length) {
1266 
1267         m_val_buffer->replace(
1268             the_offset,
1269             new_val_length,
1270             new_val_ptr,
1271             new_val_length);
1272         maybe_clear_null(field_null_num);
1273     }
1274 
1275     // Replace the value of a variable length field
replace_varchar(uint32_t var_index,uint32_t field_null_num,void * new_val_ptr,uint32_t new_val_length)1276     void replace_varchar(
1277         uint32_t var_index,
1278         uint32_t field_null_num,
1279         void* new_val_ptr,
1280         uint32_t new_val_length) {
1281 
1282         m_var_fields.replace(var_index, new_val_ptr, new_val_length);
1283         maybe_clear_null(field_null_num);
1284     }
1285 
1286     // Replace the value of a blob field
replace_blob(uint32_t blob_index,uint32_t field_null_num,void * new_val_ptr,uint32_t new_val_length)1287     void replace_blob(
1288         uint32_t blob_index,
1289         uint32_t field_null_num,
1290         void* new_val_ptr,
1291         uint32_t new_val_length) {
1292 
1293         m_blob_fields.start_blobs(m_var_fields.end_offset());
1294         m_blob_fields.replace(blob_index, new_val_length, new_val_ptr);
1295         maybe_clear_null(field_null_num);
1296     }
1297 
1298     void expand_blob_lengths(
1299         uint32_t num_blob,
1300         const uint8_t* old_length,
1301         const uint8_t* new_length);
1302 
1303     void int_op(
1304         uint32_t operation,
1305         uint32_t the_offset,
1306         uint32_t length,
1307         uint32_t field_null_num,
1308         tokudb::buffer& old_val,
1309         void* extra_val);
1310 
1311     void uint_op(
1312         uint32_t operation,
1313         uint32_t the_offset,
1314         uint32_t length,
1315         uint32_t field_null_num,
1316         tokudb::buffer& old_val,
1317         void* extra_val);
1318 
1319 private:
is_null(uint32_t null_num,uchar * null_bytes)1320     bool is_null(uint32_t null_num, uchar *null_bytes) {
1321         bool field_is_null = false;
1322         if (null_num) {
1323             if (null_num & (1<<31))
1324                 null_num &= ~(1<<31);
1325             else
1326                 null_num -= 1;
1327             field_is_null = is_overall_null_position_set(null_bytes, null_num);
1328         }
1329         return field_is_null;
1330     }
1331 
maybe_clear_null(uint32_t null_num)1332     void maybe_clear_null(uint32_t null_num) {
1333         if (null_num) {
1334             if (null_num & (1<<31))
1335                 null_num &= ~(1<<31);
1336             else
1337                 null_num -= 1;
1338             set_overall_null_position(
1339                 (uchar*)m_val_buffer->data(),
1340                 null_num,
1341                 false);
1342         }
1343     }
1344 
1345 private:
1346     var_fields m_var_fields;
1347     blob_fields m_blob_fields;
1348     tokudb::buffer *m_val_buffer;
1349 };
1350 
1351 // Update an int field: signed newval@offset = old_val@offset OP extra_val
int_op(uint32_t operation,uint32_t the_offset,uint32_t length,uint32_t field_null_num,tokudb::buffer & old_val,void * extra_val)1352 void value_map::int_op(
1353     uint32_t operation,
1354     uint32_t the_offset,
1355     uint32_t length,
1356     uint32_t field_null_num,
1357     tokudb::buffer &old_val,
1358     void* extra_val) {
1359 
1360     assert_always(the_offset + length <= m_val_buffer->size());
1361     assert_always(the_offset + length <= old_val.size());
1362     assert_always(
1363         length == 1 || length == 2 || length == 3 ||
1364         length == 4 || length == 8);
1365 
1366     uchar *old_val_ptr = (uchar *) old_val.data();
1367     bool field_is_null = is_null(field_null_num, old_val_ptr);
1368     int64_t v = 0;
1369     memcpy(&v, old_val_ptr + the_offset, length);
1370     v = tokudb::int_sign_extend(v, 8*length);
1371     int64_t extra_v = 0;
1372     memcpy(&extra_v, extra_val, length);
1373     extra_v = tokudb::int_sign_extend(extra_v, 8*length);
1374     switch (operation) {
1375     case '+':
1376         if (!field_is_null) {
1377             bool over;
1378             v = tokudb::int_add(v, extra_v, 8*length, &over);
1379             if (over) {
1380                 if (extra_v > 0)
1381                     v = tokudb::int_high_endpoint(8*length);
1382                 else
1383                     v = tokudb::int_low_endpoint(8*length);
1384             }
1385             m_val_buffer->replace(the_offset, length, &v, length);
1386         }
1387         break;
1388     case '-':
1389         if (!field_is_null) {
1390             bool over;
1391             v = tokudb::int_sub(v, extra_v, 8*length, &over);
1392             if (over) {
1393                 if (extra_v > 0)
1394                     v = tokudb::int_low_endpoint(8*length);
1395                 else
1396                     v = tokudb::int_high_endpoint(8*length);
1397             }
1398             m_val_buffer->replace(the_offset, length, &v, length);
1399         }
1400         break;
1401     default:
1402         assert_unreachable();
1403     }
1404 }
1405 
1406 // Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val
uint_op(uint32_t operation,uint32_t the_offset,uint32_t length,uint32_t field_null_num,tokudb::buffer & old_val,void * extra_val)1407 void value_map::uint_op(
1408     uint32_t operation,
1409     uint32_t the_offset,
1410     uint32_t length,
1411     uint32_t field_null_num,
1412     tokudb::buffer& old_val,
1413     void* extra_val) {
1414 
1415     assert_always(the_offset + length <= m_val_buffer->size());
1416     assert_always(the_offset + length <= old_val.size());
1417     assert_always(
1418         length == 1 || length == 2 || length == 3 ||
1419         length == 4 || length == 8);
1420 
1421     uchar *old_val_ptr = (uchar *) old_val.data();
1422     bool field_is_null = is_null(field_null_num, old_val_ptr);
1423     uint64_t v = 0;
1424     memcpy(&v, old_val_ptr + the_offset, length);
1425     uint64_t extra_v = 0;
1426     memcpy(&extra_v, extra_val, length);
1427     switch (operation) {
1428     case '+':
1429         if (!field_is_null) {
1430             bool over;
1431             v = tokudb::uint_add(v, extra_v, 8*length, &over);
1432             if (over) {
1433                 v = tokudb::uint_high_endpoint(8*length);
1434             }
1435             m_val_buffer->replace(the_offset, length, &v, length);
1436         }
1437         break;
1438     case '-':
1439         if (!field_is_null) {
1440             bool over;
1441             v = tokudb::uint_sub(v, extra_v, 8*length, &over);
1442             if (over) {
1443                 v = tokudb::uint_low_endpoint(8*length);
1444             }
1445             m_val_buffer->replace(the_offset, length, &v, length);
1446         }
1447         break;
1448     default:
1449         assert_unreachable();
1450     }
1451 }
1452 
expand_blob_lengths(uint32_t num_blob,const uint8_t * old_length,const uint8_t * new_length)1453 void value_map::expand_blob_lengths(
1454     uint32_t num_blob,
1455     const uint8_t* old_length,
1456     const uint8_t* new_length) {
1457 
1458     uint8_t current_length[num_blob];
1459     memcpy(current_length, old_length, num_blob);
1460     for (uint32_t i = 0; i < num_blob; i++) {
1461         if (new_length[i] > current_length[i]) {
1462             m_blob_fields.init_blob_fields(
1463                 num_blob,
1464                 current_length,
1465                 m_val_buffer);
1466             m_blob_fields.start_blobs(m_var_fields.end_offset());
1467             m_blob_fields.expand_length(i, current_length[i], new_length[i]);
1468             current_length[i] = new_length[i];
1469         }
1470     }
1471 }
1472 
1473 }
1474 
consume_uint32(tokudb::buffer & b)1475 static uint32_t consume_uint32(tokudb::buffer &b) {
1476     uint32_t n;
1477     size_t s = b.consume_ui<uint32_t>(&n);
1478     assert_always(s > 0);
1479     return n;
1480 }
1481 
consume_uint8_array(tokudb::buffer & b,uint32_t array_size)1482 static uint8_t *consume_uint8_array(tokudb::buffer &b, uint32_t array_size) {
1483     uint8_t *p = (uint8_t *) b.consume_ptr(array_size);
1484     assert_always(p);
1485     return p;
1486 }
1487 
tokudb_expand_blobs(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1488 static int tokudb_expand_blobs(const DBT* old_val_dbt,
1489                                const DBT* extra,
1490                                void (*set_val)(const DBT* new_val_dbt,
1491                                                void* set_extra),
1492                                void* set_extra) {
1493     tokudb::buffer extra_val(extra->data, 0, extra->size);
1494 
1495     uint8_t operation;
1496     extra_val.consume(&operation, sizeof operation);
1497     assert_always(operation == UPDATE_OP_EXPAND_BLOB);
1498 
1499     if (old_val_dbt != NULL) {
1500         // new val = old val
1501         tokudb::buffer new_val;
1502         new_val.append(old_val_dbt->data, old_val_dbt->size);
1503 
1504         tokudb::value_map vd(&new_val);
1505 
1506         // decode variable field info
1507         uint32_t var_field_offset = consume_uint32(extra_val);
1508         uint32_t var_offset_bytes = consume_uint32(extra_val);
1509         uint32_t bytes_per_offset = consume_uint32(extra_val);
1510         vd.init_var_fields(
1511             var_field_offset,
1512             var_offset_bytes,
1513             bytes_per_offset);
1514 
1515         // decode blob info
1516         uint32_t num_blob = consume_uint32(extra_val);
1517         const uint8_t* old_blob_length =
1518             consume_uint8_array(extra_val, num_blob);
1519         const uint8_t* new_blob_length =
1520             consume_uint8_array(extra_val, num_blob);
1521         assert_always(extra_val.size() == extra_val.limit());
1522 
1523         // expand blob lengths
1524         vd.expand_blob_lengths(num_blob, old_blob_length, new_blob_length);
1525 
1526         // set the new val
1527         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1528         new_val_dbt.data = new_val.data();
1529         new_val_dbt.size = new_val.size();
1530         set_val(&new_val_dbt, set_extra);
1531     }
1532     return 0;
1533 }
1534 
1535 // Decode and apply a sequence of update operations defined in the extra to
1536 // the old value and put the result in the new value.
apply_1_updates(tokudb::value_map & vd,tokudb::buffer & old_val,tokudb::buffer & extra_val)1537 static void apply_1_updates(tokudb::value_map& vd,
1538                             tokudb::buffer& old_val,
1539                             tokudb::buffer& extra_val) {
1540     uint32_t num_updates;
1541     extra_val.consume(&num_updates, sizeof num_updates);
1542     for ( ; num_updates > 0; num_updates--) {
1543         // get the update operation
1544         uint32_t update_operation;
1545         extra_val.consume(&update_operation, sizeof update_operation);
1546         uint32_t field_type;
1547         extra_val.consume(&field_type, sizeof field_type);
1548         uint32_t unused;
1549         extra_val.consume(&unused, sizeof unused);
1550         uint32_t field_null_num;
1551         extra_val.consume(&field_null_num, sizeof field_null_num);
1552         uint32_t the_offset;
1553         extra_val.consume(&the_offset, sizeof the_offset);
1554         uint32_t extra_val_length;
1555         extra_val.consume(&extra_val_length, sizeof extra_val_length);
1556         void *extra_val_ptr = extra_val.consume_ptr(extra_val_length);
1557 
1558         // apply the update
1559         switch (field_type) {
1560         case UPDATE_TYPE_INT:
1561             if (update_operation == '=')
1562                 vd.replace_fixed(
1563                     the_offset,
1564                     field_null_num,
1565                     extra_val_ptr,
1566                     extra_val_length);
1567             else
1568                 vd.int_op(
1569                     update_operation,
1570                     the_offset,
1571                     extra_val_length,
1572                     field_null_num,
1573                     old_val,
1574                     extra_val_ptr);
1575             break;
1576         case UPDATE_TYPE_UINT:
1577             if (update_operation == '=')
1578                 vd.replace_fixed(
1579                     the_offset,
1580                     field_null_num,
1581                     extra_val_ptr,
1582                     extra_val_length);
1583             else
1584                 vd.uint_op(
1585                     update_operation,
1586                     the_offset,
1587                     extra_val_length,
1588                     field_null_num,
1589                     old_val,
1590                     extra_val_ptr);
1591             break;
1592         case UPDATE_TYPE_CHAR:
1593         case UPDATE_TYPE_BINARY:
1594             if (update_operation == '=')
1595                 vd.replace_fixed(
1596                     the_offset,
1597                     field_null_num,
1598                     extra_val_ptr,
1599                     extra_val_length);
1600             else
1601                 assert_unreachable();
1602             break;
1603         default:
1604             assert_unreachable();
1605             break;
1606         }
1607     }
1608     assert_always(extra_val.size() == extra_val.limit());
1609 }
1610 
1611 // Simple update handler. Decode the update message, apply the update operations
1612 // to the old value, and set the new value.
tokudb_update_1_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1613 static int tokudb_update_1_fun(const DBT* old_val_dbt,
1614                                const DBT* extra,
1615                                void (*set_val)(const DBT* new_val_dbt,
1616                                                void* set_extra),
1617                                void* set_extra) {
1618     tokudb::buffer extra_val(extra->data, 0, extra->size);
1619 
1620     uint8_t operation;
1621     extra_val.consume(&operation, sizeof operation);
1622     assert_always(operation == UPDATE_OP_UPDATE_1);
1623 
1624     if (old_val_dbt != NULL) {
1625         // get the simple descriptor
1626         uint32_t m_fixed_field_offset;
1627         extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
1628         uint32_t m_var_field_offset;
1629         extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
1630         uint32_t m_var_offset_bytes;
1631         extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
1632         uint32_t m_bytes_per_offset;
1633         extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
1634 
1635         tokudb::buffer old_val(
1636             old_val_dbt->data,
1637             old_val_dbt->size,
1638             old_val_dbt->size);
1639 
1640         // new val = old val
1641         tokudb::buffer new_val;
1642         new_val.append(old_val_dbt->data, old_val_dbt->size);
1643 
1644         tokudb::value_map vd(&new_val);
1645         vd.init_var_fields(
1646             m_var_field_offset,
1647             m_var_offset_bytes,
1648             m_bytes_per_offset);
1649 
1650         // apply updates to new val
1651         apply_1_updates(vd, old_val, extra_val);
1652 
1653         // set the new val
1654         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1655         new_val_dbt.data = new_val.data();
1656         new_val_dbt.size = new_val.size();
1657         set_val(&new_val_dbt, set_extra);
1658     }
1659 
1660     return 0;
1661 }
1662 
1663 // Simple upsert handler. Decode the upsert message. If the key does not exist,
1664 // then insert a new value from the extra.
1665 // Otherwise, apply the update operations to the old value, and then set the
1666 // new value.
tokudb_upsert_1_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1667 static int tokudb_upsert_1_fun(const DBT* old_val_dbt,
1668                                const DBT* extra,
1669                                void (*set_val)(const DBT* new_val_dbt,
1670                                                void* set_extra),
1671                                void* set_extra) {
1672     tokudb::buffer extra_val(extra->data, 0, extra->size);
1673 
1674     uint8_t operation;
1675     extra_val.consume(&operation, sizeof operation);
1676     assert_always(operation == UPDATE_OP_UPSERT_1);
1677 
1678     uint32_t insert_length;
1679     extra_val.consume(&insert_length, sizeof insert_length);
1680     void *insert_row = extra_val.consume_ptr(insert_length);
1681 
1682     if (old_val_dbt == NULL) {
1683         // insert a new row
1684         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1685         new_val_dbt.size = insert_length;
1686         new_val_dbt.data = insert_row;
1687         set_val(&new_val_dbt, set_extra);
1688     } else {
1689         // decode the simple descriptor
1690         uint32_t m_fixed_field_offset;
1691         extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
1692         uint32_t m_var_field_offset;
1693         extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
1694         uint32_t m_var_offset_bytes;
1695         extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
1696         uint32_t m_bytes_per_offset;
1697         extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
1698 
1699         tokudb::buffer old_val(
1700             old_val_dbt->data,
1701             old_val_dbt->size,
1702             old_val_dbt->size);
1703 
1704         // new val = old val
1705         tokudb::buffer new_val;
1706         new_val.append(old_val_dbt->data, old_val_dbt->size);
1707 
1708         tokudb::value_map vd(&new_val);
1709         vd.init_var_fields(
1710             m_var_field_offset,
1711             m_var_offset_bytes,
1712             m_bytes_per_offset);
1713 
1714         // apply updates to new val
1715         apply_1_updates(vd, old_val, extra_val);
1716 
1717         // set the new val
1718         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1719         new_val_dbt.data = new_val.data();
1720         new_val_dbt.size = new_val.size();
1721         set_val(&new_val_dbt, set_extra);
1722     }
1723 
1724     return 0;
1725 }
1726 
1727 // Decode and apply a sequence of update operations defined in the extra to the
1728 // old value and put the result in the new value.
apply_2_updates(tokudb::value_map & vd,tokudb::buffer & old_val,tokudb::buffer & extra_val)1729 static void apply_2_updates(tokudb::value_map& vd,
1730                             tokudb::buffer& old_val,
1731                             tokudb::buffer& extra_val) {
1732     uint32_t num_updates = consume_uint32(extra_val);
1733     for (uint32_t i = 0; i < num_updates; i++) {
1734         uint32_t update_operation = consume_uint32(extra_val);
1735         if (update_operation == 'v') {
1736             uint32_t var_field_offset = consume_uint32(extra_val);
1737             uint32_t var_offset_bytes = consume_uint32(extra_val);
1738             uint32_t bytes_per_offset = consume_uint32(extra_val);
1739             vd.init_var_fields(
1740                 var_field_offset,
1741                 var_offset_bytes,
1742                 bytes_per_offset);
1743         } else if (update_operation == 'b') {
1744             uint32_t num_blobs = consume_uint32(extra_val);
1745             const uint8_t* blob_lengths =
1746                 consume_uint8_array(extra_val, num_blobs);
1747             vd.init_blob_fields(num_blobs, blob_lengths);
1748         } else {
1749             uint32_t field_type = consume_uint32(extra_val);
1750             uint32_t field_null_num = consume_uint32(extra_val);
1751             uint32_t the_offset = consume_uint32(extra_val);
1752             uint32_t extra_val_length = consume_uint32(extra_val);
1753             void* extra_val_ptr = extra_val.consume_ptr(extra_val_length);
1754             assert_always(extra_val_ptr);
1755 
1756             switch (field_type) {
1757             case UPDATE_TYPE_INT:
1758                 if (update_operation == '=')
1759                     vd.replace_fixed(
1760                         the_offset,
1761                         field_null_num,
1762                         extra_val_ptr,
1763                         extra_val_length);
1764                 else
1765                     vd.int_op(
1766                         update_operation,
1767                         the_offset,
1768                         extra_val_length,
1769                         field_null_num,
1770                         old_val,
1771                         extra_val_ptr);
1772                 break;
1773             case UPDATE_TYPE_UINT:
1774                 if (update_operation == '=')
1775                     vd.replace_fixed(
1776                         the_offset,
1777                         field_null_num,
1778                         extra_val_ptr,
1779                         extra_val_length);
1780                 else
1781                     vd.uint_op(
1782                         update_operation,
1783                         the_offset,
1784                         extra_val_length,
1785                         field_null_num,
1786                         old_val,
1787                         extra_val_ptr);
1788                 break;
1789             case UPDATE_TYPE_CHAR:
1790             case UPDATE_TYPE_BINARY:
1791                 if (update_operation == '=')
1792                     vd.replace_fixed(
1793                         the_offset,
1794                         field_null_num,
1795                         extra_val_ptr,
1796                         extra_val_length);
1797                 else
1798                     assert_unreachable();
1799                 break;
1800             case UPDATE_TYPE_VARBINARY:
1801             case UPDATE_TYPE_VARCHAR:
1802                 if (update_operation == '=')
1803                     vd.replace_varchar(
1804                         the_offset,
1805                         field_null_num,
1806                         extra_val_ptr,
1807                         extra_val_length);
1808                 else
1809                     assert_unreachable();
1810                 break;
1811             case UPDATE_TYPE_TEXT:
1812             case UPDATE_TYPE_BLOB:
1813                 if (update_operation == '=')
1814                     vd.replace_blob(
1815                         the_offset,
1816                         field_null_num,
1817                         extra_val_ptr,
1818                         extra_val_length);
1819                 else
1820                     assert_unreachable();
1821                 break;
1822             default:
1823                 assert_unreachable();
1824             }
1825         }
1826     }
1827     assert_always(extra_val.size() == extra_val.limit());
1828 }
1829 
1830 // Simple update handler. Decode the update message, apply the update
1831 // operations to the old value, and set the new value.
tokudb_update_2_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1832 static int tokudb_update_2_fun(const DBT* old_val_dbt,
1833                                const DBT* extra,
1834                                void (*set_val)(const DBT* new_val_dbt,
1835                                                void* set_extra),
1836                                void* set_extra) {
1837     tokudb::buffer extra_val(extra->data, 0, extra->size);
1838 
1839     uint8_t op;
1840     extra_val.consume(&op, sizeof op);
1841     assert_always(op == UPDATE_OP_UPDATE_2);
1842 
1843     if (old_val_dbt != NULL) {
1844         tokudb::buffer old_val(
1845             old_val_dbt->data,
1846             old_val_dbt->size,
1847             old_val_dbt->size);
1848 
1849         // new val = old val
1850         tokudb::buffer new_val;
1851         new_val.append(old_val_dbt->data, old_val_dbt->size);
1852 
1853         tokudb::value_map vd(&new_val);
1854 
1855         // apply updates to new val
1856         apply_2_updates(vd, old_val, extra_val);
1857 
1858         // set the new val
1859         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1860         new_val_dbt.data = new_val.data();
1861         new_val_dbt.size = new_val.size();
1862         set_val(&new_val_dbt, set_extra);
1863     }
1864 
1865     return 0;
1866 }
1867 
1868 // Simple upsert handler. Decode the upsert message. If the key does not exist,
1869 // then insert a new value from the extra.
1870 // Otherwise, apply the update operations to the old value, and then set the
1871 // new value.
tokudb_upsert_2_fun(const DBT * old_val_dbt,const DBT * extra,void (* set_val)(const DBT * new_val_dbt,void * set_extra),void * set_extra)1872 static int tokudb_upsert_2_fun(const DBT* old_val_dbt,
1873                                const DBT* extra,
1874                                void (*set_val)(const DBT* new_val_dbt,
1875                                                void* set_extra),
1876                                void* set_extra) {
1877     tokudb::buffer extra_val(extra->data, 0, extra->size);
1878 
1879     uint8_t op;
1880     extra_val.consume(&op, sizeof op);
1881     assert_always(op == UPDATE_OP_UPSERT_2);
1882 
1883     uint32_t insert_length = consume_uint32(extra_val);
1884     assert_always(insert_length < extra_val.limit());
1885     void* insert_row = extra_val.consume_ptr(insert_length);
1886     assert_always(insert_row);
1887 
1888     if (old_val_dbt == NULL) {
1889         // insert a new row
1890         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1891         new_val_dbt.size = insert_length;
1892         new_val_dbt.data = insert_row;
1893         set_val(&new_val_dbt, set_extra);
1894     } else {
1895         tokudb::buffer old_val(
1896             old_val_dbt->data,
1897             old_val_dbt->size,
1898             old_val_dbt->size);
1899 
1900         // new val = old val
1901         tokudb::buffer new_val;
1902         new_val.append(old_val_dbt->data, old_val_dbt->size);
1903 
1904         tokudb::value_map vd(&new_val);
1905 
1906         // apply updates to new val
1907         apply_2_updates(vd, old_val, extra_val);
1908 
1909         // set the new val
1910         DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1911         new_val_dbt.data = new_val.data();
1912         new_val_dbt.size = new_val.size();
1913         set_val(&new_val_dbt, set_extra);
1914     }
1915 
1916     return 0;
1917 }
1918 
1919 // This function is the update callback function that is registered with the
1920 // YDB environment. It uses the first byte in the update message to identify
1921 // the update message type and call the handler for that message.
tokudb_update_fun(TOKUDB_UNUSED (DB * db),TOKUDB_UNUSED (const DBT * key),const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra)1922 int tokudb_update_fun(TOKUDB_UNUSED(DB* db),
1923                       TOKUDB_UNUSED(const DBT* key),
1924                       const DBT* old_val,
1925                       const DBT* extra,
1926                       void (*set_val)(const DBT* new_val, void* set_extra),
1927                       void* set_extra) {
1928     assert_always(extra->size > 0);
1929     uint8_t* extra_pos = (uchar*)extra->data;
1930     uint8_t operation = extra_pos[0];
1931     int error;
1932     switch (operation) {
1933     case UPDATE_OP_COL_ADD_OR_DROP:
1934         error = tokudb_hcad_update_fun(old_val, extra, set_val, set_extra);
1935         break;
1936     case UPDATE_OP_EXPAND_VARIABLE_OFFSETS:
1937         error =
1938             tokudb_expand_variable_offsets(old_val, extra, set_val, set_extra);
1939         break;
1940     case UPDATE_OP_EXPAND_INT:
1941     case UPDATE_OP_EXPAND_UINT:
1942         error = tokudb_expand_int_field(old_val, extra, set_val, set_extra);
1943         break;
1944     case UPDATE_OP_EXPAND_CHAR:
1945     case UPDATE_OP_EXPAND_BINARY:
1946         error = tokudb_expand_char_field(old_val, extra, set_val, set_extra);
1947         break;
1948     case UPDATE_OP_EXPAND_BLOB:
1949         error = tokudb_expand_blobs(old_val, extra, set_val, set_extra);
1950         break;
1951     case UPDATE_OP_UPDATE_1:
1952         error = tokudb_update_1_fun(old_val, extra, set_val, set_extra);
1953         break;
1954     case UPDATE_OP_UPSERT_1:
1955         error = tokudb_upsert_1_fun(old_val, extra, set_val, set_extra);
1956         break;
1957     case UPDATE_OP_UPDATE_2:
1958         error = tokudb_update_2_fun(old_val, extra, set_val, set_extra);
1959         break;
1960     case UPDATE_OP_UPSERT_2:
1961         error = tokudb_upsert_2_fun(old_val, extra, set_val, set_extra);
1962         break;
1963     default:
1964         assert_unreachable();
1965     }
1966     return error;
1967 }
1968