1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     TokuDBis is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     TokuDB is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ======= */
23 
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25 
26 // Point updates and upserts
27 
28 // Restrictions:
29 //   No triggers
30 //   Statement or mixed replication
31 //   Primary key must be defined
32 //   Simple and compound primary key
33 //   Int, char and varchar primary key types
34 //   No updates on fields that are part of any key
35 //   No clustering keys
36 //   Integer and char field updates
37 //   Update expressions:
38 //       x = constant
39 //       x = x + constant
40 //       x = x - constant
41 //       x = if (x=0,0,x-1)
42 //       x = x + values(x)
43 //   Session variable disables slow updates and slow upserts
44 
45 // Future features:
46 //   Support more primary key types
47 //   Force statement logging for fast updates
48 //   Support clustering keys using broadcast updates
49 //   Support primary key ranges using multicast messages
50 //   Support more complicated update expressions
51 //   Replace field_offset
52 
53 #if defined(TOKU_INCLUDE_UPSERT) && TOKU_INCLUDE_UPSERT
54 // Debug function to dump an Item
dump_item(Item * item)55 static void dump_item(Item* item) {
56     fprintf(stderr, "%u", item->type());
57     switch (item->type()) {
58     case Item::FUNC_ITEM: {
59         Item_func* func = static_cast<Item_func*>(item);
60         uint n = func->argument_count();
61         Item** arguments = func->arguments();
62         fprintf(
63             stderr,
64             ":func=%u,%s,%u(",
65             func->functype(),
66             func->func_name(),
67             n);
68         for (uint i = 0; i < n ; i++) {
69             dump_item(arguments[i]);
70             if (i < n-1)
71                 fprintf(stderr,",");
72         }
73         fprintf(stderr, ")");
74         break;
75     }
76     case Item::INT_ITEM: {
77         Item_int* int_item = static_cast<Item_int*>(item);
78         fprintf(stderr, ":int=%lld", int_item->val_int());
79         break;
80     }
81     case Item::STRING_ITEM: {
82         Item_string* str_item = static_cast<Item_string*>(item);
83         fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
84         break;
85     }
86     case Item::FIELD_ITEM: {
87         Item_field* field_item = static_cast<Item_field*>(item);
88         fprintf(
89             stderr,
90             ":field=%s.%s.%s",
91             field_item->db_name,
92             field_item->table_name,
93             field_item->field_name.str);
94         break;
95     }
96     case Item::COND_ITEM: {
97         Item_cond* cond_item = static_cast<Item_cond*>(item);
98         fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
99         List_iterator<Item> li(*cond_item->argument_list());
100         Item* list_item;
101         while ((list_item = li++)) {
102             dump_item(list_item);
103             fprintf(stderr, "\n");
104         }
105         fprintf(stderr, ")\n");
106         break;
107     }
108     case Item::INSERT_VALUE_ITEM: {
109         Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
110         fprintf(stderr, ":insert_value");
111         dump_item(value_item->arg);
112         break;
113     }
114     default:
115         fprintf(stderr, ":unsupported\n");
116         break;
117     }
118 }
119 
120 // Debug function to dump an Item list
dump_item_list(const char * h,List<Item> & l)121 static void dump_item_list(const char* h, List<Item> &l) {
122     fprintf(stderr, "%s elements=%u\n", h, l.elements);
123     List_iterator<Item> li(l);
124     Item* item;
125     while ((item = li++) != NULL) {
126         dump_item(item);
127         fprintf(stderr, "\n");
128     }
129 }
130 
131 // Find a Field by its Item name
find_field_by_name(TOKUDB_UNUSED (TABLE * table),Item * item)132 static Field* find_field_by_name(TOKUDB_UNUSED(TABLE* table), Item* item) {
133     if (item->type() != Item::FIELD_ITEM)
134         return NULL;
135     Item_field* field_item = static_cast<Item_field*>(item);
136 #if 0
137     if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
138         strcmp(table->s->table_name.str, field_item->table_name) != 0)
139         return NULL;
140     Field *found_field = NULL;
141     for (uint i = 0; i < table->s->fields; i++) {
142         Field *test_field = table->s->field[i];
143         if (strcmp(field_item->field_name.str, test_field->field_name.str) == 0) {
144             found_field = test_field;
145             break;
146         }
147     }
148     return found_field;
149 #else
150     // item->field may be a shortcut instead of the above table lookup
151     return field_item->field;
152 #endif
153 }
154 
155 // Return the starting offset in the value for a particular index (selected by idx) of a
156 // particular field (selected by expand_field_num).
157 // This only works for fixed length fields
fixed_field_offset(uint32_t null_bytes,KEY_AND_COL_INFO * kc_info,uint idx,uint expand_field_num)158 static uint32_t fixed_field_offset(
159     uint32_t null_bytes,
160     KEY_AND_COL_INFO* kc_info,
161     uint idx,
162     uint expand_field_num) {
163 
164     uint32_t offset = null_bytes;
165     for (uint i = 0; i < expand_field_num; i++) {
166         if (bitmap_is_set(&kc_info->key_filters[idx], i))
167             continue;
168         offset += kc_info->field_lengths[i];
169     }
170     return offset;
171 }
172 
var_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint idx,uint field_num)173 static uint32_t var_field_index(
174     TABLE* table,
175     KEY_AND_COL_INFO* kc_info,
176     uint idx,
177     uint field_num) {
178 
179     assert_always(field_num < table->s->fields);
180     uint v_index = 0;
181     for (uint i = 0; i < table->s->fields; i++) {
182         if (bitmap_is_set(&kc_info->key_filters[idx], i))
183             continue;
184         if (kc_info->length_bytes[i]) {
185             if (i == field_num)
186                 break;
187             v_index++;
188         }
189     }
190     return v_index;
191 }
192 
blob_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint field_num)193 static uint32_t blob_field_index(TABLE* table,
194                                  KEY_AND_COL_INFO* kc_info,
195                                  uint field_num) {
196     assert_always(field_num < table->s->fields);
197     uint b_index;
198     for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
199         if (kc_info->blob_fields[b_index] == field_num)
200             break;
201     }
202     assert_always(b_index < kc_info->num_blobs);
203     return b_index;
204 }
205 
206 // Determine if an update operation can be offloaded to the storage engine.
207 // The update operation consists of a list of update expressions
208 // (fields[i] = values[i]), and a list of where conditions (conds).
209 // The function returns 0 if the update is handled in the storage engine.
210 // Otherwise, an error is returned.
fast_update(THD * thd,List<Item> & update_fields,List<Item> & update_values,Item * conds)211 int ha_tokudb::fast_update(
212     THD* thd,
213     List<Item>& update_fields,
214     List<Item>& update_values,
215     Item* conds) {
216 
217     TOKUDB_HANDLER_DBUG_ENTER("");
218     int error = 0;
219 
220     if (!tokudb::sysvars::enable_fast_update(thd)) {
221         error = ENOTSUP;
222         goto exit;
223     }
224 
225     if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
226         dump_item_list("fields", update_fields);
227         dump_item_list("values", update_values);
228         if (conds) {
229             fprintf(stderr, "conds\n");
230             dump_item(conds);
231             fprintf(stderr, "\n");
232         }
233     }
234 
235     if (update_fields.elements < 1 ||
236         update_fields.elements != update_values.elements) {
237         error = ENOTSUP;  // something is fishy with the parameters
238         goto exit;
239     }
240 
241     if (!check_fast_update(thd, update_fields, update_values, conds)) {
242         error = HA_ERR_UNSUPPORTED;
243         goto exit;
244     }
245 
246     error = send_update_message(
247         update_fields, update_values, conds, transaction);
248 
249     if (error) {
250         int mapped_error = map_to_handler_error(error);
251         if (mapped_error == error)
252             error = HA_ERR_UNSUPPORTED;
253     }
254 
255 exit:
256 
257     if (error != 0 && error != ENOTSUP)
258         print_error(error, MYF(0));
259 
260     TOKUDB_HANDLER_DBUG_RETURN(error);
261 }
262 
263 // Return true if an expression is a simple int expression or a simple function
264 // of +- int expression.
check_int_result(Item * item)265 static bool check_int_result(Item* item) {
266     Item::Type t = item->type();
267     if (t == Item::INT_ITEM) {
268         return true;
269     } else if (t == Item::FUNC_ITEM) {
270         Item_func* item_func = static_cast<Item_func*>(item);
271         if (strcmp(item_func->func_name(), "+") != 0 &&
272             strcmp(item_func->func_name(), "-") != 0)
273             return false;
274         if (item_func->argument_count() != 1)
275             return false;
276         Item** arguments = item_func->arguments();
277         if (arguments[0]->type() != Item::INT_ITEM)
278             return false;
279         return true;
280     } else
281         return false;
282 }
283 
284 // check that an item is an insert value item with the same field name
check_insert_value(Item * item,const char * field_name)285 static bool check_insert_value(Item* item, const char* field_name) {
286     if (item->type() != Item::INSERT_VALUE_ITEM)
287         return false;
288     Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
289     if (value_item->arg->type() != Item::FIELD_ITEM)
290         return false;
291     Item_field* arg = static_cast<Item_field*>(value_item->arg);
292     if (strcmp(field_name, arg->field_name.str) != 0)
293         return false;
294     return true;
295 }
296 
297 // Return true if an expression looks like field_name op constant.
check_x_op_constant(const char * field_name,Item * item,const char * op,Item ** item_constant,bool allow_insert_value)298 static bool check_x_op_constant(
299     const char* field_name,
300     Item* item,
301     const char* op,
302     Item** item_constant,
303     bool allow_insert_value) {
304 
305     if (item->type() != Item::FUNC_ITEM)
306         return false;
307     Item_func* item_func = static_cast<Item_func*>(item);
308     if (strcmp(item_func->func_name(), op) != 0)
309         return false;
310     Item** arguments = item_func->arguments();
311     uint n = item_func->argument_count();
312     if (n != 2)
313         return false;
314     if (arguments[0]->type() != Item::FIELD_ITEM)
315         return false;
316     Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
317     if (strcmp(field_name, arg0->field_name.str) != 0)
318         return false;
319     if (!check_int_result(arguments[1]))
320         if (!(allow_insert_value &&
321             check_insert_value(arguments[1], field_name)))
322             return false;
323     *item_constant = arguments[1];
324     return true;
325 }
326 
327 // Return true if an expression looks like field_name = constant
check_x_equal_0(const char * field_name,Item * item)328 static bool check_x_equal_0(const char *field_name, Item *item) {
329     Item* item_constant;
330     if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
331         return false;
332     if (item_constant->type() != Item::INT_ITEM ||
333         item_constant->val_int() != 0)
334         return false;
335     return true;
336 }
337 
338 // Return true if an expression looks like fieldname - 1
check_x_minus_1(const char * field_name,Item * item)339 static bool check_x_minus_1(const char* field_name, Item* item) {
340     Item* item_constant;
341     if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
342         return false;
343     if (item_constant->type() != Item::INT_ITEM ||
344         item_constant->val_int() != 1)
345         return false;
346     return true;
347 }
348 
349 // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
350 // the field named by fieldname is an unsigned int.
check_decr_floor_expression(Field * lhs_field,Item * item)351 static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
352     if (item->type() != Item::FUNC_ITEM)
353         return false;
354     Item_func* item_func = static_cast<Item_func*>(item);
355     if (strcmp(item_func->func_name(), "if") != 0)
356         return false;
357     Item** arguments = item_func->arguments();
358     uint n = item_func->argument_count();
359     if (n != 3)
360         return false;
361     if (!check_x_equal_0(lhs_field->field_name.str, arguments[0]))
362         return false;
363     if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
364         return false;
365     if (!check_x_minus_1(lhs_field->field_name.str, arguments[2]))
366         return false;
367     if (!(lhs_field->flags & UNSIGNED_FLAG))
368         return false;
369     return true;
370 }
371 
372 // Check if lhs = rhs expression is simple.  Return true if it is.
check_update_expression(Item * lhs_item,Item * rhs_item,TABLE * table,bool allow_insert_value)373 static bool check_update_expression(
374     Item* lhs_item,
375     Item* rhs_item,
376     TABLE* table,
377     bool allow_insert_value) {
378 
379     Field* lhs_field = find_field_by_name(table, lhs_item);
380     if (lhs_field == NULL)
381         return false;
382     if (!lhs_field->part_of_key.is_clear_all())
383         return false;
384     enum_field_types lhs_type = lhs_field->type();
385     Item::Type rhs_type = rhs_item->type();
386     switch (lhs_type) {
387     case MYSQL_TYPE_TINY:
388     case MYSQL_TYPE_SHORT:
389     case MYSQL_TYPE_INT24:
390     case MYSQL_TYPE_LONG:
391     case MYSQL_TYPE_LONGLONG:
392         if (check_int_result(rhs_item))
393             return true;
394         Item* item_constant;
395         if (check_x_op_constant(
396                 lhs_field->field_name.str,
397                 rhs_item,
398                 "+",
399                 &item_constant,
400                 allow_insert_value))
401             return true;
402         if (check_x_op_constant(
403                 lhs_field->field_name.str,
404                 rhs_item,
405                 "-",
406                 &item_constant,
407                 allow_insert_value))
408             return true;
409         if (check_decr_floor_expression(lhs_field, rhs_item))
410             return true;
411         break;
412     case MYSQL_TYPE_STRING:
413         if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
414             return true;
415         break;
416     case MYSQL_TYPE_VARCHAR:
417     case MYSQL_TYPE_BLOB:
418         if (rhs_type == Item::STRING_ITEM)
419             return true;
420         break;
421     default:
422         break;
423     }
424     return false;
425 }
426 
427 // Check that all update expressions are simple.  Return true if they are.
check_all_update_expressions(List<Item> & fields,List<Item> & values,TABLE * table,bool allow_insert_value)428 static bool check_all_update_expressions(
429     List<Item>& fields,
430     List<Item>& values,
431     TABLE* table,
432     bool allow_insert_value) {
433 
434     List_iterator<Item> lhs_i(fields);
435     List_iterator<Item> rhs_i(values);
436     while (1) {
437         Item* lhs_item = lhs_i++;
438         if (lhs_item == NULL)
439             break;
440         Item* rhs_item = rhs_i++;
441         assert_always(rhs_item != NULL);
442         if (!check_update_expression(
443                 lhs_item,
444                 rhs_item,
445                 table,
446                 allow_insert_value))
447             return false;
448     }
449     return true;
450 }
451 
full_field_in_key(TABLE * table,Field * field)452 static bool full_field_in_key(TABLE* table, Field* field) {
453     assert_always(table->s->primary_key < table->s->keys);
454     KEY* key = &table->s->key_info[table->s->primary_key];
455     for (uint i = 0; i < key->user_defined_key_parts; i++) {
456         KEY_PART_INFO* key_part = &key->key_part[i];
457         if (strcmp(field->field_name.str, key_part->field->field_name.str) == 0) {
458             return key_part->length == field->field_length;
459         }
460     }
461     return false;
462 }
463 
464 // Check that an expression looks like fieldname = constant, fieldname is part
465 // of the primary key, and the named field is an int, char or varchar type.
466 // Return true if it does.
check_pk_field_equal_constant(Item * item,TABLE * table,MY_BITMAP * pk_fields)467 static bool check_pk_field_equal_constant(
468     Item* item,
469     TABLE* table,
470     MY_BITMAP* pk_fields) {
471 
472     if (item->type() != Item::FUNC_ITEM)
473         return false;
474     Item_func* func = static_cast<Item_func*>(item);
475     if (strcmp(func->func_name(), "=") != 0)
476         return false;
477     uint n = func->argument_count();
478     if (n != 2)
479         return false;
480     Item** arguments = func->arguments();
481     Field* field = find_field_by_name(table, arguments[0]);
482     if (field == NULL)
483         return false;
484     if (!bitmap_test_and_clear(pk_fields, field->field_index))
485         return false;
486     switch (field->type()) {
487     case MYSQL_TYPE_TINY:
488     case MYSQL_TYPE_SHORT:
489     case MYSQL_TYPE_INT24:
490     case MYSQL_TYPE_LONG:
491     case MYSQL_TYPE_LONGLONG:
492         return arguments[1]->type() == Item::INT_ITEM ||
493             arguments[1]->type() == Item::STRING_ITEM;
494     case MYSQL_TYPE_STRING:
495     case MYSQL_TYPE_VARCHAR:
496         return full_field_in_key(table, field) &&
497             (arguments[1]->type() == Item::INT_ITEM ||
498              arguments[1]->type() == Item::STRING_ITEM);
499     default:
500         return false;
501     }
502 }
503 
504 // Check that the where condition covers all of the primary key components
505 // with fieldname = constant expressions.  Return true if it does.
check_point_update(Item * conds,TABLE * table)506 static bool check_point_update(Item* conds, TABLE* table) {
507     bool result = false;
508 
509     if (conds == NULL)
510         return false; // no where condition on the update
511 
512     if (table->s->primary_key >= table->s->keys)
513         return false; // no primary key defined
514 
515     // use a bitmap of the primary key fields to keep track of those fields
516     // that are covered by the where conditions
517     MY_BITMAP pk_fields;
518     if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
519         return false;
520     KEY *key = &table->s->key_info[table->s->primary_key];
521     for (uint i = 0; i < key->user_defined_key_parts; i++)
522         bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
523 
524     switch (conds->type()) {
525     case Item::FUNC_ITEM:
526         result = check_pk_field_equal_constant(conds, table, &pk_fields);
527         break;
528     case Item::COND_ITEM: {
529         Item_cond* cond_item = static_cast<Item_cond*>(conds);
530         if (strcmp(cond_item->func_name(), "and") != 0)
531             break;
532         List_iterator<Item> li(*cond_item->argument_list());
533         Item* list_item;
534         result = true;
535         while (result == true && (list_item = li++)) {
536             result = check_pk_field_equal_constant(
537                 list_item,
538                 table,
539                 &pk_fields);
540         }
541         break;
542     }
543     default:
544         break;
545     }
546 
547     if (!bitmap_is_clear_all(&pk_fields))
548         result = false;
549     bitmap_free(&pk_fields);
550     return result;
551 }
552 
553 // Return true if there are any clustering keys (except the primary).
554 // Precompute this when the table is opened.
clustering_keys_exist(TABLE * table)555 static bool clustering_keys_exist(TABLE *table) {
556     for (uint keynr = 0; keynr < table->s->keys; keynr++) {
557         if (keynr != table->s->primary_key &&
558             key_is_clustering(&table->s->key_info[keynr]))
559             return true;
560     }
561     return false;
562 }
563 
is_strict_mode(THD * thd)564 static bool is_strict_mode(THD* thd) {
565 #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
566     return thd->is_strict_mode();
567 #else
568     return tokudb_test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
569 #endif
570 }
571 
572 // Check if an update operation can be handled by this storage engine.
573 // Return true if it can.
check_fast_update(THD * thd,List<Item> & fields,List<Item> & values,Item * conds)574 bool ha_tokudb::check_fast_update(
575     THD* thd,
576     List<Item>& fields,
577     List<Item>& values,
578     Item* conds) {
579 
580     if (!transaction)
581         return false;
582 
583     // avoid strict mode arithmetic overflow issues
584     if (is_strict_mode(thd))
585         return false;
586 
587     // no triggers
588     if (table->triggers)
589         return false;
590 
591     // no binlog
592     if (mysql_bin_log.is_open() &&
593         !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
594           thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
595         return false;
596 
597     // no clustering keys (need to broadcast an increment into the clustering
598     // keys since we are selecting with the primary key)
599     if (clustering_keys_exist(table))
600         return false;
601 
602     if (!check_all_update_expressions(fields, values, table, false))
603         return false;
604 
605     if (!check_point_update(conds, table))
606         return false;
607 
608     return true;
609 }
610 
marshall_varchar_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info,uint key_num)611 static void marshall_varchar_descriptor(
612     tokudb::buffer& b,
613     TABLE* table,
614     KEY_AND_COL_INFO* kc_info,
615     uint key_num) {
616 
617     b.append_ui<uint32_t>('v');
618     b.append_ui<uint32_t>(
619         table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
620     uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
621     b.append_ui<uint32_t>(var_offset_bytes);
622     b.append_ui<uint32_t>(
623         var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
624 }
625 
marshall_blobs_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info)626 static void marshall_blobs_descriptor(
627     tokudb::buffer& b,
628     TABLE* table,
629     KEY_AND_COL_INFO* kc_info) {
630 
631     b.append_ui<uint32_t>('b');
632     uint32_t n = kc_info->num_blobs;
633     b.append_ui<uint32_t>(n);
634     for (uint i = 0; i < n; i++) {
635         uint blob_field_index = kc_info->blob_fields[i];
636         assert_always(blob_field_index < table->s->fields);
637         uint8_t blob_field_length =
638             table->s->field[blob_field_index]->row_pack_length();
639         b.append(&blob_field_length, sizeof blob_field_length);
640     }
641 }
642 
643 static inline uint32_t get_null_bit_position(uint32_t null_bit);
644 
645 // evaluate the int value of an item
item_val_int(Item * item)646 static longlong item_val_int(Item* item) {
647     Item::Type t = item->type();
648     if (t == Item::INSERT_VALUE_ITEM) {
649         Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
650         return value_item->arg->val_int();
651     } else
652         return item->val_int();
653 }
654 
655 // Marshall update operations to a buffer.
marshall_update(tokudb::buffer & b,Item * lhs_item,Item * rhs_item,TABLE * table,TOKUDB_SHARE * share)656 static void marshall_update(
657     tokudb::buffer& b,
658     Item* lhs_item,
659     Item* rhs_item,
660     TABLE* table,
661     TOKUDB_SHARE* share) {
662 
663     // figure out the update operation type (again)
664     Field* lhs_field = find_field_by_name(table, lhs_item);
665     assert_always(lhs_field); // we found it before, so this should work
666 
667     // compute the update info
668     uint32_t field_type;
669     uint32_t field_null_num = 0;
670     if (lhs_field->real_maybe_null()) {
671         uint32_t field_num = lhs_field->field_index;
672         field_null_num =
673             ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
674     }
675     uint32_t offset;
676     void* v_ptr = NULL;
677     uint32_t v_length;
678     uint32_t update_operation;
679     longlong v_ll;
680     String v_str;
681 
682     switch (lhs_field->type()) {
683     case MYSQL_TYPE_TINY:
684     case MYSQL_TYPE_SHORT:
685     case MYSQL_TYPE_INT24:
686     case MYSQL_TYPE_LONG:
687     case MYSQL_TYPE_LONGLONG: {
688         Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
689         field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
690         offset =
691             fixed_field_offset(
692                 table->s->null_bytes,
693                 &share->kc_info,
694                 table->s->primary_key,
695                 lhs_field->field_index);
696         switch (rhs_item->type()) {
697         case Item::INT_ITEM: {
698             update_operation = '=';
699             v_ll = rhs_item->val_int();
700             v_length = lhs_field->pack_length();
701             v_ptr = &v_ll;
702             break;
703         }
704         case Item::FUNC_ITEM: {
705             Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
706             Item** arguments = rhs_func->arguments();
707             // we only support one if function for now, and it is a
708             // decrement with floor.
709             if (strcmp(rhs_func->func_name(), "if") == 0) {
710                 update_operation = '-';
711                 v_ll = 1;
712             } else if (rhs_func->argument_count() == 1) {
713                 update_operation = '=';
714                 v_ll = rhs_func->val_int();
715             } else {
716                 update_operation = rhs_func->func_name()[0];
717                 v_ll = item_val_int(arguments[1]);
718             }
719             v_length = lhs_field->pack_length();
720             v_ptr = &v_ll;
721             break;
722         }
723         default:
724             assert_unreachable();
725         }
726         break;
727     }
728     case MYSQL_TYPE_STRING: {
729         update_operation = '=';
730         field_type =
731             lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
732         offset =
733             fixed_field_offset(
734                 table->s->null_bytes,
735                 &share->kc_info,
736                 table->s->primary_key,
737                 lhs_field->field_index);
738         v_str = *rhs_item->val_str(&v_str);
739         v_length = v_str.length();
740         if (v_length >= lhs_field->pack_length()) {
741             v_length = lhs_field->pack_length();
742             v_str.length(v_length); // truncate
743         } else {
744             v_length = lhs_field->pack_length();
745             uchar pad_char =
746                 lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
747             v_str.fill(lhs_field->pack_length(), pad_char); // pad
748         }
749         v_ptr = v_str.c_ptr();
750         break;
751     }
752     case MYSQL_TYPE_VARCHAR: {
753         update_operation = '=';
754         field_type =
755             lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
756         offset =
757             var_field_index(
758                 table,
759                 &share->kc_info,
760                 table->s->primary_key,
761                 lhs_field->field_index);
762         v_str = *rhs_item->val_str(&v_str);
763         v_length = v_str.length();
764         if (v_length >= lhs_field->row_pack_length()) {
765             v_length = lhs_field->row_pack_length();
766             v_str.length(v_length); // truncate
767         }
768         v_ptr = v_str.c_ptr();
769         break;
770     }
771     case MYSQL_TYPE_BLOB: {
772         update_operation = '=';
773         field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
774         offset =
775             blob_field_index(table, &share->kc_info, lhs_field->field_index);
776         v_str = *rhs_item->val_str(&v_str);
777         v_length = v_str.length();
778         if (v_length >= lhs_field->max_data_length()) {
779             v_length = lhs_field->max_data_length();
780             v_str.length(v_length); // truncate
781         }
782         v_ptr = v_str.c_ptr();
783         break;
784     }
785     default:
786         assert_unreachable();
787     }
788 
789     // marshall the update fields into the buffer
790     b.append_ui<uint32_t>(update_operation);
791     b.append_ui<uint32_t>(field_type);
792     b.append_ui<uint32_t>(field_null_num);
793     b.append_ui<uint32_t>(offset);
794     b.append_ui<uint32_t>(v_length);
795     b.append(v_ptr, v_length);
796 }
797 
798 // Save an item's value into the appropriate field.  Return 0 if successful.
save_in_field(Item * item,TABLE * table)799 static int save_in_field(Item* item, TABLE* table) {
800     assert_always(item->type() == Item::FUNC_ITEM);
801     Item_func *func = static_cast<Item_func*>(item);
802     assert_always(strcmp(func->func_name(), "=") == 0);
803     uint n = func->argument_count();
804     assert_always(n == 2);
805     Item **arguments = func->arguments();
806     assert_always(arguments[0]->type() == Item::FIELD_ITEM);
807     Item_field *field_item = static_cast<Item_field*>(arguments[0]);
808     my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
809     int error = arguments[1]->save_in_field(field_item->field, 0);
810     dbug_tmp_restore_column_map(table->write_set, old_map);
811     return error;
812 }
813 
count_update_types(Field * lhs_field,uint * num_varchars,uint * num_blobs)814 static void count_update_types(
815     Field* lhs_field,
816     uint* num_varchars,
817     uint* num_blobs) {
818 
819     switch (lhs_field->type()) {
820     case MYSQL_TYPE_VARCHAR:
821         *num_varchars += 1;
822         break;
823     case MYSQL_TYPE_BLOB:
824         *num_blobs += 1;
825         break;
826     default:
827         break;
828     }
829 }
830 
831 // Generate an update message for an update operation and send it into the
832 // primary tree.  Return 0 if successful.
send_update_message(List<Item> & update_fields,List<Item> & update_values,Item * conds,DB_TXN * txn)833 int ha_tokudb::send_update_message(
834     List<Item>& update_fields,
835     List<Item>& update_values,
836     Item* conds,
837     DB_TXN* txn) {
838 
839     int error;
840 
841     // Save the primary key from the where conditions
842     Item::Type t = conds->type();
843     if (t == Item::FUNC_ITEM) {
844         error = save_in_field(conds, table);
845     } else if (t == Item::COND_ITEM) {
846         Item_cond* cond_item = static_cast<Item_cond*>(conds);
847         List_iterator<Item> li(*cond_item->argument_list());
848         Item* list_item;
849         error = 0;
850         while (error == 0 && (list_item = li++)) {
851             error = save_in_field(list_item, table);
852         }
853     } else {
854         assert_unreachable();
855     }
856     if (error)
857         return error;
858 
859     // put the primary key into key_buff and wrap it with key_dbt
860     DBT key_dbt;
861     bool has_null;
862     create_dbt_key_from_table(
863         &key_dbt,
864         primary_key,
865         key_buff,
866         table->record[0],
867         &has_null);
868 
869     // construct the update message
870     tokudb::buffer update_message;
871 
872     uint8_t op = UPDATE_OP_UPDATE_2;
873     update_message.append(&op, sizeof op);
874 
875     uint32_t num_updates = update_fields.elements;
876     uint num_varchars = 0, num_blobs = 0;
877     if (1) {
878         List_iterator<Item> lhs_i(update_fields);
879         Item* lhs_item;
880         while ((lhs_item = lhs_i++)) {
881             if (lhs_item == NULL)
882                 break;
883             Field* lhs_field = find_field_by_name(table, lhs_item);
884             assert_always(lhs_field); // we found it before, so this should work
885             count_update_types(lhs_field, &num_varchars, &num_blobs);
886         }
887         if (num_varchars > 0 || num_blobs > 0)
888             num_updates++;
889         if (num_blobs > 0)
890             num_updates++;
891     }
892 
893     // append the updates
894     update_message.append_ui<uint32_t>(num_updates);
895 
896     if (num_varchars > 0 || num_blobs > 0)
897         marshall_varchar_descriptor(
898             update_message,
899             table,
900             &share->kc_info,
901             table->s->primary_key);
902     if (num_blobs > 0)
903         marshall_blobs_descriptor(update_message, table, &share->kc_info);
904 
905     List_iterator<Item> lhs_i(update_fields);
906     List_iterator<Item> rhs_i(update_values);
907     while (error == 0) {
908         Item* lhs_item = lhs_i++;
909         if (lhs_item == NULL)
910             break;
911         Item* rhs_item = rhs_i++;
912         assert_always(rhs_item != NULL);
913         marshall_update(update_message, lhs_item, rhs_item, table, share);
914     }
915 
916     rwlock_t_lock_read(share->_num_DBs_lock);
917 
918     // hot index in progress
919     if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
920         error = ENOTSUP; // run on the slow path
921     } else {
922         // send the update message
923         DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
924         update_dbt.data = update_message.data();
925         update_dbt.size = update_message.size();
926         error =
927             share->key_file[primary_key]->update(
928                 share->key_file[primary_key],
929                 txn,
930                 &key_dbt,
931                 &update_dbt,
932                 0);
933     }
934 
935     share->_num_DBs_lock.unlock();
936 
937     return error;
938 }
939 
940 // Determine if an upsert operation can be offloaded to the storage engine.
941 // An upsert consists of a row and a list of update expressions
942 // (update_fields[i] = update_values[i]).
943 // The function returns 0 if the upsert is handled in the storage engine.
944 // Otherwise, an error code is returned.
upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)945 int ha_tokudb::upsert(
946     THD* thd,
947     List<Item>& update_fields,
948     List<Item>& update_values) {
949 
950     TOKUDB_HANDLER_DBUG_ENTER("");
951     int error = 0;
952 
953     if (!tokudb::sysvars::enable_fast_upsert(thd)) {
954         error = ENOTSUP;
955         goto exit;
956     }
957 
958     if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
959         fprintf(stderr, "upsert\n");
960         dump_item_list("update_fields", update_fields);
961         dump_item_list("update_values", update_values);
962     }
963 
964     // not an upsert or something is fishy with the parameters
965     if (update_fields.elements < 1 ||
966         update_fields.elements != update_values.elements) {
967         error = ENOTSUP;
968         goto exit;
969     }
970 
971     if (!check_upsert(thd, update_fields, update_values)) {
972         error = HA_ERR_UNSUPPORTED;
973         goto exit;
974     }
975 
976     error = send_upsert_message(update_fields, update_values, transaction);
977 
978     if (error) {
979         int mapped_error = map_to_handler_error(error);
980         if (mapped_error == error)
981             error = HA_ERR_UNSUPPORTED;
982     }
983 
984 exit:
985 
986     if (error != 0 && error != ENOTSUP)
987         print_error(error, MYF(0));
988 
989     TOKUDB_HANDLER_DBUG_RETURN(error);
990 }
991 
992 // Check if an upsert can be handled by this storage engine.
993 // Return true if it can.
check_upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)994 bool ha_tokudb::check_upsert(
995     THD* thd,
996     List<Item>& update_fields,
997     List<Item>& update_values) {
998 
999     if (!transaction)
1000         return false;
1001 
1002     // avoid strict mode arithmetic overflow issues
1003     if (is_strict_mode(thd))
1004         return false;
1005 
1006     // no triggers
1007     if (table->triggers)
1008         return false;
1009 
1010     // primary key must exist
1011     if (table->s->primary_key >= table->s->keys)
1012         return false;
1013 
1014     // no secondary keys
1015     if (table->s->keys > 1)
1016         return false;
1017 
1018     // no binlog
1019     if (mysql_bin_log.is_open() &&
1020         !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1021           thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1022         return false;
1023 
1024     if (!check_all_update_expressions(
1025             update_fields,
1026             update_values,
1027             table,
1028             true))
1029         return false;
1030 
1031     return true;
1032 }
1033 
1034 // Generate an upsert message and send it into the primary tree.
1035 // Return 0 if successful.
send_upsert_message(List<Item> & update_fields,List<Item> & update_values,DB_TXN * txn)1036 int ha_tokudb::send_upsert_message(
1037     List<Item>& update_fields,
1038     List<Item>& update_values,
1039     DB_TXN* txn) {
1040     int error = 0;
1041 
1042     // generate primary key
1043     DBT key_dbt;
1044     bool has_null;
1045     create_dbt_key_from_table(
1046         &key_dbt,
1047         primary_key,
1048         primary_key_buff,
1049         table->record[0],
1050         &has_null);
1051 
1052     // generate packed row
1053     DBT row;
1054     error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1055     if (error)
1056         return error;
1057 
1058     tokudb::buffer update_message;
1059 
1060     // append the operation
1061     uint8_t op = UPDATE_OP_UPSERT_2;
1062     update_message.append(&op, sizeof op);
1063 
1064     // append the row
1065     update_message.append_ui<uint32_t>(row.size);
1066     update_message.append(row.data, row.size);
1067 
1068     uint32_t num_updates = update_fields.elements;
1069     uint num_varchars = 0, num_blobs = 0;
1070     if (1) {
1071         List_iterator<Item> lhs_i(update_fields);
1072         Item* lhs_item;
1073         while ((lhs_item = lhs_i++)) {
1074             if (lhs_item == NULL)
1075                 break;
1076             Field* lhs_field = find_field_by_name(table, lhs_item);
1077             assert_always(lhs_field); // we found it before, so this should work
1078             count_update_types(lhs_field, &num_varchars, &num_blobs);
1079         }
1080         if (num_varchars > 0 || num_blobs > 0)
1081             num_updates++;
1082         if (num_blobs > 0)
1083             num_updates++;
1084     }
1085 
1086     // append the updates
1087     update_message.append_ui<uint32_t>(num_updates);
1088 
1089     if (num_varchars > 0 || num_blobs > 0)
1090         marshall_varchar_descriptor(
1091             update_message,
1092             table, &share->kc_info,
1093             table->s->primary_key);
1094     if (num_blobs > 0)
1095         marshall_blobs_descriptor(update_message, table, &share->kc_info);
1096 
1097     List_iterator<Item> lhs_i(update_fields);
1098     List_iterator<Item> rhs_i(update_values);
1099     while (1) {
1100         Item* lhs_item = lhs_i++;
1101         if (lhs_item == NULL)
1102             break;
1103         Item* rhs_item = rhs_i++;
1104         assert_always(rhs_item != NULL);
1105         marshall_update(update_message, lhs_item, rhs_item, table, share);
1106     }
1107 
1108     rwlock_t_lock_read(share->_num_DBs_lock);
1109 
1110     // hot index in progress
1111     if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1112         error = ENOTSUP; // run on the slow path
1113     } else {
1114         // send the upsert message
1115         DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1116         update_dbt.data = update_message.data();
1117         update_dbt.size = update_message.size();
1118         error =
1119             share->key_file[primary_key]->update(
1120                 share->key_file[primary_key],
1121                 txn,
1122                 &key_dbt,
1123                 &update_dbt,
1124                 0);
1125     }
1126 
1127     share->_num_DBs_lock.unlock();
1128 
1129     return error;
1130 }
1131 #endif  // defined(TOKU_INCLUDE_UPSERT) && TOKU_INCLUDE_UPSERT
1132