1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 TokuDB is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22 ======= */
23
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
26 // Point updates and upserts
27
28 // Restrictions:
29 // No triggers
30 // Statement or mixed replication
31 // Primary key must be defined
32 // Simple and compound primary key
33 // Int, char and varchar primary key types
34 // No updates on fields that are part of any key
35 // No clustering keys
36 // Integer and char field updates
37 // Update expressions:
38 // x = constant
39 // x = x + constant
40 // x = x - constant
41 // x = if (x=0,0,x-1)
42 // x = x + values(x)
43 // Session variable disables slow updates and slow upserts
44
45 // Future features:
46 // Support more primary key types
47 // Force statement logging for fast updates
48 // Support clustering keys using broadcast updates
49 // Support primary key ranges using multicast messages
50 // Support more complicated update expressions
51 // Replace field_offset
52
53 #if defined(TOKU_INCLUDE_UPSERT) && TOKU_INCLUDE_UPSERT
54 // Debug function to dump an Item
dump_item(Item * item)55 static void dump_item(Item* item) {
56 fprintf(stderr, "%u", item->type());
57 switch (item->type()) {
58 case Item::FUNC_ITEM: {
59 Item_func* func = static_cast<Item_func*>(item);
60 uint n = func->argument_count();
61 Item** arguments = func->arguments();
62 fprintf(
63 stderr,
64 ":func=%u,%s,%u(",
65 func->functype(),
66 func->func_name(),
67 n);
68 for (uint i = 0; i < n ; i++) {
69 dump_item(arguments[i]);
70 if (i < n-1)
71 fprintf(stderr,",");
72 }
73 fprintf(stderr, ")");
74 break;
75 }
76 case Item::INT_ITEM: {
77 Item_int* int_item = static_cast<Item_int*>(item);
78 fprintf(stderr, ":int=%lld", int_item->val_int());
79 break;
80 }
81 case Item::STRING_ITEM: {
82 Item_string* str_item = static_cast<Item_string*>(item);
83 fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
84 break;
85 }
86 case Item::FIELD_ITEM: {
87 Item_field* field_item = static_cast<Item_field*>(item);
88 fprintf(
89 stderr,
90 ":field=%s.%s.%s",
91 field_item->db_name,
92 field_item->table_name,
93 field_item->field_name.str);
94 break;
95 }
96 case Item::COND_ITEM: {
97 Item_cond* cond_item = static_cast<Item_cond*>(item);
98 fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
99 List_iterator<Item> li(*cond_item->argument_list());
100 Item* list_item;
101 while ((list_item = li++)) {
102 dump_item(list_item);
103 fprintf(stderr, "\n");
104 }
105 fprintf(stderr, ")\n");
106 break;
107 }
108 case Item::INSERT_VALUE_ITEM: {
109 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
110 fprintf(stderr, ":insert_value");
111 dump_item(value_item->arg);
112 break;
113 }
114 default:
115 fprintf(stderr, ":unsupported\n");
116 break;
117 }
118 }
119
120 // Debug function to dump an Item list
dump_item_list(const char * h,List<Item> & l)121 static void dump_item_list(const char* h, List<Item> &l) {
122 fprintf(stderr, "%s elements=%u\n", h, l.elements);
123 List_iterator<Item> li(l);
124 Item* item;
125 while ((item = li++) != NULL) {
126 dump_item(item);
127 fprintf(stderr, "\n");
128 }
129 }
130
131 // Find a Field by its Item name
find_field_by_name(TOKUDB_UNUSED (TABLE * table),Item * item)132 static Field* find_field_by_name(TOKUDB_UNUSED(TABLE* table), Item* item) {
133 if (item->type() != Item::FIELD_ITEM)
134 return NULL;
135 Item_field* field_item = static_cast<Item_field*>(item);
136 #if 0
137 if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
138 strcmp(table->s->table_name.str, field_item->table_name) != 0)
139 return NULL;
140 Field *found_field = NULL;
141 for (uint i = 0; i < table->s->fields; i++) {
142 Field *test_field = table->s->field[i];
143 if (strcmp(field_item->field_name.str, test_field->field_name.str) == 0) {
144 found_field = test_field;
145 break;
146 }
147 }
148 return found_field;
149 #else
150 // item->field may be a shortcut instead of the above table lookup
151 return field_item->field;
152 #endif
153 }
154
155 // Return the starting offset in the value for a particular index (selected by idx) of a
156 // particular field (selected by expand_field_num).
157 // This only works for fixed length fields
fixed_field_offset(uint32_t null_bytes,KEY_AND_COL_INFO * kc_info,uint idx,uint expand_field_num)158 static uint32_t fixed_field_offset(
159 uint32_t null_bytes,
160 KEY_AND_COL_INFO* kc_info,
161 uint idx,
162 uint expand_field_num) {
163
164 uint32_t offset = null_bytes;
165 for (uint i = 0; i < expand_field_num; i++) {
166 if (bitmap_is_set(&kc_info->key_filters[idx], i))
167 continue;
168 offset += kc_info->field_lengths[i];
169 }
170 return offset;
171 }
172
var_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint idx,uint field_num)173 static uint32_t var_field_index(
174 TABLE* table,
175 KEY_AND_COL_INFO* kc_info,
176 uint idx,
177 uint field_num) {
178
179 assert_always(field_num < table->s->fields);
180 uint v_index = 0;
181 for (uint i = 0; i < table->s->fields; i++) {
182 if (bitmap_is_set(&kc_info->key_filters[idx], i))
183 continue;
184 if (kc_info->length_bytes[i]) {
185 if (i == field_num)
186 break;
187 v_index++;
188 }
189 }
190 return v_index;
191 }
192
blob_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint field_num)193 static uint32_t blob_field_index(TABLE* table,
194 KEY_AND_COL_INFO* kc_info,
195 uint field_num) {
196 assert_always(field_num < table->s->fields);
197 uint b_index;
198 for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
199 if (kc_info->blob_fields[b_index] == field_num)
200 break;
201 }
202 assert_always(b_index < kc_info->num_blobs);
203 return b_index;
204 }
205
206 // Determine if an update operation can be offloaded to the storage engine.
207 // The update operation consists of a list of update expressions
208 // (fields[i] = values[i]), and a list of where conditions (conds).
209 // The function returns 0 if the update is handled in the storage engine.
210 // Otherwise, an error is returned.
fast_update(THD * thd,List<Item> & update_fields,List<Item> & update_values,Item * conds)211 int ha_tokudb::fast_update(
212 THD* thd,
213 List<Item>& update_fields,
214 List<Item>& update_values,
215 Item* conds) {
216
217 TOKUDB_HANDLER_DBUG_ENTER("");
218 int error = 0;
219
220 if (!tokudb::sysvars::enable_fast_update(thd)) {
221 error = ENOTSUP;
222 goto exit;
223 }
224
225 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
226 dump_item_list("fields", update_fields);
227 dump_item_list("values", update_values);
228 if (conds) {
229 fprintf(stderr, "conds\n");
230 dump_item(conds);
231 fprintf(stderr, "\n");
232 }
233 }
234
235 if (update_fields.elements < 1 ||
236 update_fields.elements != update_values.elements) {
237 error = ENOTSUP; // something is fishy with the parameters
238 goto exit;
239 }
240
241 if (!check_fast_update(thd, update_fields, update_values, conds)) {
242 error = HA_ERR_UNSUPPORTED;
243 goto exit;
244 }
245
246 error = send_update_message(
247 update_fields, update_values, conds, transaction);
248
249 if (error) {
250 int mapped_error = map_to_handler_error(error);
251 if (mapped_error == error)
252 error = HA_ERR_UNSUPPORTED;
253 }
254
255 exit:
256
257 if (error != 0 && error != ENOTSUP)
258 print_error(error, MYF(0));
259
260 TOKUDB_HANDLER_DBUG_RETURN(error);
261 }
262
263 // Return true if an expression is a simple int expression or a simple function
264 // of +- int expression.
check_int_result(Item * item)265 static bool check_int_result(Item* item) {
266 Item::Type t = item->type();
267 if (t == Item::INT_ITEM) {
268 return true;
269 } else if (t == Item::FUNC_ITEM) {
270 Item_func* item_func = static_cast<Item_func*>(item);
271 if (strcmp(item_func->func_name(), "+") != 0 &&
272 strcmp(item_func->func_name(), "-") != 0)
273 return false;
274 if (item_func->argument_count() != 1)
275 return false;
276 Item** arguments = item_func->arguments();
277 if (arguments[0]->type() != Item::INT_ITEM)
278 return false;
279 return true;
280 } else
281 return false;
282 }
283
284 // check that an item is an insert value item with the same field name
check_insert_value(Item * item,const char * field_name)285 static bool check_insert_value(Item* item, const char* field_name) {
286 if (item->type() != Item::INSERT_VALUE_ITEM)
287 return false;
288 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
289 if (value_item->arg->type() != Item::FIELD_ITEM)
290 return false;
291 Item_field* arg = static_cast<Item_field*>(value_item->arg);
292 if (strcmp(field_name, arg->field_name.str) != 0)
293 return false;
294 return true;
295 }
296
297 // Return true if an expression looks like field_name op constant.
check_x_op_constant(const char * field_name,Item * item,const char * op,Item ** item_constant,bool allow_insert_value)298 static bool check_x_op_constant(
299 const char* field_name,
300 Item* item,
301 const char* op,
302 Item** item_constant,
303 bool allow_insert_value) {
304
305 if (item->type() != Item::FUNC_ITEM)
306 return false;
307 Item_func* item_func = static_cast<Item_func*>(item);
308 if (strcmp(item_func->func_name(), op) != 0)
309 return false;
310 Item** arguments = item_func->arguments();
311 uint n = item_func->argument_count();
312 if (n != 2)
313 return false;
314 if (arguments[0]->type() != Item::FIELD_ITEM)
315 return false;
316 Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
317 if (strcmp(field_name, arg0->field_name.str) != 0)
318 return false;
319 if (!check_int_result(arguments[1]))
320 if (!(allow_insert_value &&
321 check_insert_value(arguments[1], field_name)))
322 return false;
323 *item_constant = arguments[1];
324 return true;
325 }
326
327 // Return true if an expression looks like field_name = constant
check_x_equal_0(const char * field_name,Item * item)328 static bool check_x_equal_0(const char *field_name, Item *item) {
329 Item* item_constant;
330 if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
331 return false;
332 if (item_constant->type() != Item::INT_ITEM ||
333 item_constant->val_int() != 0)
334 return false;
335 return true;
336 }
337
338 // Return true if an expression looks like fieldname - 1
check_x_minus_1(const char * field_name,Item * item)339 static bool check_x_minus_1(const char* field_name, Item* item) {
340 Item* item_constant;
341 if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
342 return false;
343 if (item_constant->type() != Item::INT_ITEM ||
344 item_constant->val_int() != 1)
345 return false;
346 return true;
347 }
348
349 // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
350 // the field named by fieldname is an unsigned int.
check_decr_floor_expression(Field * lhs_field,Item * item)351 static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
352 if (item->type() != Item::FUNC_ITEM)
353 return false;
354 Item_func* item_func = static_cast<Item_func*>(item);
355 if (strcmp(item_func->func_name(), "if") != 0)
356 return false;
357 Item** arguments = item_func->arguments();
358 uint n = item_func->argument_count();
359 if (n != 3)
360 return false;
361 if (!check_x_equal_0(lhs_field->field_name.str, arguments[0]))
362 return false;
363 if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
364 return false;
365 if (!check_x_minus_1(lhs_field->field_name.str, arguments[2]))
366 return false;
367 if (!(lhs_field->flags & UNSIGNED_FLAG))
368 return false;
369 return true;
370 }
371
372 // Check if lhs = rhs expression is simple. Return true if it is.
check_update_expression(Item * lhs_item,Item * rhs_item,TABLE * table,bool allow_insert_value)373 static bool check_update_expression(
374 Item* lhs_item,
375 Item* rhs_item,
376 TABLE* table,
377 bool allow_insert_value) {
378
379 Field* lhs_field = find_field_by_name(table, lhs_item);
380 if (lhs_field == NULL)
381 return false;
382 if (!lhs_field->part_of_key.is_clear_all())
383 return false;
384 enum_field_types lhs_type = lhs_field->type();
385 Item::Type rhs_type = rhs_item->type();
386 switch (lhs_type) {
387 case MYSQL_TYPE_TINY:
388 case MYSQL_TYPE_SHORT:
389 case MYSQL_TYPE_INT24:
390 case MYSQL_TYPE_LONG:
391 case MYSQL_TYPE_LONGLONG:
392 if (check_int_result(rhs_item))
393 return true;
394 Item* item_constant;
395 if (check_x_op_constant(
396 lhs_field->field_name.str,
397 rhs_item,
398 "+",
399 &item_constant,
400 allow_insert_value))
401 return true;
402 if (check_x_op_constant(
403 lhs_field->field_name.str,
404 rhs_item,
405 "-",
406 &item_constant,
407 allow_insert_value))
408 return true;
409 if (check_decr_floor_expression(lhs_field, rhs_item))
410 return true;
411 break;
412 case MYSQL_TYPE_STRING:
413 if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
414 return true;
415 break;
416 case MYSQL_TYPE_VARCHAR:
417 case MYSQL_TYPE_BLOB:
418 if (rhs_type == Item::STRING_ITEM)
419 return true;
420 break;
421 default:
422 break;
423 }
424 return false;
425 }
426
427 // Check that all update expressions are simple. Return true if they are.
check_all_update_expressions(List<Item> & fields,List<Item> & values,TABLE * table,bool allow_insert_value)428 static bool check_all_update_expressions(
429 List<Item>& fields,
430 List<Item>& values,
431 TABLE* table,
432 bool allow_insert_value) {
433
434 List_iterator<Item> lhs_i(fields);
435 List_iterator<Item> rhs_i(values);
436 while (1) {
437 Item* lhs_item = lhs_i++;
438 if (lhs_item == NULL)
439 break;
440 Item* rhs_item = rhs_i++;
441 assert_always(rhs_item != NULL);
442 if (!check_update_expression(
443 lhs_item,
444 rhs_item,
445 table,
446 allow_insert_value))
447 return false;
448 }
449 return true;
450 }
451
full_field_in_key(TABLE * table,Field * field)452 static bool full_field_in_key(TABLE* table, Field* field) {
453 assert_always(table->s->primary_key < table->s->keys);
454 KEY* key = &table->s->key_info[table->s->primary_key];
455 for (uint i = 0; i < key->user_defined_key_parts; i++) {
456 KEY_PART_INFO* key_part = &key->key_part[i];
457 if (strcmp(field->field_name.str, key_part->field->field_name.str) == 0) {
458 return key_part->length == field->field_length;
459 }
460 }
461 return false;
462 }
463
464 // Check that an expression looks like fieldname = constant, fieldname is part
465 // of the primary key, and the named field is an int, char or varchar type.
466 // Return true if it does.
check_pk_field_equal_constant(Item * item,TABLE * table,MY_BITMAP * pk_fields)467 static bool check_pk_field_equal_constant(
468 Item* item,
469 TABLE* table,
470 MY_BITMAP* pk_fields) {
471
472 if (item->type() != Item::FUNC_ITEM)
473 return false;
474 Item_func* func = static_cast<Item_func*>(item);
475 if (strcmp(func->func_name(), "=") != 0)
476 return false;
477 uint n = func->argument_count();
478 if (n != 2)
479 return false;
480 Item** arguments = func->arguments();
481 Field* field = find_field_by_name(table, arguments[0]);
482 if (field == NULL)
483 return false;
484 if (!bitmap_test_and_clear(pk_fields, field->field_index))
485 return false;
486 switch (field->type()) {
487 case MYSQL_TYPE_TINY:
488 case MYSQL_TYPE_SHORT:
489 case MYSQL_TYPE_INT24:
490 case MYSQL_TYPE_LONG:
491 case MYSQL_TYPE_LONGLONG:
492 return arguments[1]->type() == Item::INT_ITEM ||
493 arguments[1]->type() == Item::STRING_ITEM;
494 case MYSQL_TYPE_STRING:
495 case MYSQL_TYPE_VARCHAR:
496 return full_field_in_key(table, field) &&
497 (arguments[1]->type() == Item::INT_ITEM ||
498 arguments[1]->type() == Item::STRING_ITEM);
499 default:
500 return false;
501 }
502 }
503
504 // Check that the where condition covers all of the primary key components
505 // with fieldname = constant expressions. Return true if it does.
check_point_update(Item * conds,TABLE * table)506 static bool check_point_update(Item* conds, TABLE* table) {
507 bool result = false;
508
509 if (conds == NULL)
510 return false; // no where condition on the update
511
512 if (table->s->primary_key >= table->s->keys)
513 return false; // no primary key defined
514
515 // use a bitmap of the primary key fields to keep track of those fields
516 // that are covered by the where conditions
517 MY_BITMAP pk_fields;
518 if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
519 return false;
520 KEY *key = &table->s->key_info[table->s->primary_key];
521 for (uint i = 0; i < key->user_defined_key_parts; i++)
522 bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
523
524 switch (conds->type()) {
525 case Item::FUNC_ITEM:
526 result = check_pk_field_equal_constant(conds, table, &pk_fields);
527 break;
528 case Item::COND_ITEM: {
529 Item_cond* cond_item = static_cast<Item_cond*>(conds);
530 if (strcmp(cond_item->func_name(), "and") != 0)
531 break;
532 List_iterator<Item> li(*cond_item->argument_list());
533 Item* list_item;
534 result = true;
535 while (result == true && (list_item = li++)) {
536 result = check_pk_field_equal_constant(
537 list_item,
538 table,
539 &pk_fields);
540 }
541 break;
542 }
543 default:
544 break;
545 }
546
547 if (!bitmap_is_clear_all(&pk_fields))
548 result = false;
549 bitmap_free(&pk_fields);
550 return result;
551 }
552
553 // Return true if there are any clustering keys (except the primary).
554 // Precompute this when the table is opened.
clustering_keys_exist(TABLE * table)555 static bool clustering_keys_exist(TABLE *table) {
556 for (uint keynr = 0; keynr < table->s->keys; keynr++) {
557 if (keynr != table->s->primary_key &&
558 key_is_clustering(&table->s->key_info[keynr]))
559 return true;
560 }
561 return false;
562 }
563
is_strict_mode(THD * thd)564 static bool is_strict_mode(THD* thd) {
565 #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
566 return thd->is_strict_mode();
567 #else
568 return tokudb_test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
569 #endif
570 }
571
572 // Check if an update operation can be handled by this storage engine.
573 // Return true if it can.
check_fast_update(THD * thd,List<Item> & fields,List<Item> & values,Item * conds)574 bool ha_tokudb::check_fast_update(
575 THD* thd,
576 List<Item>& fields,
577 List<Item>& values,
578 Item* conds) {
579
580 if (!transaction)
581 return false;
582
583 // avoid strict mode arithmetic overflow issues
584 if (is_strict_mode(thd))
585 return false;
586
587 // no triggers
588 if (table->triggers)
589 return false;
590
591 // no binlog
592 if (mysql_bin_log.is_open() &&
593 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
594 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
595 return false;
596
597 // no clustering keys (need to broadcast an increment into the clustering
598 // keys since we are selecting with the primary key)
599 if (clustering_keys_exist(table))
600 return false;
601
602 if (!check_all_update_expressions(fields, values, table, false))
603 return false;
604
605 if (!check_point_update(conds, table))
606 return false;
607
608 return true;
609 }
610
marshall_varchar_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info,uint key_num)611 static void marshall_varchar_descriptor(
612 tokudb::buffer& b,
613 TABLE* table,
614 KEY_AND_COL_INFO* kc_info,
615 uint key_num) {
616
617 b.append_ui<uint32_t>('v');
618 b.append_ui<uint32_t>(
619 table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
620 uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
621 b.append_ui<uint32_t>(var_offset_bytes);
622 b.append_ui<uint32_t>(
623 var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
624 }
625
marshall_blobs_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info)626 static void marshall_blobs_descriptor(
627 tokudb::buffer& b,
628 TABLE* table,
629 KEY_AND_COL_INFO* kc_info) {
630
631 b.append_ui<uint32_t>('b');
632 uint32_t n = kc_info->num_blobs;
633 b.append_ui<uint32_t>(n);
634 for (uint i = 0; i < n; i++) {
635 uint blob_field_index = kc_info->blob_fields[i];
636 assert_always(blob_field_index < table->s->fields);
637 uint8_t blob_field_length =
638 table->s->field[blob_field_index]->row_pack_length();
639 b.append(&blob_field_length, sizeof blob_field_length);
640 }
641 }
642
643 static inline uint32_t get_null_bit_position(uint32_t null_bit);
644
645 // evaluate the int value of an item
item_val_int(Item * item)646 static longlong item_val_int(Item* item) {
647 Item::Type t = item->type();
648 if (t == Item::INSERT_VALUE_ITEM) {
649 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
650 return value_item->arg->val_int();
651 } else
652 return item->val_int();
653 }
654
655 // Marshall update operations to a buffer.
marshall_update(tokudb::buffer & b,Item * lhs_item,Item * rhs_item,TABLE * table,TOKUDB_SHARE * share)656 static void marshall_update(
657 tokudb::buffer& b,
658 Item* lhs_item,
659 Item* rhs_item,
660 TABLE* table,
661 TOKUDB_SHARE* share) {
662
663 // figure out the update operation type (again)
664 Field* lhs_field = find_field_by_name(table, lhs_item);
665 assert_always(lhs_field); // we found it before, so this should work
666
667 // compute the update info
668 uint32_t field_type;
669 uint32_t field_null_num = 0;
670 if (lhs_field->real_maybe_null()) {
671 uint32_t field_num = lhs_field->field_index;
672 field_null_num =
673 ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
674 }
675 uint32_t offset;
676 void* v_ptr = NULL;
677 uint32_t v_length;
678 uint32_t update_operation;
679 longlong v_ll;
680 String v_str;
681
682 switch (lhs_field->type()) {
683 case MYSQL_TYPE_TINY:
684 case MYSQL_TYPE_SHORT:
685 case MYSQL_TYPE_INT24:
686 case MYSQL_TYPE_LONG:
687 case MYSQL_TYPE_LONGLONG: {
688 Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
689 field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
690 offset =
691 fixed_field_offset(
692 table->s->null_bytes,
693 &share->kc_info,
694 table->s->primary_key,
695 lhs_field->field_index);
696 switch (rhs_item->type()) {
697 case Item::INT_ITEM: {
698 update_operation = '=';
699 v_ll = rhs_item->val_int();
700 v_length = lhs_field->pack_length();
701 v_ptr = &v_ll;
702 break;
703 }
704 case Item::FUNC_ITEM: {
705 Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
706 Item** arguments = rhs_func->arguments();
707 // we only support one if function for now, and it is a
708 // decrement with floor.
709 if (strcmp(rhs_func->func_name(), "if") == 0) {
710 update_operation = '-';
711 v_ll = 1;
712 } else if (rhs_func->argument_count() == 1) {
713 update_operation = '=';
714 v_ll = rhs_func->val_int();
715 } else {
716 update_operation = rhs_func->func_name()[0];
717 v_ll = item_val_int(arguments[1]);
718 }
719 v_length = lhs_field->pack_length();
720 v_ptr = &v_ll;
721 break;
722 }
723 default:
724 assert_unreachable();
725 }
726 break;
727 }
728 case MYSQL_TYPE_STRING: {
729 update_operation = '=';
730 field_type =
731 lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
732 offset =
733 fixed_field_offset(
734 table->s->null_bytes,
735 &share->kc_info,
736 table->s->primary_key,
737 lhs_field->field_index);
738 v_str = *rhs_item->val_str(&v_str);
739 v_length = v_str.length();
740 if (v_length >= lhs_field->pack_length()) {
741 v_length = lhs_field->pack_length();
742 v_str.length(v_length); // truncate
743 } else {
744 v_length = lhs_field->pack_length();
745 uchar pad_char =
746 lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
747 v_str.fill(lhs_field->pack_length(), pad_char); // pad
748 }
749 v_ptr = v_str.c_ptr();
750 break;
751 }
752 case MYSQL_TYPE_VARCHAR: {
753 update_operation = '=';
754 field_type =
755 lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
756 offset =
757 var_field_index(
758 table,
759 &share->kc_info,
760 table->s->primary_key,
761 lhs_field->field_index);
762 v_str = *rhs_item->val_str(&v_str);
763 v_length = v_str.length();
764 if (v_length >= lhs_field->row_pack_length()) {
765 v_length = lhs_field->row_pack_length();
766 v_str.length(v_length); // truncate
767 }
768 v_ptr = v_str.c_ptr();
769 break;
770 }
771 case MYSQL_TYPE_BLOB: {
772 update_operation = '=';
773 field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
774 offset =
775 blob_field_index(table, &share->kc_info, lhs_field->field_index);
776 v_str = *rhs_item->val_str(&v_str);
777 v_length = v_str.length();
778 if (v_length >= lhs_field->max_data_length()) {
779 v_length = lhs_field->max_data_length();
780 v_str.length(v_length); // truncate
781 }
782 v_ptr = v_str.c_ptr();
783 break;
784 }
785 default:
786 assert_unreachable();
787 }
788
789 // marshall the update fields into the buffer
790 b.append_ui<uint32_t>(update_operation);
791 b.append_ui<uint32_t>(field_type);
792 b.append_ui<uint32_t>(field_null_num);
793 b.append_ui<uint32_t>(offset);
794 b.append_ui<uint32_t>(v_length);
795 b.append(v_ptr, v_length);
796 }
797
798 // Save an item's value into the appropriate field. Return 0 if successful.
save_in_field(Item * item,TABLE * table)799 static int save_in_field(Item* item, TABLE* table) {
800 assert_always(item->type() == Item::FUNC_ITEM);
801 Item_func *func = static_cast<Item_func*>(item);
802 assert_always(strcmp(func->func_name(), "=") == 0);
803 uint n = func->argument_count();
804 assert_always(n == 2);
805 Item **arguments = func->arguments();
806 assert_always(arguments[0]->type() == Item::FIELD_ITEM);
807 Item_field *field_item = static_cast<Item_field*>(arguments[0]);
808 my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
809 int error = arguments[1]->save_in_field(field_item->field, 0);
810 dbug_tmp_restore_column_map(table->write_set, old_map);
811 return error;
812 }
813
count_update_types(Field * lhs_field,uint * num_varchars,uint * num_blobs)814 static void count_update_types(
815 Field* lhs_field,
816 uint* num_varchars,
817 uint* num_blobs) {
818
819 switch (lhs_field->type()) {
820 case MYSQL_TYPE_VARCHAR:
821 *num_varchars += 1;
822 break;
823 case MYSQL_TYPE_BLOB:
824 *num_blobs += 1;
825 break;
826 default:
827 break;
828 }
829 }
830
831 // Generate an update message for an update operation and send it into the
832 // primary tree. Return 0 if successful.
send_update_message(List<Item> & update_fields,List<Item> & update_values,Item * conds,DB_TXN * txn)833 int ha_tokudb::send_update_message(
834 List<Item>& update_fields,
835 List<Item>& update_values,
836 Item* conds,
837 DB_TXN* txn) {
838
839 int error;
840
841 // Save the primary key from the where conditions
842 Item::Type t = conds->type();
843 if (t == Item::FUNC_ITEM) {
844 error = save_in_field(conds, table);
845 } else if (t == Item::COND_ITEM) {
846 Item_cond* cond_item = static_cast<Item_cond*>(conds);
847 List_iterator<Item> li(*cond_item->argument_list());
848 Item* list_item;
849 error = 0;
850 while (error == 0 && (list_item = li++)) {
851 error = save_in_field(list_item, table);
852 }
853 } else {
854 assert_unreachable();
855 }
856 if (error)
857 return error;
858
859 // put the primary key into key_buff and wrap it with key_dbt
860 DBT key_dbt;
861 bool has_null;
862 create_dbt_key_from_table(
863 &key_dbt,
864 primary_key,
865 key_buff,
866 table->record[0],
867 &has_null);
868
869 // construct the update message
870 tokudb::buffer update_message;
871
872 uint8_t op = UPDATE_OP_UPDATE_2;
873 update_message.append(&op, sizeof op);
874
875 uint32_t num_updates = update_fields.elements;
876 uint num_varchars = 0, num_blobs = 0;
877 if (1) {
878 List_iterator<Item> lhs_i(update_fields);
879 Item* lhs_item;
880 while ((lhs_item = lhs_i++)) {
881 if (lhs_item == NULL)
882 break;
883 Field* lhs_field = find_field_by_name(table, lhs_item);
884 assert_always(lhs_field); // we found it before, so this should work
885 count_update_types(lhs_field, &num_varchars, &num_blobs);
886 }
887 if (num_varchars > 0 || num_blobs > 0)
888 num_updates++;
889 if (num_blobs > 0)
890 num_updates++;
891 }
892
893 // append the updates
894 update_message.append_ui<uint32_t>(num_updates);
895
896 if (num_varchars > 0 || num_blobs > 0)
897 marshall_varchar_descriptor(
898 update_message,
899 table,
900 &share->kc_info,
901 table->s->primary_key);
902 if (num_blobs > 0)
903 marshall_blobs_descriptor(update_message, table, &share->kc_info);
904
905 List_iterator<Item> lhs_i(update_fields);
906 List_iterator<Item> rhs_i(update_values);
907 while (error == 0) {
908 Item* lhs_item = lhs_i++;
909 if (lhs_item == NULL)
910 break;
911 Item* rhs_item = rhs_i++;
912 assert_always(rhs_item != NULL);
913 marshall_update(update_message, lhs_item, rhs_item, table, share);
914 }
915
916 rwlock_t_lock_read(share->_num_DBs_lock);
917
918 // hot index in progress
919 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
920 error = ENOTSUP; // run on the slow path
921 } else {
922 // send the update message
923 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
924 update_dbt.data = update_message.data();
925 update_dbt.size = update_message.size();
926 error =
927 share->key_file[primary_key]->update(
928 share->key_file[primary_key],
929 txn,
930 &key_dbt,
931 &update_dbt,
932 0);
933 }
934
935 share->_num_DBs_lock.unlock();
936
937 return error;
938 }
939
940 // Determine if an upsert operation can be offloaded to the storage engine.
941 // An upsert consists of a row and a list of update expressions
942 // (update_fields[i] = update_values[i]).
943 // The function returns 0 if the upsert is handled in the storage engine.
944 // Otherwise, an error code is returned.
upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)945 int ha_tokudb::upsert(
946 THD* thd,
947 List<Item>& update_fields,
948 List<Item>& update_values) {
949
950 TOKUDB_HANDLER_DBUG_ENTER("");
951 int error = 0;
952
953 if (!tokudb::sysvars::enable_fast_upsert(thd)) {
954 error = ENOTSUP;
955 goto exit;
956 }
957
958 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
959 fprintf(stderr, "upsert\n");
960 dump_item_list("update_fields", update_fields);
961 dump_item_list("update_values", update_values);
962 }
963
964 // not an upsert or something is fishy with the parameters
965 if (update_fields.elements < 1 ||
966 update_fields.elements != update_values.elements) {
967 error = ENOTSUP;
968 goto exit;
969 }
970
971 if (!check_upsert(thd, update_fields, update_values)) {
972 error = HA_ERR_UNSUPPORTED;
973 goto exit;
974 }
975
976 error = send_upsert_message(update_fields, update_values, transaction);
977
978 if (error) {
979 int mapped_error = map_to_handler_error(error);
980 if (mapped_error == error)
981 error = HA_ERR_UNSUPPORTED;
982 }
983
984 exit:
985
986 if (error != 0 && error != ENOTSUP)
987 print_error(error, MYF(0));
988
989 TOKUDB_HANDLER_DBUG_RETURN(error);
990 }
991
992 // Check if an upsert can be handled by this storage engine.
993 // Return true if it can.
check_upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)994 bool ha_tokudb::check_upsert(
995 THD* thd,
996 List<Item>& update_fields,
997 List<Item>& update_values) {
998
999 if (!transaction)
1000 return false;
1001
1002 // avoid strict mode arithmetic overflow issues
1003 if (is_strict_mode(thd))
1004 return false;
1005
1006 // no triggers
1007 if (table->triggers)
1008 return false;
1009
1010 // primary key must exist
1011 if (table->s->primary_key >= table->s->keys)
1012 return false;
1013
1014 // no secondary keys
1015 if (table->s->keys > 1)
1016 return false;
1017
1018 // no binlog
1019 if (mysql_bin_log.is_open() &&
1020 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1021 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1022 return false;
1023
1024 if (!check_all_update_expressions(
1025 update_fields,
1026 update_values,
1027 table,
1028 true))
1029 return false;
1030
1031 return true;
1032 }
1033
1034 // Generate an upsert message and send it into the primary tree.
1035 // Return 0 if successful.
send_upsert_message(List<Item> & update_fields,List<Item> & update_values,DB_TXN * txn)1036 int ha_tokudb::send_upsert_message(
1037 List<Item>& update_fields,
1038 List<Item>& update_values,
1039 DB_TXN* txn) {
1040 int error = 0;
1041
1042 // generate primary key
1043 DBT key_dbt;
1044 bool has_null;
1045 create_dbt_key_from_table(
1046 &key_dbt,
1047 primary_key,
1048 primary_key_buff,
1049 table->record[0],
1050 &has_null);
1051
1052 // generate packed row
1053 DBT row;
1054 error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1055 if (error)
1056 return error;
1057
1058 tokudb::buffer update_message;
1059
1060 // append the operation
1061 uint8_t op = UPDATE_OP_UPSERT_2;
1062 update_message.append(&op, sizeof op);
1063
1064 // append the row
1065 update_message.append_ui<uint32_t>(row.size);
1066 update_message.append(row.data, row.size);
1067
1068 uint32_t num_updates = update_fields.elements;
1069 uint num_varchars = 0, num_blobs = 0;
1070 if (1) {
1071 List_iterator<Item> lhs_i(update_fields);
1072 Item* lhs_item;
1073 while ((lhs_item = lhs_i++)) {
1074 if (lhs_item == NULL)
1075 break;
1076 Field* lhs_field = find_field_by_name(table, lhs_item);
1077 assert_always(lhs_field); // we found it before, so this should work
1078 count_update_types(lhs_field, &num_varchars, &num_blobs);
1079 }
1080 if (num_varchars > 0 || num_blobs > 0)
1081 num_updates++;
1082 if (num_blobs > 0)
1083 num_updates++;
1084 }
1085
1086 // append the updates
1087 update_message.append_ui<uint32_t>(num_updates);
1088
1089 if (num_varchars > 0 || num_blobs > 0)
1090 marshall_varchar_descriptor(
1091 update_message,
1092 table, &share->kc_info,
1093 table->s->primary_key);
1094 if (num_blobs > 0)
1095 marshall_blobs_descriptor(update_message, table, &share->kc_info);
1096
1097 List_iterator<Item> lhs_i(update_fields);
1098 List_iterator<Item> rhs_i(update_values);
1099 while (1) {
1100 Item* lhs_item = lhs_i++;
1101 if (lhs_item == NULL)
1102 break;
1103 Item* rhs_item = rhs_i++;
1104 assert_always(rhs_item != NULL);
1105 marshall_update(update_message, lhs_item, rhs_item, table, share);
1106 }
1107
1108 rwlock_t_lock_read(share->_num_DBs_lock);
1109
1110 // hot index in progress
1111 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1112 error = ENOTSUP; // run on the slow path
1113 } else {
1114 // send the upsert message
1115 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1116 update_dbt.data = update_message.data();
1117 update_dbt.size = update_message.size();
1118 error =
1119 share->key_file[primary_key]->update(
1120 share->key_file[primary_key],
1121 txn,
1122 &key_dbt,
1123 &update_dbt,
1124 0);
1125 }
1126
1127 share->_num_DBs_lock.unlock();
1128
1129 return error;
1130 }
1131 #endif // defined(TOKU_INCLUDE_UPSERT) && TOKU_INCLUDE_UPSERT
1132