1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
    TokuDB is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     TokuDB is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ======= */
23 
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25 
26 // Point updates and upserts
27 
28 // Restrictions:
29 //   No triggers
30 //   Statement or mixed replication
31 //   Primary key must be defined
32 //   Simple and compound primary key
33 //   Int, char and varchar primary key types
34 //   No updates on fields that are part of any key
35 //   No clustering keys
36 //   Integer and char field updates
37 //   Update expressions:
38 //       x = constant
39 //       x = x + constant
40 //       x = x - constant
41 //       x = if (x=0,0,x-1)
42 //       x = x + values(x)
43 //   Session variable disables slow updates and slow upserts
44 
45 // Future features:
46 //   Support more primary key types
47 //   Force statement logging for fast updates
48 //   Support clustering keys using broadcast updates
49 //   Support primary key ranges using multicast messages
50 //   Support more complicated update expressions
51 //   Replace field_offset
52 
53 // Debug function to dump an Item
dump_item(Item * item)54 static void dump_item(Item* item) {
55     fprintf(stderr, "%u", item->type());
56     switch (item->type()) {
57     case Item::FUNC_ITEM: {
58         Item_func* func = static_cast<Item_func*>(item);
59         uint n = func->argument_count();
60         Item** arguments = func->arguments();
61         fprintf(
62             stderr,
63             ":func=%u,%s,%u(",
64             func->functype(),
65             func->func_name(),
66             n);
67         for (uint i = 0; i < n ; i++) {
68             dump_item(arguments[i]);
69             if (i < n-1)
70                 fprintf(stderr,",");
71         }
72         fprintf(stderr, ")");
73         break;
74     }
75     case Item::INT_ITEM: {
76         Item_int* int_item = static_cast<Item_int*>(item);
77         fprintf(stderr, ":int=%lld", int_item->val_int());
78         break;
79     }
80     case Item::STRING_ITEM: {
81         Item_string* str_item = static_cast<Item_string*>(item);
82         fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
83         break;
84     }
85     case Item::FIELD_ITEM: {
86         Item_field* field_item = static_cast<Item_field*>(item);
87         fprintf(
88             stderr,
89             ":field=%s.%s.%s",
90             field_item->db_name,
91             field_item->table_name,
92             field_item->field_name);
93         break;
94     }
95     case Item::COND_ITEM: {
96         Item_cond* cond_item = static_cast<Item_cond*>(item);
97         fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
98         List_iterator<Item> li(*cond_item->argument_list());
99         Item* list_item;
100         while ((list_item = li++)) {
101             dump_item(list_item);
102             fprintf(stderr, "\n");
103         }
104         fprintf(stderr, ")\n");
105         break;
106     }
107     case Item::INSERT_VALUE_ITEM: {
108         Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
109         fprintf(stderr, ":insert_value");
110         dump_item(value_item->arg);
111         break;
112     }
113     default:
114         fprintf(stderr, ":unsupported\n");
115         break;
116     }
117 }
118 
119 // Debug function to dump an Item list
dump_item_list(const char * h,List<Item> & l)120 static void dump_item_list(const char* h, List<Item> &l) {
121     fprintf(stderr, "%s elements=%u\n", h, l.elements);
122     List_iterator<Item> li(l);
123     Item* item;
124     while ((item = li++) != NULL) {
125         dump_item(item);
126         fprintf(stderr, "\n");
127     }
128 }
129 
130 // Find a Field by its Item name
find_field_by_name(TOKUDB_UNUSED (TABLE * table),Item * item)131 static Field* find_field_by_name(TOKUDB_UNUSED(TABLE* table), Item* item) {
132     if (item->type() != Item::FIELD_ITEM)
133         return NULL;
134     Item_field* field_item = static_cast<Item_field*>(item);
135 #if 0
136     if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
137         strcmp(table->s->table_name.str, field_item->table_name) != 0)
138         return NULL;
139     Field *found_field = NULL;
140     for (uint i = 0; i < table->s->fields; i++) {
141         Field *test_field = table->s->field[i];
142         if (strcmp(field_item->field_name, test_field->field_name) == 0) {
143             found_field = test_field;
144             break;
145         }
146     }
147     return found_field;
148 #else
149     // item->field may be a shortcut instead of the above table lookup
150     return field_item->field;
151 #endif
152 }
153 
154 // Return the starting offset in the value for a particular index (selected by idx) of a
155 // particular field (selected by expand_field_num).
156 // This only works for fixed length fields
fixed_field_offset(uint32_t null_bytes,KEY_AND_COL_INFO * kc_info,uint idx,uint expand_field_num)157 static uint32_t fixed_field_offset(
158     uint32_t null_bytes,
159     KEY_AND_COL_INFO* kc_info,
160     uint idx,
161     uint expand_field_num) {
162 
163     uint32_t offset = null_bytes;
164     for (uint i = 0; i < expand_field_num; i++) {
165         if (bitmap_is_set(&kc_info->key_filters[idx], i))
166             continue;
167         offset += kc_info->field_lengths[i];
168     }
169     return offset;
170 }
171 
var_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint idx,uint field_num)172 static uint32_t var_field_index(
173     TABLE* table,
174     KEY_AND_COL_INFO* kc_info,
175     uint idx,
176     uint field_num) {
177 
178     assert_always(field_num < table->s->fields);
179     uint v_index = 0;
180     for (uint i = 0; i < table->s->fields; i++) {
181         if (bitmap_is_set(&kc_info->key_filters[idx], i))
182             continue;
183         if (kc_info->length_bytes[i]) {
184             if (i == field_num)
185                 break;
186             v_index++;
187         }
188     }
189     return v_index;
190 }
191 
blob_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint field_num)192 static uint32_t blob_field_index(TABLE* table,
193                                  KEY_AND_COL_INFO* kc_info,
194                                  uint field_num) {
195     assert_always(field_num < table->s->fields);
196     uint b_index;
197     for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
198         if (kc_info->blob_fields[b_index] == field_num)
199             break;
200     }
201     assert_always(b_index < kc_info->num_blobs);
202     return b_index;
203 }
204 
205 // Determine if an update operation can be offloaded to the storage engine.
206 // The update operation consists of a list of update expressions
207 // (fields[i] = values[i]), and a list of where conditions (conds).
208 // The function returns 0 if the update is handled in the storage engine.
209 // Otherwise, an error is returned.
fast_update(THD * thd,List<Item> & update_fields,List<Item> & update_values,Item * conds)210 int ha_tokudb::fast_update(
211     THD* thd,
212     List<Item>& update_fields,
213     List<Item>& update_values,
214     Item* conds) {
215 
216     TOKUDB_HANDLER_DBUG_ENTER("");
217     int error = 0;
218 
219     if (!tokudb::sysvars::enable_fast_update(thd)) {
220         error = ENOTSUP;
221         goto exit;
222     }
223 
224     if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
225         dump_item_list("fields", update_fields);
226         dump_item_list("values", update_values);
227         if (conds) {
228             fprintf(stderr, "conds\n"); dump_item(conds); fprintf(stderr, "\n");
229         }
230     }
231 
232     if (update_fields.elements < 1 ||
233         update_fields.elements != update_values.elements) {
234         error = ENOTSUP;  // something is fishy with the parameters
235         goto exit;
236     }
237 
238     if (!check_fast_update(thd, update_fields, update_values, conds)) {
239         error = HA_ERR_UNSUPPORTED;
240         goto exit;
241     }
242 
243     error = send_update_message(
244         update_fields, update_values, conds, transaction);
245 
246     if (error) {
247         int mapped_error = map_to_handler_error(error);
248         if (mapped_error == error)
249             error = HA_ERR_UNSUPPORTED;
250     }
251 
252 exit:
253     if (error != 0 && error != ENOTSUP)
254         print_error(error, MYF(0));
255 
256     TOKUDB_HANDLER_DBUG_RETURN(error);
257 }
258 
// Return true if an expression is an int constant, or a unary + or - applied
// to an int constant.
check_int_result(Item * item)261 static bool check_int_result(Item* item) {
262     Item::Type t = item->type();
263     if (t == Item::INT_ITEM) {
264         return true;
265     } else if (t == Item::FUNC_ITEM) {
266         Item_func* item_func = static_cast<Item_func*>(item);
267         if (strcmp(item_func->func_name(), "+") != 0 &&
268             strcmp(item_func->func_name(), "-") != 0)
269             return false;
270         if (item_func->argument_count() != 1)
271             return false;
272         Item** arguments = item_func->arguments();
273         if (arguments[0]->type() != Item::INT_ITEM)
274             return false;
275         return true;
276     } else
277         return false;
278 }
279 
280 // check that an item is an insert value item with the same field name
check_insert_value(Item * item,const char * field_name)281 static bool check_insert_value(Item* item, const char* field_name) {
282     if (item->type() != Item::INSERT_VALUE_ITEM)
283         return false;
284     Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
285     if (value_item->arg->type() != Item::FIELD_ITEM)
286         return false;
287     Item_field* arg = static_cast<Item_field*>(value_item->arg);
288     if (strcmp(field_name, arg->field_name) != 0)
289         return false;
290     return true;
291 }
292 
293 // Return true if an expression looks like field_name op constant.
check_x_op_constant(const char * field_name,Item * item,const char * op,Item ** item_constant,bool allow_insert_value)294 static bool check_x_op_constant(
295     const char* field_name,
296     Item* item,
297     const char* op,
298     Item** item_constant,
299     bool allow_insert_value) {
300 
301     if (item->type() != Item::FUNC_ITEM)
302         return false;
303     Item_func* item_func = static_cast<Item_func*>(item);
304     if (strcmp(item_func->func_name(), op) != 0)
305         return false;
306     Item** arguments = item_func->arguments();
307     uint n = item_func->argument_count();
308     if (n != 2)
309         return false;
310     if (arguments[0]->type() != Item::FIELD_ITEM)
311         return false;
312     Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
313     if (strcmp(field_name, arg0->field_name) != 0)
314         return false;
315     if (!check_int_result(arguments[1]))
316         if (!(allow_insert_value &&
317             check_insert_value(arguments[1], field_name)))
318             return false;
319     *item_constant = arguments[1];
320     return true;
321 }
322 
// Return true if an expression looks like field_name = 0
check_x_equal_0(const char * field_name,Item * item)324 static bool check_x_equal_0(const char *field_name, Item *item) {
325     Item* item_constant;
326     if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
327         return false;
328     if (item_constant->type() != Item::INT_ITEM ||
329         item_constant->val_int() != 0)
330         return false;
331     return true;
332 }
333 
334 // Return true if an expression looks like fieldname - 1
check_x_minus_1(const char * field_name,Item * item)335 static bool check_x_minus_1(const char* field_name, Item* item) {
336     Item* item_constant;
337     if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
338         return false;
339     if (item_constant->type() != Item::INT_ITEM ||
340         item_constant->val_int() != 1)
341         return false;
342     return true;
343 }
344 
345 // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
346 // the field named by fieldname is an unsigned int.
check_decr_floor_expression(Field * lhs_field,Item * item)347 static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
348     if (item->type() != Item::FUNC_ITEM)
349         return false;
350     Item_func* item_func = static_cast<Item_func*>(item);
351     if (strcmp(item_func->func_name(), "if") != 0)
352         return false;
353     Item** arguments = item_func->arguments();
354     uint n = item_func->argument_count();
355     if (n != 3)
356         return false;
357     if (!check_x_equal_0(lhs_field->field_name, arguments[0]))
358         return false;
359     if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
360         return false;
361     if (!check_x_minus_1(lhs_field->field_name, arguments[2]))
362         return false;
363     if (!(lhs_field->flags & UNSIGNED_FLAG))
364         return false;
365     return true;
366 }
367 
368 // Check if lhs = rhs expression is simple.  Return true if it is.
check_update_expression(Item * lhs_item,Item * rhs_item,TABLE * table,bool allow_insert_value)369 static bool check_update_expression(
370     Item* lhs_item,
371     Item* rhs_item,
372     TABLE* table,
373     bool allow_insert_value) {
374 
375     Field* lhs_field = find_field_by_name(table, lhs_item);
376     if (lhs_field == NULL)
377         return false;
378     if (!lhs_field->part_of_key.is_clear_all())
379         return false;
380     enum_field_types lhs_type = lhs_field->type();
381     Item::Type rhs_type = rhs_item->type();
382     switch (lhs_type) {
383     case MYSQL_TYPE_TINY:
384     case MYSQL_TYPE_SHORT:
385     case MYSQL_TYPE_INT24:
386     case MYSQL_TYPE_LONG:
387     case MYSQL_TYPE_LONGLONG:
388         if (check_int_result(rhs_item))
389             return true;
390         Item* item_constant;
391         if (check_x_op_constant(
392                 lhs_field->field_name,
393                 rhs_item,
394                 "+",
395                 &item_constant,
396                 allow_insert_value))
397             return true;
398         if (check_x_op_constant(
399                 lhs_field->field_name,
400                 rhs_item,
401                 "-",
402                 &item_constant,
403                 allow_insert_value))
404             return true;
405         if (check_decr_floor_expression(lhs_field, rhs_item))
406             return true;
407         break;
408     case MYSQL_TYPE_STRING:
409         if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
410             return true;
411         break;
412     case MYSQL_TYPE_VARCHAR:
413     case MYSQL_TYPE_BLOB:
414         if (rhs_type == Item::STRING_ITEM)
415             return true;
416         break;
417     default:
418         break;
419     }
420     return false;
421 }
422 
423 // Check that all update expressions are simple.  Return true if they are.
check_all_update_expressions(List<Item> & fields,List<Item> & values,TABLE * table,bool allow_insert_value)424 static bool check_all_update_expressions(
425     List<Item>& fields,
426     List<Item>& values,
427     TABLE* table,
428     bool allow_insert_value) {
429 
430     List_iterator<Item> lhs_i(fields);
431     List_iterator<Item> rhs_i(values);
432     while (1) {
433         Item* lhs_item = lhs_i++;
434         if (lhs_item == NULL)
435             break;
436         Item* rhs_item = rhs_i++;
437         assert_always(rhs_item != NULL);
438         if (!check_update_expression(
439                 lhs_item,
440                 rhs_item,
441                 table,
442                 allow_insert_value))
443             return false;
444     }
445     return true;
446 }
447 
full_field_in_key(TABLE * table,Field * field)448 static bool full_field_in_key(TABLE* table, Field* field) {
449     assert_always(table->s->primary_key < table->s->keys);
450     KEY* key = &table->s->key_info[table->s->primary_key];
451     for (uint i = 0; i < key->user_defined_key_parts; i++) {
452         KEY_PART_INFO* key_part = &key->key_part[i];
453         if (strcmp(field->field_name, key_part->field->field_name) == 0) {
454             return key_part->length == field->field_length;
455         }
456     }
457     return false;
458 }
459 
460 // Check that an expression looks like fieldname = constant, fieldname is part
461 // of the primary key, and the named field is an int, char or varchar type.
462 // Return true if it does.
check_pk_field_equal_constant(Item * item,TABLE * table,MY_BITMAP * pk_fields)463 static bool check_pk_field_equal_constant(
464     Item* item,
465     TABLE* table,
466     MY_BITMAP* pk_fields) {
467 
468     if (item->type() != Item::FUNC_ITEM)
469         return false;
470     Item_func* func = static_cast<Item_func*>(item);
471     if (strcmp(func->func_name(), "=") != 0)
472         return false;
473     uint n = func->argument_count();
474     if (n != 2)
475         return false;
476     Item** arguments = func->arguments();
477     Field* field = find_field_by_name(table, arguments[0]);
478     if (field == NULL)
479         return false;
480     if (!bitmap_test_and_clear(pk_fields, field->field_index))
481         return false;
482     switch (field->type()) {
483     case MYSQL_TYPE_TINY:
484     case MYSQL_TYPE_SHORT:
485     case MYSQL_TYPE_INT24:
486     case MYSQL_TYPE_LONG:
487     case MYSQL_TYPE_LONGLONG:
488         return arguments[1]->type() == Item::INT_ITEM ||
489             arguments[1]->type() == Item::STRING_ITEM;
490     case MYSQL_TYPE_STRING:
491     case MYSQL_TYPE_VARCHAR:
492         return full_field_in_key(table, field) &&
493             (arguments[1]->type() == Item::INT_ITEM ||
494              arguments[1]->type() == Item::STRING_ITEM);
495     default:
496         return false;
497     }
498 }
499 
500 // Check that the where condition covers all of the primary key components
501 // with fieldname = constant expressions.  Return true if it does.
check_point_update(Item * conds,TABLE * table)502 static bool check_point_update(Item* conds, TABLE* table) {
503     bool result = false;
504 
505     if (conds == NULL)
506         return false; // no where condition on the update
507 
508     if (table->s->primary_key >= table->s->keys)
509         return false; // no primary key defined
510 
511     // use a bitmap of the primary key fields to keep track of those fields
512     // that are covered by the where conditions
513     MY_BITMAP pk_fields;
514     if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
515         return false;
516     KEY *key = &table->s->key_info[table->s->primary_key];
517     for (uint i = 0; i < key->user_defined_key_parts; i++)
518         bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
519 
520     switch (conds->type()) {
521     case Item::FUNC_ITEM:
522         result = check_pk_field_equal_constant(conds, table, &pk_fields);
523         break;
524     case Item::COND_ITEM: {
525         Item_cond* cond_item = static_cast<Item_cond*>(conds);
526         if (strcmp(cond_item->func_name(), "and") != 0)
527             break;
528         List_iterator<Item> li(*cond_item->argument_list());
529         Item* list_item;
530         result = true;
531         while (result == true && (list_item = li++)) {
532             result = check_pk_field_equal_constant(
533                 list_item,
534                 table,
535                 &pk_fields);
536         }
537         break;
538     }
539     default:
540         break;
541     }
542 
543     if (!bitmap_is_clear_all(&pk_fields))
544         result = false;
545     bitmap_free(&pk_fields);
546     return result;
547 }
548 
549 // Return true if there are any clustering keys (except the primary).
550 // Precompute this when the table is opened.
clustering_keys_exist(TABLE * table)551 static bool clustering_keys_exist(TABLE *table) {
552     for (uint keynr = 0; keynr < table->s->keys; keynr++) {
553         if (keynr != table->s->primary_key &&
554             key_is_clustering(&table->s->key_info[keynr]))
555             return true;
556     }
557     return false;
558 }
559 
560 // Check if an update operation can be handled by this storage engine.
561 // Return true if it can.
check_fast_update(THD * thd,List<Item> & fields,List<Item> & values,Item * conds)562 bool ha_tokudb::check_fast_update(
563     THD* thd,
564     List<Item>& fields,
565     List<Item>& values,
566     Item* conds) {
567 
568     if (!transaction)
569         return false;
570 
571     // avoid strict mode arithmetic overflow issues
572     if (thd->is_strict_mode())
573         return false;
574 
575     // no triggers
576     if (table->triggers)
577         return false;
578 
579     // no binlog
580     if (mysql_bin_log.is_open() &&
581         !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
582           thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
583         return false;
584 
585     // no clustering keys (need to broadcast an increment into the clustering
586     // keys since we are selecting with the primary key)
587     if (clustering_keys_exist(table))
588         return false;
589 
590     if (!check_all_update_expressions(fields, values, table, false))
591         return false;
592 
593     if (!check_point_update(conds, table))
594         return false;
595 
596     return true;
597 }
598 
marshall_varchar_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info,uint key_num)599 static void marshall_varchar_descriptor(
600     tokudb::buffer& b,
601     TABLE* table,
602     KEY_AND_COL_INFO* kc_info,
603     uint key_num) {
604 
605     b.append_ui<uint32_t>('v');
606     b.append_ui<uint32_t>(
607         table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
608     uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
609     b.append_ui<uint32_t>(var_offset_bytes);
610     b.append_ui<uint32_t>(
611         var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
612 }
613 
marshall_blobs_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info)614 static void marshall_blobs_descriptor(
615     tokudb::buffer& b,
616     TABLE* table,
617     KEY_AND_COL_INFO* kc_info) {
618 
619     b.append_ui<uint32_t>('b');
620     uint32_t n = kc_info->num_blobs;
621     b.append_ui<uint32_t>(n);
622     for (uint i = 0; i < n; i++) {
623         uint blob_field_index = kc_info->blob_fields[i];
624         assert_always(blob_field_index < table->s->fields);
625         uint8_t blob_field_length =
626             table->s->field[blob_field_index]->row_pack_length();
627         b.append(&blob_field_length, sizeof blob_field_length);
628     }
629 }
630 
631 static inline uint32_t get_null_bit_position(uint32_t null_bit);
632 
633 // evaluate the int value of an item
item_val_int(Item * item)634 static longlong item_val_int(Item* item) {
635     Item::Type t = item->type();
636     if (t == Item::INSERT_VALUE_ITEM) {
637         Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
638         return value_item->arg->val_int();
639     } else
640         return item->val_int();
641 }
642 
643 // Marshall update operations to a buffer.
marshall_update(tokudb::buffer & b,Item * lhs_item,Item * rhs_item,TABLE * table,TOKUDB_SHARE * share)644 static void marshall_update(
645     tokudb::buffer& b,
646     Item* lhs_item,
647     Item* rhs_item,
648     TABLE* table,
649     TOKUDB_SHARE* share) {
650 
651     // figure out the update operation type (again)
652     Field* lhs_field = find_field_by_name(table, lhs_item);
653     assert_always(lhs_field); // we found it before, so this should work
654 
655     // compute the update info
656     uint32_t field_type;
657     uint32_t field_null_num = 0;
658     if (lhs_field->real_maybe_null()) {
659         uint32_t field_num = lhs_field->field_index;
660         field_null_num =
661             ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
662     }
663     uint32_t offset;
664     void* v_ptr = NULL;
665     uint32_t v_length;
666     uint32_t update_operation;
667     longlong v_ll;
668     String v_str;
669 
670     switch (lhs_field->type()) {
671     case MYSQL_TYPE_TINY:
672     case MYSQL_TYPE_SHORT:
673     case MYSQL_TYPE_INT24:
674     case MYSQL_TYPE_LONG:
675     case MYSQL_TYPE_LONGLONG: {
676         Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
677         field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
678         offset =
679             fixed_field_offset(
680                 table->s->null_bytes,
681                 &share->kc_info,
682                 table->s->primary_key,
683                 lhs_field->field_index);
684         switch (rhs_item->type()) {
685         case Item::INT_ITEM: {
686             update_operation = '=';
687             v_ll = rhs_item->val_int();
688             v_length = lhs_field->pack_length();
689             v_ptr = &v_ll;
690             break;
691         }
692         case Item::FUNC_ITEM: {
693             Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
694             Item** arguments = rhs_func->arguments();
695             // we only support one if function for now, and it is a
696             // decrement with floor.
697             if (strcmp(rhs_func->func_name(), "if") == 0) {
698                 update_operation = '-';
699                 v_ll = 1;
700             } else if (rhs_func->argument_count() == 1) {
701                 update_operation = '=';
702                 v_ll = rhs_func->val_int();
703             } else {
704                 update_operation = rhs_func->func_name()[0];
705                 v_ll = item_val_int(arguments[1]);
706             }
707             v_length = lhs_field->pack_length();
708             v_ptr = &v_ll;
709             break;
710         }
711         default:
712             assert_unreachable();
713         }
714         break;
715     }
716     case MYSQL_TYPE_STRING: {
717         update_operation = '=';
718         field_type =
719             lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
720         offset =
721             fixed_field_offset(
722                 table->s->null_bytes,
723                 &share->kc_info,
724                 table->s->primary_key,
725                 lhs_field->field_index);
726         v_str = *rhs_item->val_str(&v_str);
727         v_length = v_str.length();
728         if (v_length >= lhs_field->pack_length()) {
729             v_length = lhs_field->pack_length();
730             v_str.length(v_length); // truncate
731         } else {
732             v_length = lhs_field->pack_length();
733             uchar pad_char =
734                 lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
735             v_str.fill(lhs_field->pack_length(), pad_char); // pad
736         }
737         v_ptr = v_str.c_ptr();
738         break;
739     }
740     case MYSQL_TYPE_VARCHAR: {
741         update_operation = '=';
742         field_type =
743             lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
744         offset =
745             var_field_index(
746                 table,
747                 &share->kc_info,
748                 table->s->primary_key,
749                 lhs_field->field_index);
750         v_str = *rhs_item->val_str(&v_str);
751         v_length = v_str.length();
752         if (v_length >= lhs_field->row_pack_length()) {
753             v_length = lhs_field->row_pack_length();
754             v_str.length(v_length); // truncate
755         }
756         v_ptr = v_str.c_ptr();
757         break;
758     }
759     case MYSQL_TYPE_BLOB: {
760         update_operation = '=';
761         field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
762         offset =
763             blob_field_index(table, &share->kc_info, lhs_field->field_index);
764         v_str = *rhs_item->val_str(&v_str);
765         v_length = v_str.length();
766         if (v_length >= lhs_field->max_data_length()) {
767             v_length = lhs_field->max_data_length();
768             v_str.length(v_length); // truncate
769         }
770         v_ptr = v_str.c_ptr();
771         break;
772     }
773     default:
774         assert_unreachable();
775     }
776 
777     // marshall the update fields into the buffer
778     b.append_ui<uint32_t>(update_operation);
779     b.append_ui<uint32_t>(field_type);
780     b.append_ui<uint32_t>(field_null_num);
781     b.append_ui<uint32_t>(offset);
782     b.append_ui<uint32_t>(v_length);
783     b.append(v_ptr, v_length);
784 }
785 
786 // Save an item's value into the appropriate field.  Return 0 if successful.
save_in_field(Item * item,TABLE * table)787 static int save_in_field(Item* item, TABLE* table) {
788     assert_always(item->type() == Item::FUNC_ITEM);
789     Item_func *func = static_cast<Item_func*>(item);
790     assert_always(strcmp(func->func_name(), "=") == 0);
791     uint n = func->argument_count();
792     assert_always(n == 2);
793     Item **arguments = func->arguments();
794     assert_always(arguments[0]->type() == Item::FIELD_ITEM);
795     Item_field *field_item = static_cast<Item_field*>(arguments[0]);
796     my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
797     int error = arguments[1]->save_in_field(field_item->field, 0);
798     dbug_tmp_restore_column_map(table->write_set, old_map);
799     return error;
800 }
801 
count_update_types(Field * lhs_field,uint * num_varchars,uint * num_blobs)802 static void count_update_types(
803     Field* lhs_field,
804     uint* num_varchars,
805     uint* num_blobs) {
806 
807     switch (lhs_field->type()) {
808     case MYSQL_TYPE_VARCHAR:
809         *num_varchars += 1;
810         break;
811     case MYSQL_TYPE_BLOB:
812         *num_blobs += 1;
813         break;
814     default:
815         break;
816     }
817 }
818 
819 // Generate an update message for an update operation and send it into the
820 // primary tree.  Return 0 if successful.
send_update_message(List<Item> & update_fields,List<Item> & update_values,Item * conds,DB_TXN * txn)821 int ha_tokudb::send_update_message(
822     List<Item>& update_fields,
823     List<Item>& update_values,
824     Item* conds,
825     DB_TXN* txn) {
826 
827     int error;
828 
829     // Save the primary key from the where conditions
830     Item::Type t = conds->type();
831     if (t == Item::FUNC_ITEM) {
832         error = save_in_field(conds, table);
833     } else if (t == Item::COND_ITEM) {
834         Item_cond* cond_item = static_cast<Item_cond*>(conds);
835         List_iterator<Item> li(*cond_item->argument_list());
836         Item* list_item;
837         error = 0;
838         while (error == 0 && (list_item = li++)) {
839             error = save_in_field(list_item, table);
840         }
841     } else {
842         assert_unreachable();
843     }
844     if (error)
845         return error;
846 
847     // put the primary key into key_buff and wrap it with key_dbt
848     DBT key_dbt;
849     bool has_null;
850     create_dbt_key_from_table(
851         &key_dbt,
852         primary_key,
853         key_buff,
854         table->record[0],
855         &has_null);
856 
857     // construct the update message
858     tokudb::buffer update_message;
859 
860     uint8_t op = UPDATE_OP_UPDATE_2;
861     update_message.append(&op, sizeof op);
862 
863     uint32_t num_updates = update_fields.elements;
864     uint num_varchars = 0, num_blobs = 0;
865     if (1) {
866         List_iterator<Item> lhs_i(update_fields);
867         Item* lhs_item;
868         while ((lhs_item = lhs_i++)) {
869             if (lhs_item == NULL)
870                 break;
871             Field* lhs_field = find_field_by_name(table, lhs_item);
872             assert_always(lhs_field); // we found it before, so this should work
873             count_update_types(lhs_field, &num_varchars, &num_blobs);
874         }
875         if (num_varchars > 0 || num_blobs > 0)
876             num_updates++;
877         if (num_blobs > 0)
878             num_updates++;
879     }
880 
881     // append the updates
882     update_message.append_ui<uint32_t>(num_updates);
883 
884     if (num_varchars > 0 || num_blobs > 0)
885         marshall_varchar_descriptor(
886             update_message,
887             table,
888             &share->kc_info,
889             table->s->primary_key);
890     if (num_blobs > 0)
891         marshall_blobs_descriptor(update_message, table, &share->kc_info);
892 
893     List_iterator<Item> lhs_i(update_fields);
894     List_iterator<Item> rhs_i(update_values);
895     while (error == 0) {
896         Item* lhs_item = lhs_i++;
897         if (lhs_item == NULL)
898             break;
899         Item* rhs_item = rhs_i++;
900         assert_always(rhs_item != NULL);
901         marshall_update(update_message, lhs_item, rhs_item, table, share);
902     }
903 
904     rwlock_t_lock_read(share->_num_DBs_lock);
905 
906     // hot index in progress
907     if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
908         error = ENOTSUP; // run on the slow path
909     } else {
910         // send the update message
911         DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
912         update_dbt.data = update_message.data();
913         update_dbt.size = update_message.size();
914         error =
915             share->key_file[primary_key]->update(
916                 share->key_file[primary_key],
917                 txn,
918                 &key_dbt,
919                 &update_dbt,
920                 0);
921     }
922 
923     share->_num_DBs_lock.unlock();
924 
925     return error;
926 }
927 
928 // Determine if an upsert operation can be offloaded to the storage engine.
929 // An upsert consists of a row and a list of update expressions
930 // (update_fields[i] = update_values[i]).
931 // The function returns 0 if the upsert is handled in the storage engine.
932 // Otherwise, an error code is returned.
upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)933 int ha_tokudb::upsert(
934     THD* thd,
935     List<Item>& update_fields,
936     List<Item>& update_values) {
937 
938     TOKUDB_HANDLER_DBUG_ENTER("");
939     int error = 0;
940 
941     if (!tokudb::sysvars::enable_fast_upsert(thd)) {
942         error = ENOTSUP;
943         goto exit;
944     }
945 
946     if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
947         fprintf(stderr, "upsert\n");
948         dump_item_list("update_fields", update_fields);
949         dump_item_list("update_values", update_values);
950     }
951 
952     // not an upsert or something is fishy with the parameters
953     if (update_fields.elements < 1 ||
954         update_fields.elements != update_values.elements) {
955         error = ENOTSUP;
956         goto exit;
957     }
958 
959     if (!check_upsert(thd, update_fields, update_values)) {
960         error = HA_ERR_UNSUPPORTED;
961         goto exit;
962     }
963 
964     error = send_upsert_message(update_fields, update_values, transaction);
965 
966     if (error) {
967         int mapped_error = map_to_handler_error(error);
968         if (mapped_error == error)
969             error = HA_ERR_UNSUPPORTED;
970     }
971 
972 exit:
973     if (error != 0 && error != ENOTSUP)
974         print_error(error, MYF(0));
975 
976     TOKUDB_HANDLER_DBUG_RETURN(error);
977 }
978 
979 // Check if an upsert can be handled by this storage engine.
980 // Return true if it can.
check_upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)981 bool ha_tokudb::check_upsert(
982     THD* thd,
983     List<Item>& update_fields,
984     List<Item>& update_values) {
985 
986     if (!transaction)
987         return false;
988 
989     // avoid strict mode arithmetic overflow issues
990     if (thd->is_strict_mode())
991         return false;
992 
993     // no triggers
994     if (table->triggers)
995         return false;
996 
997     // primary key must exist
998     if (table->s->primary_key >= table->s->keys)
999         return false;
1000 
1001     // no secondary keys
1002     if (table->s->keys > 1)
1003         return false;
1004 
1005     // no binlog
1006     if (mysql_bin_log.is_open() &&
1007         !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1008           thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1009         return false;
1010 
1011     if (!check_all_update_expressions(
1012             update_fields,
1013             update_values,
1014             table,
1015             true))
1016         return false;
1017 
1018     return true;
1019 }
1020 
1021 // Generate an upsert message and send it into the primary tree.
1022 // Return 0 if successful.
send_upsert_message(List<Item> & update_fields,List<Item> & update_values,DB_TXN * txn)1023 int ha_tokudb::send_upsert_message(
1024     List<Item>& update_fields,
1025     List<Item>& update_values,
1026     DB_TXN* txn) {
1027     int error = 0;
1028 
1029     // generate primary key
1030     DBT key_dbt;
1031     bool has_null;
1032     create_dbt_key_from_table(
1033         &key_dbt,
1034         primary_key,
1035         primary_key_buff,
1036         table->record[0],
1037         &has_null);
1038 
1039     // generate packed row
1040     DBT row;
1041     error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1042     if (error)
1043         return error;
1044 
1045     tokudb::buffer update_message;
1046 
1047     // append the operation
1048     uint8_t op = UPDATE_OP_UPSERT_2;
1049     update_message.append(&op, sizeof op);
1050 
1051     // append the row
1052     update_message.append_ui<uint32_t>(row.size);
1053     update_message.append(row.data, row.size);
1054 
1055     uint32_t num_updates = update_fields.elements;
1056     uint num_varchars = 0, num_blobs = 0;
1057     if (1) {
1058         List_iterator<Item> lhs_i(update_fields);
1059         Item* lhs_item;
1060         while ((lhs_item = lhs_i++)) {
1061             if (lhs_item == NULL)
1062                 break;
1063             Field* lhs_field = find_field_by_name(table, lhs_item);
1064             assert_always(lhs_field); // we found it before, so this should work
1065             count_update_types(lhs_field, &num_varchars, &num_blobs);
1066         }
1067         if (num_varchars > 0 || num_blobs > 0)
1068             num_updates++;
1069         if (num_blobs > 0)
1070             num_updates++;
1071     }
1072 
1073     // append the updates
1074     update_message.append_ui<uint32_t>(num_updates);
1075 
1076     if (num_varchars > 0 || num_blobs > 0)
1077         marshall_varchar_descriptor(
1078             update_message,
1079             table, &share->kc_info,
1080             table->s->primary_key);
1081     if (num_blobs > 0)
1082         marshall_blobs_descriptor(update_message, table, &share->kc_info);
1083 
1084     List_iterator<Item> lhs_i(update_fields);
1085     List_iterator<Item> rhs_i(update_values);
1086     while (1) {
1087         Item* lhs_item = lhs_i++;
1088         if (lhs_item == NULL)
1089             break;
1090         Item* rhs_item = rhs_i++;
1091         assert_always(rhs_item != NULL);
1092         marshall_update(update_message, lhs_item, rhs_item, table, share);
1093     }
1094 
1095     rwlock_t_lock_read(share->_num_DBs_lock);
1096 
1097     // hot index in progress
1098     if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1099         error = ENOTSUP; // run on the slow path
1100     } else {
1101         // send the upsert message
1102         DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1103         update_dbt.data = update_message.data();
1104         update_dbt.size = update_message.size();
1105         error =
1106             share->key_file[primary_key]->update(
1107                 share->key_file[primary_key],
1108                 txn,
1109                 &key_dbt,
1110                 &update_dbt,
1111                 0);
1112     }
1113 
1114     share->_num_DBs_lock.unlock();
1115 
1116     return error;
1117 }
1118