1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 TokuDBis is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22 ======= */
23
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
26 // Point updates and upserts
27
28 // Restrictions:
29 // No triggers
30 // Statement or mixed replication
31 // Primary key must be defined
32 // Simple and compound primary key
33 // Int, char and varchar primary key types
34 // No updates on fields that are part of any key
35 // No clustering keys
36 // Integer and char field updates
37 // Update expressions:
38 // x = constant
39 // x = x + constant
40 // x = x - constant
41 // x = if (x=0,0,x-1)
42 // x = x + values(x)
43 // Session variable disables slow updates and slow upserts
44
45 // Future features:
46 // Support more primary key types
47 // Force statement logging for fast updates
48 // Support clustering keys using broadcast updates
49 // Support primary key ranges using multicast messages
50 // Support more complicated update expressions
51 // Replace field_offset
52
53 // Debug function to dump an Item
dump_item(Item * item)54 static void dump_item(Item* item) {
55 fprintf(stderr, "%u", item->type());
56 switch (item->type()) {
57 case Item::FUNC_ITEM: {
58 Item_func* func = static_cast<Item_func*>(item);
59 uint n = func->argument_count();
60 Item** arguments = func->arguments();
61 fprintf(
62 stderr,
63 ":func=%u,%s,%u(",
64 func->functype(),
65 func->func_name(),
66 n);
67 for (uint i = 0; i < n ; i++) {
68 dump_item(arguments[i]);
69 if (i < n-1)
70 fprintf(stderr,",");
71 }
72 fprintf(stderr, ")");
73 break;
74 }
75 case Item::INT_ITEM: {
76 Item_int* int_item = static_cast<Item_int*>(item);
77 fprintf(stderr, ":int=%lld", int_item->val_int());
78 break;
79 }
80 case Item::STRING_ITEM: {
81 Item_string* str_item = static_cast<Item_string*>(item);
82 fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
83 break;
84 }
85 case Item::FIELD_ITEM: {
86 Item_field* field_item = static_cast<Item_field*>(item);
87 fprintf(
88 stderr,
89 ":field=%s.%s.%s",
90 field_item->db_name,
91 field_item->table_name,
92 field_item->field_name);
93 break;
94 }
95 case Item::COND_ITEM: {
96 Item_cond* cond_item = static_cast<Item_cond*>(item);
97 fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
98 List_iterator<Item> li(*cond_item->argument_list());
99 Item* list_item;
100 while ((list_item = li++)) {
101 dump_item(list_item);
102 fprintf(stderr, "\n");
103 }
104 fprintf(stderr, ")\n");
105 break;
106 }
107 case Item::INSERT_VALUE_ITEM: {
108 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
109 fprintf(stderr, ":insert_value");
110 dump_item(value_item->arg);
111 break;
112 }
113 default:
114 fprintf(stderr, ":unsupported\n");
115 break;
116 }
117 }
118
119 // Debug function to dump an Item list
dump_item_list(const char * h,List<Item> & l)120 static void dump_item_list(const char* h, List<Item> &l) {
121 fprintf(stderr, "%s elements=%u\n", h, l.elements);
122 List_iterator<Item> li(l);
123 Item* item;
124 while ((item = li++) != NULL) {
125 dump_item(item);
126 fprintf(stderr, "\n");
127 }
128 }
129
130 // Find a Field by its Item name
find_field_by_name(TOKUDB_UNUSED (TABLE * table),Item * item)131 static Field* find_field_by_name(TOKUDB_UNUSED(TABLE* table), Item* item) {
132 if (item->type() != Item::FIELD_ITEM)
133 return NULL;
134 Item_field* field_item = static_cast<Item_field*>(item);
135 #if 0
136 if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
137 strcmp(table->s->table_name.str, field_item->table_name) != 0)
138 return NULL;
139 Field *found_field = NULL;
140 for (uint i = 0; i < table->s->fields; i++) {
141 Field *test_field = table->s->field[i];
142 if (strcmp(field_item->field_name, test_field->field_name) == 0) {
143 found_field = test_field;
144 break;
145 }
146 }
147 return found_field;
148 #else
149 // item->field may be a shortcut instead of the above table lookup
150 return field_item->field;
151 #endif
152 }
153
154 // Return the starting offset in the value for a particular index (selected by idx) of a
155 // particular field (selected by expand_field_num).
156 // This only works for fixed length fields
fixed_field_offset(uint32_t null_bytes,KEY_AND_COL_INFO * kc_info,uint idx,uint expand_field_num)157 static uint32_t fixed_field_offset(
158 uint32_t null_bytes,
159 KEY_AND_COL_INFO* kc_info,
160 uint idx,
161 uint expand_field_num) {
162
163 uint32_t offset = null_bytes;
164 for (uint i = 0; i < expand_field_num; i++) {
165 if (bitmap_is_set(&kc_info->key_filters[idx], i))
166 continue;
167 offset += kc_info->field_lengths[i];
168 }
169 return offset;
170 }
171
var_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint idx,uint field_num)172 static uint32_t var_field_index(
173 TABLE* table,
174 KEY_AND_COL_INFO* kc_info,
175 uint idx,
176 uint field_num) {
177
178 assert_always(field_num < table->s->fields);
179 uint v_index = 0;
180 for (uint i = 0; i < table->s->fields; i++) {
181 if (bitmap_is_set(&kc_info->key_filters[idx], i))
182 continue;
183 if (kc_info->length_bytes[i]) {
184 if (i == field_num)
185 break;
186 v_index++;
187 }
188 }
189 return v_index;
190 }
191
blob_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint field_num)192 static uint32_t blob_field_index(TABLE* table,
193 KEY_AND_COL_INFO* kc_info,
194 uint field_num) {
195 assert_always(field_num < table->s->fields);
196 uint b_index;
197 for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
198 if (kc_info->blob_fields[b_index] == field_num)
199 break;
200 }
201 assert_always(b_index < kc_info->num_blobs);
202 return b_index;
203 }
204
205 // Determine if an update operation can be offloaded to the storage engine.
206 // The update operation consists of a list of update expressions
207 // (fields[i] = values[i]), and a list of where conditions (conds).
208 // The function returns 0 if the update is handled in the storage engine.
209 // Otherwise, an error is returned.
fast_update(THD * thd,List<Item> & update_fields,List<Item> & update_values,Item * conds)210 int ha_tokudb::fast_update(
211 THD* thd,
212 List<Item>& update_fields,
213 List<Item>& update_values,
214 Item* conds) {
215
216 TOKUDB_HANDLER_DBUG_ENTER("");
217 int error = 0;
218
219 if (!tokudb::sysvars::enable_fast_update(thd)) {
220 error = ENOTSUP;
221 goto exit;
222 }
223
224 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
225 dump_item_list("fields", update_fields);
226 dump_item_list("values", update_values);
227 if (conds) {
228 fprintf(stderr, "conds\n"); dump_item(conds); fprintf(stderr, "\n");
229 }
230 }
231
232 if (update_fields.elements < 1 ||
233 update_fields.elements != update_values.elements) {
234 error = ENOTSUP; // something is fishy with the parameters
235 goto exit;
236 }
237
238 if (!check_fast_update(thd, update_fields, update_values, conds)) {
239 error = HA_ERR_UNSUPPORTED;
240 goto exit;
241 }
242
243 error = send_update_message(
244 update_fields, update_values, conds, transaction);
245
246 if (error) {
247 int mapped_error = map_to_handler_error(error);
248 if (mapped_error == error)
249 error = HA_ERR_UNSUPPORTED;
250 }
251
252 exit:
253 if (error != 0 && error != ENOTSUP)
254 print_error(error, MYF(0));
255
256 TOKUDB_HANDLER_DBUG_RETURN(error);
257 }
258
259 // Return true if an expression is a simple int expression or a simple function
260 // of +- int expression.
check_int_result(Item * item)261 static bool check_int_result(Item* item) {
262 Item::Type t = item->type();
263 if (t == Item::INT_ITEM) {
264 return true;
265 } else if (t == Item::FUNC_ITEM) {
266 Item_func* item_func = static_cast<Item_func*>(item);
267 if (strcmp(item_func->func_name(), "+") != 0 &&
268 strcmp(item_func->func_name(), "-") != 0)
269 return false;
270 if (item_func->argument_count() != 1)
271 return false;
272 Item** arguments = item_func->arguments();
273 if (arguments[0]->type() != Item::INT_ITEM)
274 return false;
275 return true;
276 } else
277 return false;
278 }
279
280 // check that an item is an insert value item with the same field name
check_insert_value(Item * item,const char * field_name)281 static bool check_insert_value(Item* item, const char* field_name) {
282 if (item->type() != Item::INSERT_VALUE_ITEM)
283 return false;
284 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
285 if (value_item->arg->type() != Item::FIELD_ITEM)
286 return false;
287 Item_field* arg = static_cast<Item_field*>(value_item->arg);
288 if (strcmp(field_name, arg->field_name) != 0)
289 return false;
290 return true;
291 }
292
293 // Return true if an expression looks like field_name op constant.
check_x_op_constant(const char * field_name,Item * item,const char * op,Item ** item_constant,bool allow_insert_value)294 static bool check_x_op_constant(
295 const char* field_name,
296 Item* item,
297 const char* op,
298 Item** item_constant,
299 bool allow_insert_value) {
300
301 if (item->type() != Item::FUNC_ITEM)
302 return false;
303 Item_func* item_func = static_cast<Item_func*>(item);
304 if (strcmp(item_func->func_name(), op) != 0)
305 return false;
306 Item** arguments = item_func->arguments();
307 uint n = item_func->argument_count();
308 if (n != 2)
309 return false;
310 if (arguments[0]->type() != Item::FIELD_ITEM)
311 return false;
312 Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
313 if (strcmp(field_name, arg0->field_name) != 0)
314 return false;
315 if (!check_int_result(arguments[1]))
316 if (!(allow_insert_value &&
317 check_insert_value(arguments[1], field_name)))
318 return false;
319 *item_constant = arguments[1];
320 return true;
321 }
322
323 // Return true if an expression looks like field_name = constant
check_x_equal_0(const char * field_name,Item * item)324 static bool check_x_equal_0(const char *field_name, Item *item) {
325 Item* item_constant;
326 if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
327 return false;
328 if (item_constant->type() != Item::INT_ITEM ||
329 item_constant->val_int() != 0)
330 return false;
331 return true;
332 }
333
334 // Return true if an expression looks like fieldname - 1
check_x_minus_1(const char * field_name,Item * item)335 static bool check_x_minus_1(const char* field_name, Item* item) {
336 Item* item_constant;
337 if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
338 return false;
339 if (item_constant->type() != Item::INT_ITEM ||
340 item_constant->val_int() != 1)
341 return false;
342 return true;
343 }
344
345 // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
346 // the field named by fieldname is an unsigned int.
check_decr_floor_expression(Field * lhs_field,Item * item)347 static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
348 if (item->type() != Item::FUNC_ITEM)
349 return false;
350 Item_func* item_func = static_cast<Item_func*>(item);
351 if (strcmp(item_func->func_name(), "if") != 0)
352 return false;
353 Item** arguments = item_func->arguments();
354 uint n = item_func->argument_count();
355 if (n != 3)
356 return false;
357 if (!check_x_equal_0(lhs_field->field_name, arguments[0]))
358 return false;
359 if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
360 return false;
361 if (!check_x_minus_1(lhs_field->field_name, arguments[2]))
362 return false;
363 if (!(lhs_field->flags & UNSIGNED_FLAG))
364 return false;
365 return true;
366 }
367
368 // Check if lhs = rhs expression is simple. Return true if it is.
check_update_expression(Item * lhs_item,Item * rhs_item,TABLE * table,bool allow_insert_value)369 static bool check_update_expression(
370 Item* lhs_item,
371 Item* rhs_item,
372 TABLE* table,
373 bool allow_insert_value) {
374
375 Field* lhs_field = find_field_by_name(table, lhs_item);
376 if (lhs_field == NULL)
377 return false;
378 if (!lhs_field->part_of_key.is_clear_all())
379 return false;
380 enum_field_types lhs_type = lhs_field->type();
381 Item::Type rhs_type = rhs_item->type();
382 switch (lhs_type) {
383 case MYSQL_TYPE_TINY:
384 case MYSQL_TYPE_SHORT:
385 case MYSQL_TYPE_INT24:
386 case MYSQL_TYPE_LONG:
387 case MYSQL_TYPE_LONGLONG:
388 if (check_int_result(rhs_item))
389 return true;
390 Item* item_constant;
391 if (check_x_op_constant(
392 lhs_field->field_name,
393 rhs_item,
394 "+",
395 &item_constant,
396 allow_insert_value))
397 return true;
398 if (check_x_op_constant(
399 lhs_field->field_name,
400 rhs_item,
401 "-",
402 &item_constant,
403 allow_insert_value))
404 return true;
405 if (check_decr_floor_expression(lhs_field, rhs_item))
406 return true;
407 break;
408 case MYSQL_TYPE_STRING:
409 if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
410 return true;
411 break;
412 case MYSQL_TYPE_VARCHAR:
413 case MYSQL_TYPE_BLOB:
414 if (rhs_type == Item::STRING_ITEM)
415 return true;
416 break;
417 default:
418 break;
419 }
420 return false;
421 }
422
423 // Check that all update expressions are simple. Return true if they are.
check_all_update_expressions(List<Item> & fields,List<Item> & values,TABLE * table,bool allow_insert_value)424 static bool check_all_update_expressions(
425 List<Item>& fields,
426 List<Item>& values,
427 TABLE* table,
428 bool allow_insert_value) {
429
430 List_iterator<Item> lhs_i(fields);
431 List_iterator<Item> rhs_i(values);
432 while (1) {
433 Item* lhs_item = lhs_i++;
434 if (lhs_item == NULL)
435 break;
436 Item* rhs_item = rhs_i++;
437 assert_always(rhs_item != NULL);
438 if (!check_update_expression(
439 lhs_item,
440 rhs_item,
441 table,
442 allow_insert_value))
443 return false;
444 }
445 return true;
446 }
447
full_field_in_key(TABLE * table,Field * field)448 static bool full_field_in_key(TABLE* table, Field* field) {
449 assert_always(table->s->primary_key < table->s->keys);
450 KEY* key = &table->s->key_info[table->s->primary_key];
451 for (uint i = 0; i < key->user_defined_key_parts; i++) {
452 KEY_PART_INFO* key_part = &key->key_part[i];
453 if (strcmp(field->field_name, key_part->field->field_name) == 0) {
454 return key_part->length == field->field_length;
455 }
456 }
457 return false;
458 }
459
460 // Check that an expression looks like fieldname = constant, fieldname is part
461 // of the primary key, and the named field is an int, char or varchar type.
462 // Return true if it does.
check_pk_field_equal_constant(Item * item,TABLE * table,MY_BITMAP * pk_fields)463 static bool check_pk_field_equal_constant(
464 Item* item,
465 TABLE* table,
466 MY_BITMAP* pk_fields) {
467
468 if (item->type() != Item::FUNC_ITEM)
469 return false;
470 Item_func* func = static_cast<Item_func*>(item);
471 if (strcmp(func->func_name(), "=") != 0)
472 return false;
473 uint n = func->argument_count();
474 if (n != 2)
475 return false;
476 Item** arguments = func->arguments();
477 Field* field = find_field_by_name(table, arguments[0]);
478 if (field == NULL)
479 return false;
480 if (!bitmap_test_and_clear(pk_fields, field->field_index))
481 return false;
482 switch (field->type()) {
483 case MYSQL_TYPE_TINY:
484 case MYSQL_TYPE_SHORT:
485 case MYSQL_TYPE_INT24:
486 case MYSQL_TYPE_LONG:
487 case MYSQL_TYPE_LONGLONG:
488 return arguments[1]->type() == Item::INT_ITEM ||
489 arguments[1]->type() == Item::STRING_ITEM;
490 case MYSQL_TYPE_STRING:
491 case MYSQL_TYPE_VARCHAR:
492 return full_field_in_key(table, field) &&
493 (arguments[1]->type() == Item::INT_ITEM ||
494 arguments[1]->type() == Item::STRING_ITEM);
495 default:
496 return false;
497 }
498 }
499
500 // Check that the where condition covers all of the primary key components
501 // with fieldname = constant expressions. Return true if it does.
check_point_update(Item * conds,TABLE * table)502 static bool check_point_update(Item* conds, TABLE* table) {
503 bool result = false;
504
505 if (conds == NULL)
506 return false; // no where condition on the update
507
508 if (table->s->primary_key >= table->s->keys)
509 return false; // no primary key defined
510
511 // use a bitmap of the primary key fields to keep track of those fields
512 // that are covered by the where conditions
513 MY_BITMAP pk_fields;
514 if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
515 return false;
516 KEY *key = &table->s->key_info[table->s->primary_key];
517 for (uint i = 0; i < key->user_defined_key_parts; i++)
518 bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
519
520 switch (conds->type()) {
521 case Item::FUNC_ITEM:
522 result = check_pk_field_equal_constant(conds, table, &pk_fields);
523 break;
524 case Item::COND_ITEM: {
525 Item_cond* cond_item = static_cast<Item_cond*>(conds);
526 if (strcmp(cond_item->func_name(), "and") != 0)
527 break;
528 List_iterator<Item> li(*cond_item->argument_list());
529 Item* list_item;
530 result = true;
531 while (result == true && (list_item = li++)) {
532 result = check_pk_field_equal_constant(
533 list_item,
534 table,
535 &pk_fields);
536 }
537 break;
538 }
539 default:
540 break;
541 }
542
543 if (!bitmap_is_clear_all(&pk_fields))
544 result = false;
545 bitmap_free(&pk_fields);
546 return result;
547 }
548
549 // Return true if there are any clustering keys (except the primary).
550 // Precompute this when the table is opened.
clustering_keys_exist(TABLE * table)551 static bool clustering_keys_exist(TABLE *table) {
552 for (uint keynr = 0; keynr < table->s->keys; keynr++) {
553 if (keynr != table->s->primary_key &&
554 key_is_clustering(&table->s->key_info[keynr]))
555 return true;
556 }
557 return false;
558 }
559
560 // Check if an update operation can be handled by this storage engine.
561 // Return true if it can.
check_fast_update(THD * thd,List<Item> & fields,List<Item> & values,Item * conds)562 bool ha_tokudb::check_fast_update(
563 THD* thd,
564 List<Item>& fields,
565 List<Item>& values,
566 Item* conds) {
567
568 if (!transaction)
569 return false;
570
571 // avoid strict mode arithmetic overflow issues
572 if (thd->is_strict_mode())
573 return false;
574
575 // no triggers
576 if (table->triggers)
577 return false;
578
579 // no binlog
580 if (mysql_bin_log.is_open() &&
581 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
582 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
583 return false;
584
585 // no clustering keys (need to broadcast an increment into the clustering
586 // keys since we are selecting with the primary key)
587 if (clustering_keys_exist(table))
588 return false;
589
590 if (!check_all_update_expressions(fields, values, table, false))
591 return false;
592
593 if (!check_point_update(conds, table))
594 return false;
595
596 return true;
597 }
598
marshall_varchar_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info,uint key_num)599 static void marshall_varchar_descriptor(
600 tokudb::buffer& b,
601 TABLE* table,
602 KEY_AND_COL_INFO* kc_info,
603 uint key_num) {
604
605 b.append_ui<uint32_t>('v');
606 b.append_ui<uint32_t>(
607 table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
608 uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
609 b.append_ui<uint32_t>(var_offset_bytes);
610 b.append_ui<uint32_t>(
611 var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
612 }
613
marshall_blobs_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info)614 static void marshall_blobs_descriptor(
615 tokudb::buffer& b,
616 TABLE* table,
617 KEY_AND_COL_INFO* kc_info) {
618
619 b.append_ui<uint32_t>('b');
620 uint32_t n = kc_info->num_blobs;
621 b.append_ui<uint32_t>(n);
622 for (uint i = 0; i < n; i++) {
623 uint blob_field_index = kc_info->blob_fields[i];
624 assert_always(blob_field_index < table->s->fields);
625 uint8_t blob_field_length =
626 table->s->field[blob_field_index]->row_pack_length();
627 b.append(&blob_field_length, sizeof blob_field_length);
628 }
629 }
630
631 static inline uint32_t get_null_bit_position(uint32_t null_bit);
632
633 // evaluate the int value of an item
item_val_int(Item * item)634 static longlong item_val_int(Item* item) {
635 Item::Type t = item->type();
636 if (t == Item::INSERT_VALUE_ITEM) {
637 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
638 return value_item->arg->val_int();
639 } else
640 return item->val_int();
641 }
642
643 // Marshall update operations to a buffer.
marshall_update(tokudb::buffer & b,Item * lhs_item,Item * rhs_item,TABLE * table,TOKUDB_SHARE * share)644 static void marshall_update(
645 tokudb::buffer& b,
646 Item* lhs_item,
647 Item* rhs_item,
648 TABLE* table,
649 TOKUDB_SHARE* share) {
650
651 // figure out the update operation type (again)
652 Field* lhs_field = find_field_by_name(table, lhs_item);
653 assert_always(lhs_field); // we found it before, so this should work
654
655 // compute the update info
656 uint32_t field_type;
657 uint32_t field_null_num = 0;
658 if (lhs_field->real_maybe_null()) {
659 uint32_t field_num = lhs_field->field_index;
660 field_null_num =
661 ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
662 }
663 uint32_t offset;
664 void* v_ptr = NULL;
665 uint32_t v_length;
666 uint32_t update_operation;
667 longlong v_ll;
668 String v_str;
669
670 switch (lhs_field->type()) {
671 case MYSQL_TYPE_TINY:
672 case MYSQL_TYPE_SHORT:
673 case MYSQL_TYPE_INT24:
674 case MYSQL_TYPE_LONG:
675 case MYSQL_TYPE_LONGLONG: {
676 Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
677 field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
678 offset =
679 fixed_field_offset(
680 table->s->null_bytes,
681 &share->kc_info,
682 table->s->primary_key,
683 lhs_field->field_index);
684 switch (rhs_item->type()) {
685 case Item::INT_ITEM: {
686 update_operation = '=';
687 v_ll = rhs_item->val_int();
688 v_length = lhs_field->pack_length();
689 v_ptr = &v_ll;
690 break;
691 }
692 case Item::FUNC_ITEM: {
693 Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
694 Item** arguments = rhs_func->arguments();
695 // we only support one if function for now, and it is a
696 // decrement with floor.
697 if (strcmp(rhs_func->func_name(), "if") == 0) {
698 update_operation = '-';
699 v_ll = 1;
700 } else if (rhs_func->argument_count() == 1) {
701 update_operation = '=';
702 v_ll = rhs_func->val_int();
703 } else {
704 update_operation = rhs_func->func_name()[0];
705 v_ll = item_val_int(arguments[1]);
706 }
707 v_length = lhs_field->pack_length();
708 v_ptr = &v_ll;
709 break;
710 }
711 default:
712 assert_unreachable();
713 }
714 break;
715 }
716 case MYSQL_TYPE_STRING: {
717 update_operation = '=';
718 field_type =
719 lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
720 offset =
721 fixed_field_offset(
722 table->s->null_bytes,
723 &share->kc_info,
724 table->s->primary_key,
725 lhs_field->field_index);
726 v_str = *rhs_item->val_str(&v_str);
727 v_length = v_str.length();
728 if (v_length >= lhs_field->pack_length()) {
729 v_length = lhs_field->pack_length();
730 v_str.length(v_length); // truncate
731 } else {
732 v_length = lhs_field->pack_length();
733 uchar pad_char =
734 lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
735 v_str.fill(lhs_field->pack_length(), pad_char); // pad
736 }
737 v_ptr = v_str.c_ptr();
738 break;
739 }
740 case MYSQL_TYPE_VARCHAR: {
741 update_operation = '=';
742 field_type =
743 lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
744 offset =
745 var_field_index(
746 table,
747 &share->kc_info,
748 table->s->primary_key,
749 lhs_field->field_index);
750 v_str = *rhs_item->val_str(&v_str);
751 v_length = v_str.length();
752 if (v_length >= lhs_field->row_pack_length()) {
753 v_length = lhs_field->row_pack_length();
754 v_str.length(v_length); // truncate
755 }
756 v_ptr = v_str.c_ptr();
757 break;
758 }
759 case MYSQL_TYPE_BLOB: {
760 update_operation = '=';
761 field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
762 offset =
763 blob_field_index(table, &share->kc_info, lhs_field->field_index);
764 v_str = *rhs_item->val_str(&v_str);
765 v_length = v_str.length();
766 if (v_length >= lhs_field->max_data_length()) {
767 v_length = lhs_field->max_data_length();
768 v_str.length(v_length); // truncate
769 }
770 v_ptr = v_str.c_ptr();
771 break;
772 }
773 default:
774 assert_unreachable();
775 }
776
777 // marshall the update fields into the buffer
778 b.append_ui<uint32_t>(update_operation);
779 b.append_ui<uint32_t>(field_type);
780 b.append_ui<uint32_t>(field_null_num);
781 b.append_ui<uint32_t>(offset);
782 b.append_ui<uint32_t>(v_length);
783 b.append(v_ptr, v_length);
784 }
785
786 // Save an item's value into the appropriate field. Return 0 if successful.
save_in_field(Item * item,TABLE * table)787 static int save_in_field(Item* item, TABLE* table) {
788 assert_always(item->type() == Item::FUNC_ITEM);
789 Item_func *func = static_cast<Item_func*>(item);
790 assert_always(strcmp(func->func_name(), "=") == 0);
791 uint n = func->argument_count();
792 assert_always(n == 2);
793 Item **arguments = func->arguments();
794 assert_always(arguments[0]->type() == Item::FIELD_ITEM);
795 Item_field *field_item = static_cast<Item_field*>(arguments[0]);
796 my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
797 int error = arguments[1]->save_in_field(field_item->field, 0);
798 dbug_tmp_restore_column_map(table->write_set, old_map);
799 return error;
800 }
801
count_update_types(Field * lhs_field,uint * num_varchars,uint * num_blobs)802 static void count_update_types(
803 Field* lhs_field,
804 uint* num_varchars,
805 uint* num_blobs) {
806
807 switch (lhs_field->type()) {
808 case MYSQL_TYPE_VARCHAR:
809 *num_varchars += 1;
810 break;
811 case MYSQL_TYPE_BLOB:
812 *num_blobs += 1;
813 break;
814 default:
815 break;
816 }
817 }
818
819 // Generate an update message for an update operation and send it into the
820 // primary tree. Return 0 if successful.
send_update_message(List<Item> & update_fields,List<Item> & update_values,Item * conds,DB_TXN * txn)821 int ha_tokudb::send_update_message(
822 List<Item>& update_fields,
823 List<Item>& update_values,
824 Item* conds,
825 DB_TXN* txn) {
826
827 int error;
828
829 // Save the primary key from the where conditions
830 Item::Type t = conds->type();
831 if (t == Item::FUNC_ITEM) {
832 error = save_in_field(conds, table);
833 } else if (t == Item::COND_ITEM) {
834 Item_cond* cond_item = static_cast<Item_cond*>(conds);
835 List_iterator<Item> li(*cond_item->argument_list());
836 Item* list_item;
837 error = 0;
838 while (error == 0 && (list_item = li++)) {
839 error = save_in_field(list_item, table);
840 }
841 } else {
842 assert_unreachable();
843 }
844 if (error)
845 return error;
846
847 // put the primary key into key_buff and wrap it with key_dbt
848 DBT key_dbt;
849 bool has_null;
850 create_dbt_key_from_table(
851 &key_dbt,
852 primary_key,
853 key_buff,
854 table->record[0],
855 &has_null);
856
857 // construct the update message
858 tokudb::buffer update_message;
859
860 uint8_t op = UPDATE_OP_UPDATE_2;
861 update_message.append(&op, sizeof op);
862
863 uint32_t num_updates = update_fields.elements;
864 uint num_varchars = 0, num_blobs = 0;
865 if (1) {
866 List_iterator<Item> lhs_i(update_fields);
867 Item* lhs_item;
868 while ((lhs_item = lhs_i++)) {
869 if (lhs_item == NULL)
870 break;
871 Field* lhs_field = find_field_by_name(table, lhs_item);
872 assert_always(lhs_field); // we found it before, so this should work
873 count_update_types(lhs_field, &num_varchars, &num_blobs);
874 }
875 if (num_varchars > 0 || num_blobs > 0)
876 num_updates++;
877 if (num_blobs > 0)
878 num_updates++;
879 }
880
881 // append the updates
882 update_message.append_ui<uint32_t>(num_updates);
883
884 if (num_varchars > 0 || num_blobs > 0)
885 marshall_varchar_descriptor(
886 update_message,
887 table,
888 &share->kc_info,
889 table->s->primary_key);
890 if (num_blobs > 0)
891 marshall_blobs_descriptor(update_message, table, &share->kc_info);
892
893 List_iterator<Item> lhs_i(update_fields);
894 List_iterator<Item> rhs_i(update_values);
895 while (error == 0) {
896 Item* lhs_item = lhs_i++;
897 if (lhs_item == NULL)
898 break;
899 Item* rhs_item = rhs_i++;
900 assert_always(rhs_item != NULL);
901 marshall_update(update_message, lhs_item, rhs_item, table, share);
902 }
903
904 rwlock_t_lock_read(share->_num_DBs_lock);
905
906 // hot index in progress
907 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
908 error = ENOTSUP; // run on the slow path
909 } else {
910 // send the update message
911 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
912 update_dbt.data = update_message.data();
913 update_dbt.size = update_message.size();
914 error =
915 share->key_file[primary_key]->update(
916 share->key_file[primary_key],
917 txn,
918 &key_dbt,
919 &update_dbt,
920 0);
921 }
922
923 share->_num_DBs_lock.unlock();
924
925 return error;
926 }
927
928 // Determine if an upsert operation can be offloaded to the storage engine.
929 // An upsert consists of a row and a list of update expressions
930 // (update_fields[i] = update_values[i]).
931 // The function returns 0 if the upsert is handled in the storage engine.
932 // Otherwise, an error code is returned.
upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)933 int ha_tokudb::upsert(
934 THD* thd,
935 List<Item>& update_fields,
936 List<Item>& update_values) {
937
938 TOKUDB_HANDLER_DBUG_ENTER("");
939 int error = 0;
940
941 if (!tokudb::sysvars::enable_fast_upsert(thd)) {
942 error = ENOTSUP;
943 goto exit;
944 }
945
946 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
947 fprintf(stderr, "upsert\n");
948 dump_item_list("update_fields", update_fields);
949 dump_item_list("update_values", update_values);
950 }
951
952 // not an upsert or something is fishy with the parameters
953 if (update_fields.elements < 1 ||
954 update_fields.elements != update_values.elements) {
955 error = ENOTSUP;
956 goto exit;
957 }
958
959 if (!check_upsert(thd, update_fields, update_values)) {
960 error = HA_ERR_UNSUPPORTED;
961 goto exit;
962 }
963
964 error = send_upsert_message(update_fields, update_values, transaction);
965
966 if (error) {
967 int mapped_error = map_to_handler_error(error);
968 if (mapped_error == error)
969 error = HA_ERR_UNSUPPORTED;
970 }
971
972 exit:
973 if (error != 0 && error != ENOTSUP)
974 print_error(error, MYF(0));
975
976 TOKUDB_HANDLER_DBUG_RETURN(error);
977 }
978
979 // Check if an upsert can be handled by this storage engine.
980 // Return true if it can.
check_upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)981 bool ha_tokudb::check_upsert(
982 THD* thd,
983 List<Item>& update_fields,
984 List<Item>& update_values) {
985
986 if (!transaction)
987 return false;
988
989 // avoid strict mode arithmetic overflow issues
990 if (thd->is_strict_mode())
991 return false;
992
993 // no triggers
994 if (table->triggers)
995 return false;
996
997 // primary key must exist
998 if (table->s->primary_key >= table->s->keys)
999 return false;
1000
1001 // no secondary keys
1002 if (table->s->keys > 1)
1003 return false;
1004
1005 // no binlog
1006 if (mysql_bin_log.is_open() &&
1007 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1008 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1009 return false;
1010
1011 if (!check_all_update_expressions(
1012 update_fields,
1013 update_values,
1014 table,
1015 true))
1016 return false;
1017
1018 return true;
1019 }
1020
1021 // Generate an upsert message and send it into the primary tree.
1022 // Return 0 if successful.
send_upsert_message(List<Item> & update_fields,List<Item> & update_values,DB_TXN * txn)1023 int ha_tokudb::send_upsert_message(
1024 List<Item>& update_fields,
1025 List<Item>& update_values,
1026 DB_TXN* txn) {
1027 int error = 0;
1028
1029 // generate primary key
1030 DBT key_dbt;
1031 bool has_null;
1032 create_dbt_key_from_table(
1033 &key_dbt,
1034 primary_key,
1035 primary_key_buff,
1036 table->record[0],
1037 &has_null);
1038
1039 // generate packed row
1040 DBT row;
1041 error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1042 if (error)
1043 return error;
1044
1045 tokudb::buffer update_message;
1046
1047 // append the operation
1048 uint8_t op = UPDATE_OP_UPSERT_2;
1049 update_message.append(&op, sizeof op);
1050
1051 // append the row
1052 update_message.append_ui<uint32_t>(row.size);
1053 update_message.append(row.data, row.size);
1054
1055 uint32_t num_updates = update_fields.elements;
1056 uint num_varchars = 0, num_blobs = 0;
1057 if (1) {
1058 List_iterator<Item> lhs_i(update_fields);
1059 Item* lhs_item;
1060 while ((lhs_item = lhs_i++)) {
1061 if (lhs_item == NULL)
1062 break;
1063 Field* lhs_field = find_field_by_name(table, lhs_item);
1064 assert_always(lhs_field); // we found it before, so this should work
1065 count_update_types(lhs_field, &num_varchars, &num_blobs);
1066 }
1067 if (num_varchars > 0 || num_blobs > 0)
1068 num_updates++;
1069 if (num_blobs > 0)
1070 num_updates++;
1071 }
1072
1073 // append the updates
1074 update_message.append_ui<uint32_t>(num_updates);
1075
1076 if (num_varchars > 0 || num_blobs > 0)
1077 marshall_varchar_descriptor(
1078 update_message,
1079 table, &share->kc_info,
1080 table->s->primary_key);
1081 if (num_blobs > 0)
1082 marshall_blobs_descriptor(update_message, table, &share->kc_info);
1083
1084 List_iterator<Item> lhs_i(update_fields);
1085 List_iterator<Item> rhs_i(update_values);
1086 while (1) {
1087 Item* lhs_item = lhs_i++;
1088 if (lhs_item == NULL)
1089 break;
1090 Item* rhs_item = rhs_i++;
1091 assert_always(rhs_item != NULL);
1092 marshall_update(update_message, lhs_item, rhs_item, table, share);
1093 }
1094
1095 rwlock_t_lock_read(share->_num_DBs_lock);
1096
1097 // hot index in progress
1098 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1099 error = ENOTSUP; // run on the slow path
1100 } else {
1101 // send the upsert message
1102 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1103 update_dbt.data = update_message.data();
1104 update_dbt.size = update_message.size();
1105 error =
1106 share->key_file[primary_key]->update(
1107 share->key_file[primary_key],
1108 txn,
1109 &key_dbt,
1110 &update_dbt,
1111 0);
1112 }
1113
1114 share->_num_DBs_lock.unlock();
1115
1116 return error;
1117 }
1118