1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     TokuDBis is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     TokuDB is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with TokuDB.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ======= */
23 
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25 
26 // Point updates and upserts
27 
28 // Restrictions:
29 //   No triggers
30 //   Statement or mixed replication
31 //   Primary key must be defined
32 //   Simple and compound primary key
33 //   Int, char and varchar primary key types
34 //   No updates on fields that are part of any key
35 //   No clustering keys
36 //   Integer and char field updates
37 //   Update expressions:
38 //       x = constant
39 //       x = x + constant
40 //       x = x - constant
41 //       x = if (x=0,0,x-1)
42 //       x = x + values(x)
43 //   Session variable disables slow updates and slow upserts
44 
45 // Future features:
46 //   Support more primary key types
47 //   Force statement logging for fast updates
48 //   Support clustering keys using broadcast updates
49 //   Support primary key ranges using multicast messages
50 //   Support more complicated update expressions
51 //   Replace field_offset
52 
53 // Debug function to dump an Item
dump_item(Item * item)54 static void dump_item(Item* item) {
55     fprintf(stderr, "%u", item->type());
56     switch (item->type()) {
57     case Item::FUNC_ITEM: {
58         Item_func* func = static_cast<Item_func*>(item);
59         uint n = func->argument_count();
60         Item** arguments = func->arguments();
61         fprintf(
62             stderr,
63             ":func=%u,%s,%u(",
64             func->functype(),
65             func->func_name(),
66             n);
67         for (uint i = 0; i < n ; i++) {
68             dump_item(arguments[i]);
69             if (i < n-1)
70                 fprintf(stderr,",");
71         }
72         fprintf(stderr, ")");
73         break;
74     }
75     case Item::INT_ITEM: {
76         Item_int* int_item = static_cast<Item_int*>(item);
77         fprintf(stderr, ":int=%lld", int_item->val_int());
78         break;
79     }
80     case Item::STRING_ITEM: {
81         Item_string* str_item = static_cast<Item_string*>(item);
82         fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
83         break;
84     }
85     case Item::FIELD_ITEM: {
86         Item_field* field_item = static_cast<Item_field*>(item);
87         fprintf(
88             stderr,
89             ":field=%s.%s.%s",
90             field_item->db_name,
91             field_item->table_name,
92             field_item->field_name);
93         break;
94     }
95     case Item::COND_ITEM: {
96         Item_cond* cond_item = static_cast<Item_cond*>(item);
97         fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
98         List_iterator<Item> li(*cond_item->argument_list());
99         Item* list_item;
100         while ((list_item = li++)) {
101             dump_item(list_item);
102             fprintf(stderr, "\n");
103         }
104         fprintf(stderr, ")\n");
105         break;
106     }
107     case Item::INSERT_VALUE_ITEM: {
108         Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
109         fprintf(stderr, ":insert_value");
110         dump_item(value_item->arg);
111         break;
112     }
113     default:
114         fprintf(stderr, ":unsupported\n");
115         break;
116     }
117 }
118 
119 // Debug function to dump an Item list
dump_item_list(const char * h,List<Item> & l)120 static void dump_item_list(const char* h, List<Item> &l) {
121     fprintf(stderr, "%s elements=%u\n", h, l.elements);
122     List_iterator<Item> li(l);
123     Item* item;
124     while ((item = li++) != NULL) {
125         dump_item(item);
126         fprintf(stderr, "\n");
127     }
128 }
129 
130 // Find a Field by its Item name
find_field_by_name(TOKUDB_UNUSED (TABLE * table),Item * item)131 static Field* find_field_by_name(TOKUDB_UNUSED(TABLE* table), Item* item) {
132     if (item->type() != Item::FIELD_ITEM)
133         return NULL;
134     Item_field* field_item = static_cast<Item_field*>(item);
135 #if 0
136     if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
137         strcmp(table->s->table_name.str, field_item->table_name) != 0)
138         return NULL;
139     Field *found_field = NULL;
140     for (uint i = 0; i < table->s->fields; i++) {
141         Field *test_field = table->s->field[i];
142         if (strcmp(field_item->field_name, test_field->field_name) == 0) {
143             found_field = test_field;
144             break;
145         }
146     }
147     return found_field;
148 #else
149     // item->field may be a shortcut instead of the above table lookup
150     return field_item->field;
151 #endif
152 }
153 
154 // Return the starting offset in the value for a particular index (selected by idx) of a
155 // particular field (selected by expand_field_num).
156 // This only works for fixed length fields
fixed_field_offset(uint32_t null_bytes,KEY_AND_COL_INFO * kc_info,uint idx,uint expand_field_num)157 static uint32_t fixed_field_offset(
158     uint32_t null_bytes,
159     KEY_AND_COL_INFO* kc_info,
160     uint idx,
161     uint expand_field_num) {
162 
163     uint32_t offset = null_bytes;
164     for (uint i = 0; i < expand_field_num; i++) {
165         if (bitmap_is_set(&kc_info->key_filters[idx], i))
166             continue;
167         offset += kc_info->field_lengths[i];
168     }
169     return offset;
170 }
171 
var_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint idx,uint field_num)172 static uint32_t var_field_index(
173     TABLE* table,
174     KEY_AND_COL_INFO* kc_info,
175     uint idx,
176     uint field_num) {
177 
178     assert_always(field_num < table->s->fields);
179     uint v_index = 0;
180     for (uint i = 0; i < table->s->fields; i++) {
181         if (bitmap_is_set(&kc_info->key_filters[idx], i))
182             continue;
183         if (kc_info->length_bytes[i]) {
184             if (i == field_num)
185                 break;
186             v_index++;
187         }
188     }
189     return v_index;
190 }
191 
blob_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint field_num)192 static uint32_t blob_field_index(TABLE* table,
193                                  KEY_AND_COL_INFO* kc_info,
194                                  uint field_num) {
195     assert_always(field_num < table->s->fields);
196     uint b_index;
197     for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
198         if (kc_info->blob_fields[b_index] == field_num)
199             break;
200     }
201     assert_always(b_index < kc_info->num_blobs);
202     return b_index;
203 }
204 
205 // Determine if an update operation can be offloaded to the storage engine.
206 // The update operation consists of a list of update expressions
207 // (fields[i] = values[i]), and a list of where conditions (conds).
208 // The function returns 0 if the update is handled in the storage engine.
209 // Otherwise, an error is returned.
fast_update(THD * thd,List<Item> & update_fields,List<Item> & update_values,Item * conds)210 int ha_tokudb::fast_update(
211     THD* thd,
212     List<Item>& update_fields,
213     List<Item>& update_values,
214     Item* conds) {
215 
216     TOKUDB_HANDLER_DBUG_ENTER("");
217     int error = 0;
218 
219     if (!tokudb::sysvars::enable_fast_update(thd)) {
220         error = ENOTSUP;
221         goto exit;
222     }
223 
224     if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
225         dump_item_list("fields", update_fields);
226         dump_item_list("values", update_values);
227         if (conds) {
228             fprintf(stderr, "conds\n");
229             dump_item(conds);
230             fprintf(stderr, "\n");
231         }
232     }
233 
234     if (update_fields.elements < 1 ||
235         update_fields.elements != update_values.elements) {
236         error = ENOTSUP;  // something is fishy with the parameters
237         goto exit;
238     }
239 
240     if (!check_fast_update(thd, update_fields, update_values, conds)) {
241         error = HA_ERR_UNSUPPORTED;
242         goto exit;
243     }
244 
245     error = send_update_message(
246         update_fields, update_values, conds, transaction);
247 
248     if (error) {
249         int mapped_error = map_to_handler_error(error);
250         if (mapped_error == error)
251             error = HA_ERR_UNSUPPORTED;
252     }
253 
254 exit:
255 
256     if (error != 0 && error != ENOTSUP)
257         print_error(error, MYF(0));
258 
259     TOKUDB_HANDLER_DBUG_RETURN(error);
260 }
261 
262 // Return true if an expression is a simple int expression or a simple function
263 // of +- int expression.
check_int_result(Item * item)264 static bool check_int_result(Item* item) {
265     Item::Type t = item->type();
266     if (t == Item::INT_ITEM) {
267         return true;
268     } else if (t == Item::FUNC_ITEM) {
269         Item_func* item_func = static_cast<Item_func*>(item);
270         if (strcmp(item_func->func_name(), "+") != 0 &&
271             strcmp(item_func->func_name(), "-") != 0)
272             return false;
273         if (item_func->argument_count() != 1)
274             return false;
275         Item** arguments = item_func->arguments();
276         if (arguments[0]->type() != Item::INT_ITEM)
277             return false;
278         return true;
279     } else
280         return false;
281 }
282 
283 // check that an item is an insert value item with the same field name
check_insert_value(Item * item,const char * field_name)284 static bool check_insert_value(Item* item, const char* field_name) {
285     if (item->type() != Item::INSERT_VALUE_ITEM)
286         return false;
287     Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
288     if (value_item->arg->type() != Item::FIELD_ITEM)
289         return false;
290     Item_field* arg = static_cast<Item_field*>(value_item->arg);
291     if (strcmp(field_name, arg->field_name) != 0)
292         return false;
293     return true;
294 }
295 
296 // Return true if an expression looks like field_name op constant.
check_x_op_constant(const char * field_name,Item * item,const char * op,Item ** item_constant,bool allow_insert_value)297 static bool check_x_op_constant(
298     const char* field_name,
299     Item* item,
300     const char* op,
301     Item** item_constant,
302     bool allow_insert_value) {
303 
304     if (item->type() != Item::FUNC_ITEM)
305         return false;
306     Item_func* item_func = static_cast<Item_func*>(item);
307     if (strcmp(item_func->func_name(), op) != 0)
308         return false;
309     Item** arguments = item_func->arguments();
310     uint n = item_func->argument_count();
311     if (n != 2)
312         return false;
313     if (arguments[0]->type() != Item::FIELD_ITEM)
314         return false;
315     Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
316     if (strcmp(field_name, arg0->field_name) != 0)
317         return false;
318     if (!check_int_result(arguments[1]))
319         if (!(allow_insert_value &&
320             check_insert_value(arguments[1], field_name)))
321             return false;
322     *item_constant = arguments[1];
323     return true;
324 }
325 
326 // Return true if an expression looks like field_name = constant
check_x_equal_0(const char * field_name,Item * item)327 static bool check_x_equal_0(const char *field_name, Item *item) {
328     Item* item_constant;
329     if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
330         return false;
331     if (item_constant->type() != Item::INT_ITEM ||
332         item_constant->val_int() != 0)
333         return false;
334     return true;
335 }
336 
337 // Return true if an expression looks like fieldname - 1
check_x_minus_1(const char * field_name,Item * item)338 static bool check_x_minus_1(const char* field_name, Item* item) {
339     Item* item_constant;
340     if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
341         return false;
342     if (item_constant->type() != Item::INT_ITEM ||
343         item_constant->val_int() != 1)
344         return false;
345     return true;
346 }
347 
348 // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
349 // the field named by fieldname is an unsigned int.
check_decr_floor_expression(Field * lhs_field,Item * item)350 static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
351     if (item->type() != Item::FUNC_ITEM)
352         return false;
353     Item_func* item_func = static_cast<Item_func*>(item);
354     if (strcmp(item_func->func_name(), "if") != 0)
355         return false;
356     Item** arguments = item_func->arguments();
357     uint n = item_func->argument_count();
358     if (n != 3)
359         return false;
360     if (!check_x_equal_0(lhs_field->field_name, arguments[0]))
361         return false;
362     if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
363         return false;
364     if (!check_x_minus_1(lhs_field->field_name, arguments[2]))
365         return false;
366     if (!(lhs_field->flags & UNSIGNED_FLAG))
367         return false;
368     return true;
369 }
370 
371 // Check if lhs = rhs expression is simple.  Return true if it is.
check_update_expression(Item * lhs_item,Item * rhs_item,TABLE * table,bool allow_insert_value)372 static bool check_update_expression(
373     Item* lhs_item,
374     Item* rhs_item,
375     TABLE* table,
376     bool allow_insert_value) {
377 
378     Field* lhs_field = find_field_by_name(table, lhs_item);
379     if (lhs_field == NULL)
380         return false;
381     if (!lhs_field->part_of_key.is_clear_all())
382         return false;
383     enum_field_types lhs_type = lhs_field->type();
384     Item::Type rhs_type = rhs_item->type();
385     switch (lhs_type) {
386     case MYSQL_TYPE_TINY:
387     case MYSQL_TYPE_SHORT:
388     case MYSQL_TYPE_INT24:
389     case MYSQL_TYPE_LONG:
390     case MYSQL_TYPE_LONGLONG:
391         if (check_int_result(rhs_item))
392             return true;
393         Item* item_constant;
394         if (check_x_op_constant(
395                 lhs_field->field_name,
396                 rhs_item,
397                 "+",
398                 &item_constant,
399                 allow_insert_value))
400             return true;
401         if (check_x_op_constant(
402                 lhs_field->field_name,
403                 rhs_item,
404                 "-",
405                 &item_constant,
406                 allow_insert_value))
407             return true;
408         if (check_decr_floor_expression(lhs_field, rhs_item))
409             return true;
410         break;
411     case MYSQL_TYPE_STRING:
412         if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
413             return true;
414         break;
415     case MYSQL_TYPE_VARCHAR:
416     case MYSQL_TYPE_BLOB:
417         if (rhs_type == Item::STRING_ITEM)
418             return true;
419         break;
420     default:
421         break;
422     }
423     return false;
424 }
425 
426 // Check that all update expressions are simple.  Return true if they are.
check_all_update_expressions(List<Item> & fields,List<Item> & values,TABLE * table,bool allow_insert_value)427 static bool check_all_update_expressions(
428     List<Item>& fields,
429     List<Item>& values,
430     TABLE* table,
431     bool allow_insert_value) {
432 
433     List_iterator<Item> lhs_i(fields);
434     List_iterator<Item> rhs_i(values);
435     while (1) {
436         Item* lhs_item = lhs_i++;
437         if (lhs_item == NULL)
438             break;
439         Item* rhs_item = rhs_i++;
440         assert_always(rhs_item != NULL);
441         if (!check_update_expression(
442                 lhs_item,
443                 rhs_item,
444                 table,
445                 allow_insert_value))
446             return false;
447     }
448     return true;
449 }
450 
full_field_in_key(TABLE * table,Field * field)451 static bool full_field_in_key(TABLE* table, Field* field) {
452     assert_always(table->s->primary_key < table->s->keys);
453     KEY* key = &table->s->key_info[table->s->primary_key];
454     for (uint i = 0; i < key->user_defined_key_parts; i++) {
455         KEY_PART_INFO* key_part = &key->key_part[i];
456         if (strcmp(field->field_name, key_part->field->field_name) == 0) {
457             return key_part->length == field->field_length;
458         }
459     }
460     return false;
461 }
462 
463 // Check that an expression looks like fieldname = constant, fieldname is part
464 // of the primary key, and the named field is an int, char or varchar type.
465 // Return true if it does.
check_pk_field_equal_constant(Item * item,TABLE * table,MY_BITMAP * pk_fields)466 static bool check_pk_field_equal_constant(
467     Item* item,
468     TABLE* table,
469     MY_BITMAP* pk_fields) {
470 
471     if (item->type() != Item::FUNC_ITEM)
472         return false;
473     Item_func* func = static_cast<Item_func*>(item);
474     if (strcmp(func->func_name(), "=") != 0)
475         return false;
476     uint n = func->argument_count();
477     if (n != 2)
478         return false;
479     Item** arguments = func->arguments();
480     Field* field = find_field_by_name(table, arguments[0]);
481     if (field == NULL)
482         return false;
483     if (!bitmap_test_and_clear(pk_fields, field->field_index))
484         return false;
485     switch (field->type()) {
486     case MYSQL_TYPE_TINY:
487     case MYSQL_TYPE_SHORT:
488     case MYSQL_TYPE_INT24:
489     case MYSQL_TYPE_LONG:
490     case MYSQL_TYPE_LONGLONG:
491         return arguments[1]->type() == Item::INT_ITEM ||
492             arguments[1]->type() == Item::STRING_ITEM;
493     case MYSQL_TYPE_STRING:
494     case MYSQL_TYPE_VARCHAR:
495         return full_field_in_key(table, field) &&
496             (arguments[1]->type() == Item::INT_ITEM ||
497              arguments[1]->type() == Item::STRING_ITEM);
498     default:
499         return false;
500     }
501 }
502 
503 // Check that the where condition covers all of the primary key components
504 // with fieldname = constant expressions.  Return true if it does.
check_point_update(Item * conds,TABLE * table)505 static bool check_point_update(Item* conds, TABLE* table) {
506     bool result = false;
507 
508     if (conds == NULL)
509         return false; // no where condition on the update
510 
511     if (table->s->primary_key >= table->s->keys)
512         return false; // no primary key defined
513 
514     // use a bitmap of the primary key fields to keep track of those fields
515     // that are covered by the where conditions
516     MY_BITMAP pk_fields;
517     if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
518         return false;
519     KEY *key = &table->s->key_info[table->s->primary_key];
520     for (uint i = 0; i < key->user_defined_key_parts; i++)
521         bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
522 
523     switch (conds->type()) {
524     case Item::FUNC_ITEM:
525         result = check_pk_field_equal_constant(conds, table, &pk_fields);
526         break;
527     case Item::COND_ITEM: {
528         Item_cond* cond_item = static_cast<Item_cond*>(conds);
529         if (strcmp(cond_item->func_name(), "and") != 0)
530             break;
531         List_iterator<Item> li(*cond_item->argument_list());
532         Item* list_item;
533         result = true;
534         while (result == true && (list_item = li++)) {
535             result = check_pk_field_equal_constant(
536                 list_item,
537                 table,
538                 &pk_fields);
539         }
540         break;
541     }
542     default:
543         break;
544     }
545 
546     if (!bitmap_is_clear_all(&pk_fields))
547         result = false;
548     bitmap_free(&pk_fields);
549     return result;
550 }
551 
552 // Return true if there are any clustering keys (except the primary).
553 // Precompute this when the table is opened.
clustering_keys_exist(TABLE * table)554 static bool clustering_keys_exist(TABLE *table) {
555     for (uint keynr = 0; keynr < table->s->keys; keynr++) {
556         if (keynr != table->s->primary_key &&
557             key_is_clustering(&table->s->key_info[keynr]))
558             return true;
559     }
560     return false;
561 }
562 
is_strict_mode(THD * thd)563 static bool is_strict_mode(THD* thd) {
564 #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
565     return thd->is_strict_mode();
566 #else
567     return tokudb_test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
568 #endif
569 }
570 
571 // Check if an update operation can be handled by this storage engine.
572 // Return true if it can.
check_fast_update(THD * thd,List<Item> & fields,List<Item> & values,Item * conds)573 bool ha_tokudb::check_fast_update(
574     THD* thd,
575     List<Item>& fields,
576     List<Item>& values,
577     Item* conds) {
578 
579     if (!transaction)
580         return false;
581 
582     // avoid strict mode arithmetic overflow issues
583     if (is_strict_mode(thd))
584         return false;
585 
586     // no triggers
587     if (table->triggers)
588         return false;
589 
590     // no binlog
591     if (mysql_bin_log.is_open() &&
592         !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
593           thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
594         return false;
595 
596     // no clustering keys (need to broadcast an increment into the clustering
597     // keys since we are selecting with the primary key)
598     if (clustering_keys_exist(table))
599         return false;
600 
601     if (!check_all_update_expressions(fields, values, table, false))
602         return false;
603 
604     if (!check_point_update(conds, table))
605         return false;
606 
607     return true;
608 }
609 
marshall_varchar_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info,uint key_num)610 static void marshall_varchar_descriptor(
611     tokudb::buffer& b,
612     TABLE* table,
613     KEY_AND_COL_INFO* kc_info,
614     uint key_num) {
615 
616     b.append_ui<uint32_t>('v');
617     b.append_ui<uint32_t>(
618         table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
619     uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
620     b.append_ui<uint32_t>(var_offset_bytes);
621     b.append_ui<uint32_t>(
622         var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
623 }
624 
marshall_blobs_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info)625 static void marshall_blobs_descriptor(
626     tokudb::buffer& b,
627     TABLE* table,
628     KEY_AND_COL_INFO* kc_info) {
629 
630     b.append_ui<uint32_t>('b');
631     uint32_t n = kc_info->num_blobs;
632     b.append_ui<uint32_t>(n);
633     for (uint i = 0; i < n; i++) {
634         uint blob_field_index = kc_info->blob_fields[i];
635         assert_always(blob_field_index < table->s->fields);
636         uint8_t blob_field_length =
637             table->s->field[blob_field_index]->row_pack_length();
638         b.append(&blob_field_length, sizeof blob_field_length);
639     }
640 }
641 
// Forward declaration; the definition is not in this part of the file.
// marshall_update() passes a Field's null_bit mask and presumably receives
// the index of that bit within its null byte (TODO confirm against the
// definition).
static inline uint32_t get_null_bit_position(uint32_t null_bit);
643 
644 // evaluate the int value of an item
item_val_int(Item * item)645 static longlong item_val_int(Item* item) {
646     Item::Type t = item->type();
647     if (t == Item::INSERT_VALUE_ITEM) {
648         Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
649         return value_item->arg->val_int();
650     } else
651         return item->val_int();
652 }
653 
// Marshall one "lhs = rhs" update operation into the update message buffer b.
// The assignment is expected to have passed check_update_expression(); the
// operation is re-derived here from the column type and the rhs item shape.
// Appended layout (five uint32 values, then the raw value bytes):
//   update_operation  '=', or '+' / '-' for arithmetic updates
//   field_type        UPDATE_TYPE_* code for the column
//   field_null_num    0 for a NOT NULL column, otherwise null bit number + 1
//   offset            int/char: byte offset from fixed_field_offset();
//                     varchar: index from var_field_index();
//                     blob: index from blob_field_index()
//   v_length          number of value bytes that follow
//   value             v_length bytes starting at v_ptr
static void marshall_update(
    tokudb::buffer& b,
    Item* lhs_item,
    Item* rhs_item,
    TABLE* table,
    TOKUDB_SHARE* share) {

    // figure out the update operation type (again)
    Field* lhs_field = find_field_by_name(table, lhs_item);
    assert_always(lhs_field); // we found it before, so this should work

    // compute the update info
    uint32_t field_type;
    uint32_t field_null_num = 0;
    if (lhs_field->real_maybe_null()) {
        uint32_t field_num = lhs_field->field_index;
        // NOTE(review): the null byte is derived from field_index/8, not from
        // the field's null_ptr.  Confirm this matches the row's null-bit
        // layout when some columns are NOT NULL or the bits start at 1.
        field_null_num =
            ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
    }
    uint32_t offset;
    void* v_ptr = NULL;
    uint32_t v_length;
    uint32_t update_operation;
    longlong v_ll;
    String v_str;

    switch (lhs_field->type()) {
    case MYSQL_TYPE_TINY:
    case MYSQL_TYPE_SHORT:
    case MYSQL_TYPE_INT24:
    case MYSQL_TYPE_LONG:
    case MYSQL_TYPE_LONGLONG: {
        Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
        field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
        offset =
            fixed_field_offset(
                table->s->null_bytes,
                &share->kc_info,
                table->s->primary_key,
                lhs_field->field_index);
        switch (rhs_item->type()) {
        case Item::INT_ITEM: {
            // x = constant
            update_operation = '=';
            v_ll = rhs_item->val_int();
            // NOTE(review): only the first pack_length() bytes of v_ll are
            // sent, which are its low-order bytes on little-endian hosts only.
            v_length = lhs_field->pack_length();
            v_ptr = &v_ll;
            break;
        }
        case Item::FUNC_ITEM: {
            Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
            Item** arguments = rhs_func->arguments();
            // we only support one if function for now, and it is a
            // decrement with floor.
            if (strcmp(rhs_func->func_name(), "if") == 0) {
                update_operation = '-';
                v_ll = 1;
            } else if (rhs_func->argument_count() == 1) {
                // unary sign applied to a constant: assign its value
                update_operation = '=';
                v_ll = rhs_func->val_int();
            } else {
                // x + c / x - c (or VALUES(x)): operation is the first
                // character of the function name, operand is argument 1
                update_operation = rhs_func->func_name()[0];
                v_ll = item_val_int(arguments[1]);
            }
            v_length = lhs_field->pack_length();
            v_ptr = &v_ll;
            break;
        }
        default:
            assert_unreachable();
        }
        break;
    }
    case MYSQL_TYPE_STRING: {
        update_operation = '=';
        field_type =
            lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
        offset =
            fixed_field_offset(
                table->s->null_bytes,
                &share->kc_info,
                table->s->primary_key,
                lhs_field->field_index);
        v_str = *rhs_item->val_str(&v_str);
        v_length = v_str.length();
        // CHAR values always occupy exactly pack_length() bytes: truncate
        // longer values, pad shorter ones (0 for binary, else the charset's
        // pad character)
        if (v_length >= lhs_field->pack_length()) {
            v_length = lhs_field->pack_length();
            v_str.length(v_length); // truncate
        } else {
            v_length = lhs_field->pack_length();
            uchar pad_char =
                lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
            v_str.fill(lhs_field->pack_length(), pad_char); // pad
        }
        v_ptr = v_str.c_ptr();
        break;
    }
    case MYSQL_TYPE_VARCHAR: {
        update_operation = '=';
        field_type =
            lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
        offset =
            var_field_index(
                table,
                &share->kc_info,
                table->s->primary_key,
                lhs_field->field_index);
        v_str = *rhs_item->val_str(&v_str);
        v_length = v_str.length();
        // cap the value at row_pack_length() bytes
        if (v_length >= lhs_field->row_pack_length()) {
            v_length = lhs_field->row_pack_length();
            v_str.length(v_length); // truncate
        }
        v_ptr = v_str.c_ptr();
        break;
    }
    case MYSQL_TYPE_BLOB: {
        update_operation = '=';
        field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
        offset =
            blob_field_index(table, &share->kc_info, lhs_field->field_index);
        v_str = *rhs_item->val_str(&v_str);
        v_length = v_str.length();
        // cap the value at max_data_length() bytes
        if (v_length >= lhs_field->max_data_length()) {
            v_length = lhs_field->max_data_length();
            v_str.length(v_length); // truncate
        }
        v_ptr = v_str.c_ptr();
        break;
    }
    default:
        assert_unreachable();
    }

    // marshall the update fields into the buffer
    b.append_ui<uint32_t>(update_operation);
    b.append_ui<uint32_t>(field_type);
    b.append_ui<uint32_t>(field_null_num);
    b.append_ui<uint32_t>(offset);
    b.append_ui<uint32_t>(v_length);
    b.append(v_ptr, v_length);
}
796 
797 // Save an item's value into the appropriate field.  Return 0 if successful.
save_in_field(Item * item,TABLE * table)798 static int save_in_field(Item* item, TABLE* table) {
799     assert_always(item->type() == Item::FUNC_ITEM);
800     Item_func *func = static_cast<Item_func*>(item);
801     assert_always(strcmp(func->func_name(), "=") == 0);
802     uint n = func->argument_count();
803     assert_always(n == 2);
804     Item **arguments = func->arguments();
805     assert_always(arguments[0]->type() == Item::FIELD_ITEM);
806     Item_field *field_item = static_cast<Item_field*>(arguments[0]);
807     my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
808     int error = arguments[1]->save_in_field(field_item->field, 0);
809     dbug_tmp_restore_column_map(table->write_set, old_map);
810     return error;
811 }
812 
count_update_types(Field * lhs_field,uint * num_varchars,uint * num_blobs)813 static void count_update_types(
814     Field* lhs_field,
815     uint* num_varchars,
816     uint* num_blobs) {
817 
818     switch (lhs_field->type()) {
819     case MYSQL_TYPE_VARCHAR:
820         *num_varchars += 1;
821         break;
822     case MYSQL_TYPE_BLOB:
823         *num_blobs += 1;
824         break;
825     default:
826         break;
827     }
828 }
829 
830 // Generate an update message for an update operation and send it into the
831 // primary tree.  Return 0 if successful.
send_update_message(List<Item> & update_fields,List<Item> & update_values,Item * conds,DB_TXN * txn)832 int ha_tokudb::send_update_message(
833     List<Item>& update_fields,
834     List<Item>& update_values,
835     Item* conds,
836     DB_TXN* txn) {
837 
838     int error;
839 
840     // Save the primary key from the where conditions
841     Item::Type t = conds->type();
842     if (t == Item::FUNC_ITEM) {
843         error = save_in_field(conds, table);
844     } else if (t == Item::COND_ITEM) {
845         Item_cond* cond_item = static_cast<Item_cond*>(conds);
846         List_iterator<Item> li(*cond_item->argument_list());
847         Item* list_item;
848         error = 0;
849         while (error == 0 && (list_item = li++)) {
850             error = save_in_field(list_item, table);
851         }
852     } else {
853         assert_unreachable();
854     }
855     if (error)
856         return error;
857 
858     // put the primary key into key_buff and wrap it with key_dbt
859     DBT key_dbt;
860     bool has_null;
861     create_dbt_key_from_table(
862         &key_dbt,
863         primary_key,
864         key_buff,
865         table->record[0],
866         &has_null);
867 
868     // construct the update message
869     tokudb::buffer update_message;
870 
871     uint8_t op = UPDATE_OP_UPDATE_2;
872     update_message.append(&op, sizeof op);
873 
874     uint32_t num_updates = update_fields.elements;
875     uint num_varchars = 0, num_blobs = 0;
876     if (1) {
877         List_iterator<Item> lhs_i(update_fields);
878         Item* lhs_item;
879         while ((lhs_item = lhs_i++)) {
880             if (lhs_item == NULL)
881                 break;
882             Field* lhs_field = find_field_by_name(table, lhs_item);
883             assert_always(lhs_field); // we found it before, so this should work
884             count_update_types(lhs_field, &num_varchars, &num_blobs);
885         }
886         if (num_varchars > 0 || num_blobs > 0)
887             num_updates++;
888         if (num_blobs > 0)
889             num_updates++;
890     }
891 
892     // append the updates
893     update_message.append_ui<uint32_t>(num_updates);
894 
895     if (num_varchars > 0 || num_blobs > 0)
896         marshall_varchar_descriptor(
897             update_message,
898             table,
899             &share->kc_info,
900             table->s->primary_key);
901     if (num_blobs > 0)
902         marshall_blobs_descriptor(update_message, table, &share->kc_info);
903 
904     List_iterator<Item> lhs_i(update_fields);
905     List_iterator<Item> rhs_i(update_values);
906     while (error == 0) {
907         Item* lhs_item = lhs_i++;
908         if (lhs_item == NULL)
909             break;
910         Item* rhs_item = rhs_i++;
911         assert_always(rhs_item != NULL);
912         marshall_update(update_message, lhs_item, rhs_item, table, share);
913     }
914 
915     rwlock_t_lock_read(share->_num_DBs_lock);
916 
917     // hot index in progress
918     if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
919         error = ENOTSUP; // run on the slow path
920     } else {
921         // send the update message
922         DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
923         update_dbt.data = update_message.data();
924         update_dbt.size = update_message.size();
925         error =
926             share->key_file[primary_key]->update(
927                 share->key_file[primary_key],
928                 txn,
929                 &key_dbt,
930                 &update_dbt,
931                 0);
932     }
933 
934     share->_num_DBs_lock.unlock();
935 
936     return error;
937 }
938 
939 // Determine if an upsert operation can be offloaded to the storage engine.
940 // An upsert consists of a row and a list of update expressions
941 // (update_fields[i] = update_values[i]).
942 // The function returns 0 if the upsert is handled in the storage engine.
943 // Otherwise, an error code is returned.
upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)944 int ha_tokudb::upsert(
945     THD* thd,
946     List<Item>& update_fields,
947     List<Item>& update_values) {
948 
949     TOKUDB_HANDLER_DBUG_ENTER("");
950     int error = 0;
951 
952     if (!tokudb::sysvars::enable_fast_upsert(thd)) {
953         error = ENOTSUP;
954         goto exit;
955     }
956 
957     if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
958         fprintf(stderr, "upsert\n");
959         dump_item_list("update_fields", update_fields);
960         dump_item_list("update_values", update_values);
961     }
962 
963     // not an upsert or something is fishy with the parameters
964     if (update_fields.elements < 1 ||
965         update_fields.elements != update_values.elements) {
966         error = ENOTSUP;
967         goto exit;
968     }
969 
970     if (!check_upsert(thd, update_fields, update_values)) {
971         error = HA_ERR_UNSUPPORTED;
972         goto exit;
973     }
974 
975     error = send_upsert_message(update_fields, update_values, transaction);
976 
977     if (error) {
978         int mapped_error = map_to_handler_error(error);
979         if (mapped_error == error)
980             error = HA_ERR_UNSUPPORTED;
981     }
982 
983 exit:
984 
985     if (error != 0 && error != ENOTSUP)
986         print_error(error, MYF(0));
987 
988     TOKUDB_HANDLER_DBUG_RETURN(error);
989 }
990 
991 // Check if an upsert can be handled by this storage engine.
992 // Return true if it can.
check_upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)993 bool ha_tokudb::check_upsert(
994     THD* thd,
995     List<Item>& update_fields,
996     List<Item>& update_values) {
997 
998     if (!transaction)
999         return false;
1000 
1001     // avoid strict mode arithmetic overflow issues
1002     if (is_strict_mode(thd))
1003         return false;
1004 
1005     // no triggers
1006     if (table->triggers)
1007         return false;
1008 
1009     // primary key must exist
1010     if (table->s->primary_key >= table->s->keys)
1011         return false;
1012 
1013     // no secondary keys
1014     if (table->s->keys > 1)
1015         return false;
1016 
1017     // no binlog
1018     if (mysql_bin_log.is_open() &&
1019         !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1020           thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1021         return false;
1022 
1023     if (!check_all_update_expressions(
1024             update_fields,
1025             update_values,
1026             table,
1027             true))
1028         return false;
1029 
1030     return true;
1031 }
1032 
1033 // Generate an upsert message and send it into the primary tree.
1034 // Return 0 if successful.
send_upsert_message(List<Item> & update_fields,List<Item> & update_values,DB_TXN * txn)1035 int ha_tokudb::send_upsert_message(
1036     List<Item>& update_fields,
1037     List<Item>& update_values,
1038     DB_TXN* txn) {
1039     int error = 0;
1040 
1041     // generate primary key
1042     DBT key_dbt;
1043     bool has_null;
1044     create_dbt_key_from_table(
1045         &key_dbt,
1046         primary_key,
1047         primary_key_buff,
1048         table->record[0],
1049         &has_null);
1050 
1051     // generate packed row
1052     DBT row;
1053     error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1054     if (error)
1055         return error;
1056 
1057     tokudb::buffer update_message;
1058 
1059     // append the operation
1060     uint8_t op = UPDATE_OP_UPSERT_2;
1061     update_message.append(&op, sizeof op);
1062 
1063     // append the row
1064     update_message.append_ui<uint32_t>(row.size);
1065     update_message.append(row.data, row.size);
1066 
1067     uint32_t num_updates = update_fields.elements;
1068     uint num_varchars = 0, num_blobs = 0;
1069     if (1) {
1070         List_iterator<Item> lhs_i(update_fields);
1071         Item* lhs_item;
1072         while ((lhs_item = lhs_i++)) {
1073             if (lhs_item == NULL)
1074                 break;
1075             Field* lhs_field = find_field_by_name(table, lhs_item);
1076             assert_always(lhs_field); // we found it before, so this should work
1077             count_update_types(lhs_field, &num_varchars, &num_blobs);
1078         }
1079         if (num_varchars > 0 || num_blobs > 0)
1080             num_updates++;
1081         if (num_blobs > 0)
1082             num_updates++;
1083     }
1084 
1085     // append the updates
1086     update_message.append_ui<uint32_t>(num_updates);
1087 
1088     if (num_varchars > 0 || num_blobs > 0)
1089         marshall_varchar_descriptor(
1090             update_message,
1091             table, &share->kc_info,
1092             table->s->primary_key);
1093     if (num_blobs > 0)
1094         marshall_blobs_descriptor(update_message, table, &share->kc_info);
1095 
1096     List_iterator<Item> lhs_i(update_fields);
1097     List_iterator<Item> rhs_i(update_values);
1098     while (1) {
1099         Item* lhs_item = lhs_i++;
1100         if (lhs_item == NULL)
1101             break;
1102         Item* rhs_item = rhs_i++;
1103         assert_always(rhs_item != NULL);
1104         marshall_update(update_message, lhs_item, rhs_item, table, share);
1105     }
1106 
1107     rwlock_t_lock_read(share->_num_DBs_lock);
1108 
1109     // hot index in progress
1110     if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1111         error = ENOTSUP; // run on the slow path
1112     } else {
1113         // send the upsert message
1114         DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1115         update_dbt.data = update_message.data();
1116         update_dbt.size = update_message.size();
1117         error =
1118             share->key_file[primary_key]->update(
1119                 share->key_file[primary_key],
1120                 txn,
1121                 &key_dbt,
1122                 &update_dbt,
1123                 0);
1124     }
1125 
1126     share->_num_DBs_lock.unlock();
1127 
1128     return error;
1129 }
1130