1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of TokuDB
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 TokuDB is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22 ======= */
23
24 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
26 // Point updates and upserts
27
28 // Restrictions:
29 // No triggers
30 // Statement or mixed replication
31 // Primary key must be defined
32 // Simple and compound primary key
33 // Int, char and varchar primary key types
34 // No updates on fields that are part of any key
35 // No clustering keys
36 // Integer and char field updates
37 // Update expressions:
38 // x = constant
39 // x = x + constant
40 // x = x - constant
41 // x = if (x=0,0,x-1)
42 // x = x + values(x)
43 // Session variable disables slow updates and slow upserts
44
45 // Future features:
46 // Support more primary key types
47 // Force statement logging for fast updates
48 // Support clustering keys using broadcast updates
49 // Support primary key ranges using multicast messages
50 // Support more complicated update expressions
51 // Replace field_offset
52
53 // Debug function to dump an Item
dump_item(Item * item)54 static void dump_item(Item* item) {
55 fprintf(stderr, "%u", item->type());
56 switch (item->type()) {
57 case Item::FUNC_ITEM: {
58 Item_func* func = static_cast<Item_func*>(item);
59 uint n = func->argument_count();
60 Item** arguments = func->arguments();
61 fprintf(
62 stderr,
63 ":func=%u,%s,%u(",
64 func->functype(),
65 func->func_name(),
66 n);
67 for (uint i = 0; i < n ; i++) {
68 dump_item(arguments[i]);
69 if (i < n-1)
70 fprintf(stderr,",");
71 }
72 fprintf(stderr, ")");
73 break;
74 }
75 case Item::INT_ITEM: {
76 Item_int* int_item = static_cast<Item_int*>(item);
77 fprintf(stderr, ":int=%lld", int_item->val_int());
78 break;
79 }
80 case Item::STRING_ITEM: {
81 Item_string* str_item = static_cast<Item_string*>(item);
82 fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
83 break;
84 }
85 case Item::FIELD_ITEM: {
86 Item_field* field_item = static_cast<Item_field*>(item);
87 fprintf(
88 stderr,
89 ":field=%s.%s.%s",
90 field_item->db_name,
91 field_item->table_name,
92 field_item->field_name);
93 break;
94 }
95 case Item::COND_ITEM: {
96 Item_cond* cond_item = static_cast<Item_cond*>(item);
97 fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
98 List_iterator<Item> li(*cond_item->argument_list());
99 Item* list_item;
100 while ((list_item = li++)) {
101 dump_item(list_item);
102 fprintf(stderr, "\n");
103 }
104 fprintf(stderr, ")\n");
105 break;
106 }
107 case Item::INSERT_VALUE_ITEM: {
108 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
109 fprintf(stderr, ":insert_value");
110 dump_item(value_item->arg);
111 break;
112 }
113 default:
114 fprintf(stderr, ":unsupported\n");
115 break;
116 }
117 }
118
119 // Debug function to dump an Item list
dump_item_list(const char * h,List<Item> & l)120 static void dump_item_list(const char* h, List<Item> &l) {
121 fprintf(stderr, "%s elements=%u\n", h, l.elements);
122 List_iterator<Item> li(l);
123 Item* item;
124 while ((item = li++) != NULL) {
125 dump_item(item);
126 fprintf(stderr, "\n");
127 }
128 }
129
130 // Find a Field by its Item name
find_field_by_name(TOKUDB_UNUSED (TABLE * table),Item * item)131 static Field* find_field_by_name(TOKUDB_UNUSED(TABLE* table), Item* item) {
132 if (item->type() != Item::FIELD_ITEM)
133 return NULL;
134 Item_field* field_item = static_cast<Item_field*>(item);
135 #if 0
136 if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
137 strcmp(table->s->table_name.str, field_item->table_name) != 0)
138 return NULL;
139 Field *found_field = NULL;
140 for (uint i = 0; i < table->s->fields; i++) {
141 Field *test_field = table->s->field[i];
142 if (strcmp(field_item->field_name, test_field->field_name) == 0) {
143 found_field = test_field;
144 break;
145 }
146 }
147 return found_field;
148 #else
149 // item->field may be a shortcut instead of the above table lookup
150 return field_item->field;
151 #endif
152 }
153
154 // Return the starting offset in the value for a particular index (selected by idx) of a
155 // particular field (selected by expand_field_num).
156 // This only works for fixed length fields
fixed_field_offset(uint32_t null_bytes,KEY_AND_COL_INFO * kc_info,uint idx,uint expand_field_num)157 static uint32_t fixed_field_offset(
158 uint32_t null_bytes,
159 KEY_AND_COL_INFO* kc_info,
160 uint idx,
161 uint expand_field_num) {
162
163 uint32_t offset = null_bytes;
164 for (uint i = 0; i < expand_field_num; i++) {
165 if (bitmap_is_set(&kc_info->key_filters[idx], i))
166 continue;
167 offset += kc_info->field_lengths[i];
168 }
169 return offset;
170 }
171
var_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint idx,uint field_num)172 static uint32_t var_field_index(
173 TABLE* table,
174 KEY_AND_COL_INFO* kc_info,
175 uint idx,
176 uint field_num) {
177
178 assert_always(field_num < table->s->fields);
179 uint v_index = 0;
180 for (uint i = 0; i < table->s->fields; i++) {
181 if (bitmap_is_set(&kc_info->key_filters[idx], i))
182 continue;
183 if (kc_info->length_bytes[i]) {
184 if (i == field_num)
185 break;
186 v_index++;
187 }
188 }
189 return v_index;
190 }
191
blob_field_index(TABLE * table,KEY_AND_COL_INFO * kc_info,uint field_num)192 static uint32_t blob_field_index(TABLE* table,
193 KEY_AND_COL_INFO* kc_info,
194 uint field_num) {
195 assert_always(field_num < table->s->fields);
196 uint b_index;
197 for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
198 if (kc_info->blob_fields[b_index] == field_num)
199 break;
200 }
201 assert_always(b_index < kc_info->num_blobs);
202 return b_index;
203 }
204
205 // Determine if an update operation can be offloaded to the storage engine.
206 // The update operation consists of a list of update expressions
207 // (fields[i] = values[i]), and a list of where conditions (conds).
208 // The function returns 0 if the update is handled in the storage engine.
209 // Otherwise, an error is returned.
fast_update(THD * thd,List<Item> & update_fields,List<Item> & update_values,Item * conds)210 int ha_tokudb::fast_update(
211 THD* thd,
212 List<Item>& update_fields,
213 List<Item>& update_values,
214 Item* conds) {
215
216 TOKUDB_HANDLER_DBUG_ENTER("");
217 int error = 0;
218
219 if (!tokudb::sysvars::enable_fast_update(thd)) {
220 error = ENOTSUP;
221 goto exit;
222 }
223
224 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
225 dump_item_list("fields", update_fields);
226 dump_item_list("values", update_values);
227 if (conds) {
228 fprintf(stderr, "conds\n");
229 dump_item(conds);
230 fprintf(stderr, "\n");
231 }
232 }
233
234 if (update_fields.elements < 1 ||
235 update_fields.elements != update_values.elements) {
236 error = ENOTSUP; // something is fishy with the parameters
237 goto exit;
238 }
239
240 if (!check_fast_update(thd, update_fields, update_values, conds)) {
241 error = HA_ERR_UNSUPPORTED;
242 goto exit;
243 }
244
245 error = send_update_message(
246 update_fields, update_values, conds, transaction);
247
248 if (error) {
249 int mapped_error = map_to_handler_error(error);
250 if (mapped_error == error)
251 error = HA_ERR_UNSUPPORTED;
252 }
253
254 exit:
255
256 if (error != 0 && error != ENOTSUP)
257 print_error(error, MYF(0));
258
259 TOKUDB_HANDLER_DBUG_RETURN(error);
260 }
261
262 // Return true if an expression is a simple int expression or a simple function
263 // of +- int expression.
check_int_result(Item * item)264 static bool check_int_result(Item* item) {
265 Item::Type t = item->type();
266 if (t == Item::INT_ITEM) {
267 return true;
268 } else if (t == Item::FUNC_ITEM) {
269 Item_func* item_func = static_cast<Item_func*>(item);
270 if (strcmp(item_func->func_name(), "+") != 0 &&
271 strcmp(item_func->func_name(), "-") != 0)
272 return false;
273 if (item_func->argument_count() != 1)
274 return false;
275 Item** arguments = item_func->arguments();
276 if (arguments[0]->type() != Item::INT_ITEM)
277 return false;
278 return true;
279 } else
280 return false;
281 }
282
283 // check that an item is an insert value item with the same field name
check_insert_value(Item * item,const char * field_name)284 static bool check_insert_value(Item* item, const char* field_name) {
285 if (item->type() != Item::INSERT_VALUE_ITEM)
286 return false;
287 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
288 if (value_item->arg->type() != Item::FIELD_ITEM)
289 return false;
290 Item_field* arg = static_cast<Item_field*>(value_item->arg);
291 if (strcmp(field_name, arg->field_name) != 0)
292 return false;
293 return true;
294 }
295
296 // Return true if an expression looks like field_name op constant.
check_x_op_constant(const char * field_name,Item * item,const char * op,Item ** item_constant,bool allow_insert_value)297 static bool check_x_op_constant(
298 const char* field_name,
299 Item* item,
300 const char* op,
301 Item** item_constant,
302 bool allow_insert_value) {
303
304 if (item->type() != Item::FUNC_ITEM)
305 return false;
306 Item_func* item_func = static_cast<Item_func*>(item);
307 if (strcmp(item_func->func_name(), op) != 0)
308 return false;
309 Item** arguments = item_func->arguments();
310 uint n = item_func->argument_count();
311 if (n != 2)
312 return false;
313 if (arguments[0]->type() != Item::FIELD_ITEM)
314 return false;
315 Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
316 if (strcmp(field_name, arg0->field_name) != 0)
317 return false;
318 if (!check_int_result(arguments[1]))
319 if (!(allow_insert_value &&
320 check_insert_value(arguments[1], field_name)))
321 return false;
322 *item_constant = arguments[1];
323 return true;
324 }
325
326 // Return true if an expression looks like field_name = constant
check_x_equal_0(const char * field_name,Item * item)327 static bool check_x_equal_0(const char *field_name, Item *item) {
328 Item* item_constant;
329 if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
330 return false;
331 if (item_constant->type() != Item::INT_ITEM ||
332 item_constant->val_int() != 0)
333 return false;
334 return true;
335 }
336
337 // Return true if an expression looks like fieldname - 1
check_x_minus_1(const char * field_name,Item * item)338 static bool check_x_minus_1(const char* field_name, Item* item) {
339 Item* item_constant;
340 if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
341 return false;
342 if (item_constant->type() != Item::INT_ITEM ||
343 item_constant->val_int() != 1)
344 return false;
345 return true;
346 }
347
348 // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
349 // the field named by fieldname is an unsigned int.
check_decr_floor_expression(Field * lhs_field,Item * item)350 static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
351 if (item->type() != Item::FUNC_ITEM)
352 return false;
353 Item_func* item_func = static_cast<Item_func*>(item);
354 if (strcmp(item_func->func_name(), "if") != 0)
355 return false;
356 Item** arguments = item_func->arguments();
357 uint n = item_func->argument_count();
358 if (n != 3)
359 return false;
360 if (!check_x_equal_0(lhs_field->field_name, arguments[0]))
361 return false;
362 if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
363 return false;
364 if (!check_x_minus_1(lhs_field->field_name, arguments[2]))
365 return false;
366 if (!(lhs_field->flags & UNSIGNED_FLAG))
367 return false;
368 return true;
369 }
370
371 // Check if lhs = rhs expression is simple. Return true if it is.
check_update_expression(Item * lhs_item,Item * rhs_item,TABLE * table,bool allow_insert_value)372 static bool check_update_expression(
373 Item* lhs_item,
374 Item* rhs_item,
375 TABLE* table,
376 bool allow_insert_value) {
377
378 Field* lhs_field = find_field_by_name(table, lhs_item);
379 if (lhs_field == NULL)
380 return false;
381 if (!lhs_field->part_of_key.is_clear_all())
382 return false;
383 enum_field_types lhs_type = lhs_field->type();
384 Item::Type rhs_type = rhs_item->type();
385 switch (lhs_type) {
386 case MYSQL_TYPE_TINY:
387 case MYSQL_TYPE_SHORT:
388 case MYSQL_TYPE_INT24:
389 case MYSQL_TYPE_LONG:
390 case MYSQL_TYPE_LONGLONG:
391 if (check_int_result(rhs_item))
392 return true;
393 Item* item_constant;
394 if (check_x_op_constant(
395 lhs_field->field_name,
396 rhs_item,
397 "+",
398 &item_constant,
399 allow_insert_value))
400 return true;
401 if (check_x_op_constant(
402 lhs_field->field_name,
403 rhs_item,
404 "-",
405 &item_constant,
406 allow_insert_value))
407 return true;
408 if (check_decr_floor_expression(lhs_field, rhs_item))
409 return true;
410 break;
411 case MYSQL_TYPE_STRING:
412 if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
413 return true;
414 break;
415 case MYSQL_TYPE_VARCHAR:
416 case MYSQL_TYPE_BLOB:
417 if (rhs_type == Item::STRING_ITEM)
418 return true;
419 break;
420 default:
421 break;
422 }
423 return false;
424 }
425
426 // Check that all update expressions are simple. Return true if they are.
check_all_update_expressions(List<Item> & fields,List<Item> & values,TABLE * table,bool allow_insert_value)427 static bool check_all_update_expressions(
428 List<Item>& fields,
429 List<Item>& values,
430 TABLE* table,
431 bool allow_insert_value) {
432
433 List_iterator<Item> lhs_i(fields);
434 List_iterator<Item> rhs_i(values);
435 while (1) {
436 Item* lhs_item = lhs_i++;
437 if (lhs_item == NULL)
438 break;
439 Item* rhs_item = rhs_i++;
440 assert_always(rhs_item != NULL);
441 if (!check_update_expression(
442 lhs_item,
443 rhs_item,
444 table,
445 allow_insert_value))
446 return false;
447 }
448 return true;
449 }
450
full_field_in_key(TABLE * table,Field * field)451 static bool full_field_in_key(TABLE* table, Field* field) {
452 assert_always(table->s->primary_key < table->s->keys);
453 KEY* key = &table->s->key_info[table->s->primary_key];
454 for (uint i = 0; i < key->user_defined_key_parts; i++) {
455 KEY_PART_INFO* key_part = &key->key_part[i];
456 if (strcmp(field->field_name, key_part->field->field_name) == 0) {
457 return key_part->length == field->field_length;
458 }
459 }
460 return false;
461 }
462
463 // Check that an expression looks like fieldname = constant, fieldname is part
464 // of the primary key, and the named field is an int, char or varchar type.
465 // Return true if it does.
check_pk_field_equal_constant(Item * item,TABLE * table,MY_BITMAP * pk_fields)466 static bool check_pk_field_equal_constant(
467 Item* item,
468 TABLE* table,
469 MY_BITMAP* pk_fields) {
470
471 if (item->type() != Item::FUNC_ITEM)
472 return false;
473 Item_func* func = static_cast<Item_func*>(item);
474 if (strcmp(func->func_name(), "=") != 0)
475 return false;
476 uint n = func->argument_count();
477 if (n != 2)
478 return false;
479 Item** arguments = func->arguments();
480 Field* field = find_field_by_name(table, arguments[0]);
481 if (field == NULL)
482 return false;
483 if (!bitmap_test_and_clear(pk_fields, field->field_index))
484 return false;
485 switch (field->type()) {
486 case MYSQL_TYPE_TINY:
487 case MYSQL_TYPE_SHORT:
488 case MYSQL_TYPE_INT24:
489 case MYSQL_TYPE_LONG:
490 case MYSQL_TYPE_LONGLONG:
491 return arguments[1]->type() == Item::INT_ITEM ||
492 arguments[1]->type() == Item::STRING_ITEM;
493 case MYSQL_TYPE_STRING:
494 case MYSQL_TYPE_VARCHAR:
495 return full_field_in_key(table, field) &&
496 (arguments[1]->type() == Item::INT_ITEM ||
497 arguments[1]->type() == Item::STRING_ITEM);
498 default:
499 return false;
500 }
501 }
502
503 // Check that the where condition covers all of the primary key components
504 // with fieldname = constant expressions. Return true if it does.
check_point_update(Item * conds,TABLE * table)505 static bool check_point_update(Item* conds, TABLE* table) {
506 bool result = false;
507
508 if (conds == NULL)
509 return false; // no where condition on the update
510
511 if (table->s->primary_key >= table->s->keys)
512 return false; // no primary key defined
513
514 // use a bitmap of the primary key fields to keep track of those fields
515 // that are covered by the where conditions
516 MY_BITMAP pk_fields;
517 if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
518 return false;
519 KEY *key = &table->s->key_info[table->s->primary_key];
520 for (uint i = 0; i < key->user_defined_key_parts; i++)
521 bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
522
523 switch (conds->type()) {
524 case Item::FUNC_ITEM:
525 result = check_pk_field_equal_constant(conds, table, &pk_fields);
526 break;
527 case Item::COND_ITEM: {
528 Item_cond* cond_item = static_cast<Item_cond*>(conds);
529 if (strcmp(cond_item->func_name(), "and") != 0)
530 break;
531 List_iterator<Item> li(*cond_item->argument_list());
532 Item* list_item;
533 result = true;
534 while (result == true && (list_item = li++)) {
535 result = check_pk_field_equal_constant(
536 list_item,
537 table,
538 &pk_fields);
539 }
540 break;
541 }
542 default:
543 break;
544 }
545
546 if (!bitmap_is_clear_all(&pk_fields))
547 result = false;
548 bitmap_free(&pk_fields);
549 return result;
550 }
551
552 // Return true if there are any clustering keys (except the primary).
553 // Precompute this when the table is opened.
clustering_keys_exist(TABLE * table)554 static bool clustering_keys_exist(TABLE *table) {
555 for (uint keynr = 0; keynr < table->s->keys; keynr++) {
556 if (keynr != table->s->primary_key &&
557 key_is_clustering(&table->s->key_info[keynr]))
558 return true;
559 }
560 return false;
561 }
562
is_strict_mode(THD * thd)563 static bool is_strict_mode(THD* thd) {
564 #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
565 return thd->is_strict_mode();
566 #else
567 return tokudb_test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
568 #endif
569 }
570
571 // Check if an update operation can be handled by this storage engine.
572 // Return true if it can.
check_fast_update(THD * thd,List<Item> & fields,List<Item> & values,Item * conds)573 bool ha_tokudb::check_fast_update(
574 THD* thd,
575 List<Item>& fields,
576 List<Item>& values,
577 Item* conds) {
578
579 if (!transaction)
580 return false;
581
582 // avoid strict mode arithmetic overflow issues
583 if (is_strict_mode(thd))
584 return false;
585
586 // no triggers
587 if (table->triggers)
588 return false;
589
590 // no binlog
591 if (mysql_bin_log.is_open() &&
592 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
593 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
594 return false;
595
596 // no clustering keys (need to broadcast an increment into the clustering
597 // keys since we are selecting with the primary key)
598 if (clustering_keys_exist(table))
599 return false;
600
601 if (!check_all_update_expressions(fields, values, table, false))
602 return false;
603
604 if (!check_point_update(conds, table))
605 return false;
606
607 return true;
608 }
609
marshall_varchar_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info,uint key_num)610 static void marshall_varchar_descriptor(
611 tokudb::buffer& b,
612 TABLE* table,
613 KEY_AND_COL_INFO* kc_info,
614 uint key_num) {
615
616 b.append_ui<uint32_t>('v');
617 b.append_ui<uint32_t>(
618 table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
619 uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
620 b.append_ui<uint32_t>(var_offset_bytes);
621 b.append_ui<uint32_t>(
622 var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
623 }
624
marshall_blobs_descriptor(tokudb::buffer & b,TABLE * table,KEY_AND_COL_INFO * kc_info)625 static void marshall_blobs_descriptor(
626 tokudb::buffer& b,
627 TABLE* table,
628 KEY_AND_COL_INFO* kc_info) {
629
630 b.append_ui<uint32_t>('b');
631 uint32_t n = kc_info->num_blobs;
632 b.append_ui<uint32_t>(n);
633 for (uint i = 0; i < n; i++) {
634 uint blob_field_index = kc_info->blob_fields[i];
635 assert_always(blob_field_index < table->s->fields);
636 uint8_t blob_field_length =
637 table->s->field[blob_field_index]->row_pack_length();
638 b.append(&blob_field_length, sizeof blob_field_length);
639 }
640 }
641
642 static inline uint32_t get_null_bit_position(uint32_t null_bit);
643
644 // evaluate the int value of an item
item_val_int(Item * item)645 static longlong item_val_int(Item* item) {
646 Item::Type t = item->type();
647 if (t == Item::INSERT_VALUE_ITEM) {
648 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
649 return value_item->arg->val_int();
650 } else
651 return item->val_int();
652 }
653
654 // Marshall update operations to a buffer.
marshall_update(tokudb::buffer & b,Item * lhs_item,Item * rhs_item,TABLE * table,TOKUDB_SHARE * share)655 static void marshall_update(
656 tokudb::buffer& b,
657 Item* lhs_item,
658 Item* rhs_item,
659 TABLE* table,
660 TOKUDB_SHARE* share) {
661
662 // figure out the update operation type (again)
663 Field* lhs_field = find_field_by_name(table, lhs_item);
664 assert_always(lhs_field); // we found it before, so this should work
665
666 // compute the update info
667 uint32_t field_type;
668 uint32_t field_null_num = 0;
669 if (lhs_field->real_maybe_null()) {
670 uint32_t field_num = lhs_field->field_index;
671 field_null_num =
672 ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
673 }
674 uint32_t offset;
675 void* v_ptr = NULL;
676 uint32_t v_length;
677 uint32_t update_operation;
678 longlong v_ll;
679 String v_str;
680
681 switch (lhs_field->type()) {
682 case MYSQL_TYPE_TINY:
683 case MYSQL_TYPE_SHORT:
684 case MYSQL_TYPE_INT24:
685 case MYSQL_TYPE_LONG:
686 case MYSQL_TYPE_LONGLONG: {
687 Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
688 field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
689 offset =
690 fixed_field_offset(
691 table->s->null_bytes,
692 &share->kc_info,
693 table->s->primary_key,
694 lhs_field->field_index);
695 switch (rhs_item->type()) {
696 case Item::INT_ITEM: {
697 update_operation = '=';
698 v_ll = rhs_item->val_int();
699 v_length = lhs_field->pack_length();
700 v_ptr = &v_ll;
701 break;
702 }
703 case Item::FUNC_ITEM: {
704 Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
705 Item** arguments = rhs_func->arguments();
706 // we only support one if function for now, and it is a
707 // decrement with floor.
708 if (strcmp(rhs_func->func_name(), "if") == 0) {
709 update_operation = '-';
710 v_ll = 1;
711 } else if (rhs_func->argument_count() == 1) {
712 update_operation = '=';
713 v_ll = rhs_func->val_int();
714 } else {
715 update_operation = rhs_func->func_name()[0];
716 v_ll = item_val_int(arguments[1]);
717 }
718 v_length = lhs_field->pack_length();
719 v_ptr = &v_ll;
720 break;
721 }
722 default:
723 assert_unreachable();
724 }
725 break;
726 }
727 case MYSQL_TYPE_STRING: {
728 update_operation = '=';
729 field_type =
730 lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
731 offset =
732 fixed_field_offset(
733 table->s->null_bytes,
734 &share->kc_info,
735 table->s->primary_key,
736 lhs_field->field_index);
737 v_str = *rhs_item->val_str(&v_str);
738 v_length = v_str.length();
739 if (v_length >= lhs_field->pack_length()) {
740 v_length = lhs_field->pack_length();
741 v_str.length(v_length); // truncate
742 } else {
743 v_length = lhs_field->pack_length();
744 uchar pad_char =
745 lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
746 v_str.fill(lhs_field->pack_length(), pad_char); // pad
747 }
748 v_ptr = v_str.c_ptr();
749 break;
750 }
751 case MYSQL_TYPE_VARCHAR: {
752 update_operation = '=';
753 field_type =
754 lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
755 offset =
756 var_field_index(
757 table,
758 &share->kc_info,
759 table->s->primary_key,
760 lhs_field->field_index);
761 v_str = *rhs_item->val_str(&v_str);
762 v_length = v_str.length();
763 if (v_length >= lhs_field->row_pack_length()) {
764 v_length = lhs_field->row_pack_length();
765 v_str.length(v_length); // truncate
766 }
767 v_ptr = v_str.c_ptr();
768 break;
769 }
770 case MYSQL_TYPE_BLOB: {
771 update_operation = '=';
772 field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
773 offset =
774 blob_field_index(table, &share->kc_info, lhs_field->field_index);
775 v_str = *rhs_item->val_str(&v_str);
776 v_length = v_str.length();
777 if (v_length >= lhs_field->max_data_length()) {
778 v_length = lhs_field->max_data_length();
779 v_str.length(v_length); // truncate
780 }
781 v_ptr = v_str.c_ptr();
782 break;
783 }
784 default:
785 assert_unreachable();
786 }
787
788 // marshall the update fields into the buffer
789 b.append_ui<uint32_t>(update_operation);
790 b.append_ui<uint32_t>(field_type);
791 b.append_ui<uint32_t>(field_null_num);
792 b.append_ui<uint32_t>(offset);
793 b.append_ui<uint32_t>(v_length);
794 b.append(v_ptr, v_length);
795 }
796
797 // Save an item's value into the appropriate field. Return 0 if successful.
save_in_field(Item * item,TABLE * table)798 static int save_in_field(Item* item, TABLE* table) {
799 assert_always(item->type() == Item::FUNC_ITEM);
800 Item_func *func = static_cast<Item_func*>(item);
801 assert_always(strcmp(func->func_name(), "=") == 0);
802 uint n = func->argument_count();
803 assert_always(n == 2);
804 Item **arguments = func->arguments();
805 assert_always(arguments[0]->type() == Item::FIELD_ITEM);
806 Item_field *field_item = static_cast<Item_field*>(arguments[0]);
807 my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
808 int error = arguments[1]->save_in_field(field_item->field, 0);
809 dbug_tmp_restore_column_map(table->write_set, old_map);
810 return error;
811 }
812
count_update_types(Field * lhs_field,uint * num_varchars,uint * num_blobs)813 static void count_update_types(
814 Field* lhs_field,
815 uint* num_varchars,
816 uint* num_blobs) {
817
818 switch (lhs_field->type()) {
819 case MYSQL_TYPE_VARCHAR:
820 *num_varchars += 1;
821 break;
822 case MYSQL_TYPE_BLOB:
823 *num_blobs += 1;
824 break;
825 default:
826 break;
827 }
828 }
829
830 // Generate an update message for an update operation and send it into the
831 // primary tree. Return 0 if successful.
send_update_message(List<Item> & update_fields,List<Item> & update_values,Item * conds,DB_TXN * txn)832 int ha_tokudb::send_update_message(
833 List<Item>& update_fields,
834 List<Item>& update_values,
835 Item* conds,
836 DB_TXN* txn) {
837
838 int error;
839
840 // Save the primary key from the where conditions
841 Item::Type t = conds->type();
842 if (t == Item::FUNC_ITEM) {
843 error = save_in_field(conds, table);
844 } else if (t == Item::COND_ITEM) {
845 Item_cond* cond_item = static_cast<Item_cond*>(conds);
846 List_iterator<Item> li(*cond_item->argument_list());
847 Item* list_item;
848 error = 0;
849 while (error == 0 && (list_item = li++)) {
850 error = save_in_field(list_item, table);
851 }
852 } else {
853 assert_unreachable();
854 }
855 if (error)
856 return error;
857
858 // put the primary key into key_buff and wrap it with key_dbt
859 DBT key_dbt;
860 bool has_null;
861 create_dbt_key_from_table(
862 &key_dbt,
863 primary_key,
864 key_buff,
865 table->record[0],
866 &has_null);
867
868 // construct the update message
869 tokudb::buffer update_message;
870
871 uint8_t op = UPDATE_OP_UPDATE_2;
872 update_message.append(&op, sizeof op);
873
874 uint32_t num_updates = update_fields.elements;
875 uint num_varchars = 0, num_blobs = 0;
876 if (1) {
877 List_iterator<Item> lhs_i(update_fields);
878 Item* lhs_item;
879 while ((lhs_item = lhs_i++)) {
880 if (lhs_item == NULL)
881 break;
882 Field* lhs_field = find_field_by_name(table, lhs_item);
883 assert_always(lhs_field); // we found it before, so this should work
884 count_update_types(lhs_field, &num_varchars, &num_blobs);
885 }
886 if (num_varchars > 0 || num_blobs > 0)
887 num_updates++;
888 if (num_blobs > 0)
889 num_updates++;
890 }
891
892 // append the updates
893 update_message.append_ui<uint32_t>(num_updates);
894
895 if (num_varchars > 0 || num_blobs > 0)
896 marshall_varchar_descriptor(
897 update_message,
898 table,
899 &share->kc_info,
900 table->s->primary_key);
901 if (num_blobs > 0)
902 marshall_blobs_descriptor(update_message, table, &share->kc_info);
903
904 List_iterator<Item> lhs_i(update_fields);
905 List_iterator<Item> rhs_i(update_values);
906 while (error == 0) {
907 Item* lhs_item = lhs_i++;
908 if (lhs_item == NULL)
909 break;
910 Item* rhs_item = rhs_i++;
911 assert_always(rhs_item != NULL);
912 marshall_update(update_message, lhs_item, rhs_item, table, share);
913 }
914
915 rwlock_t_lock_read(share->_num_DBs_lock);
916
917 // hot index in progress
918 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
919 error = ENOTSUP; // run on the slow path
920 } else {
921 // send the update message
922 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
923 update_dbt.data = update_message.data();
924 update_dbt.size = update_message.size();
925 error =
926 share->key_file[primary_key]->update(
927 share->key_file[primary_key],
928 txn,
929 &key_dbt,
930 &update_dbt,
931 0);
932 }
933
934 share->_num_DBs_lock.unlock();
935
936 return error;
937 }
938
939 // Determine if an upsert operation can be offloaded to the storage engine.
940 // An upsert consists of a row and a list of update expressions
941 // (update_fields[i] = update_values[i]).
942 // The function returns 0 if the upsert is handled in the storage engine.
943 // Otherwise, an error code is returned.
upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)944 int ha_tokudb::upsert(
945 THD* thd,
946 List<Item>& update_fields,
947 List<Item>& update_values) {
948
949 TOKUDB_HANDLER_DBUG_ENTER("");
950 int error = 0;
951
952 if (!tokudb::sysvars::enable_fast_upsert(thd)) {
953 error = ENOTSUP;
954 goto exit;
955 }
956
957 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
958 fprintf(stderr, "upsert\n");
959 dump_item_list("update_fields", update_fields);
960 dump_item_list("update_values", update_values);
961 }
962
963 // not an upsert or something is fishy with the parameters
964 if (update_fields.elements < 1 ||
965 update_fields.elements != update_values.elements) {
966 error = ENOTSUP;
967 goto exit;
968 }
969
970 if (!check_upsert(thd, update_fields, update_values)) {
971 error = HA_ERR_UNSUPPORTED;
972 goto exit;
973 }
974
975 error = send_upsert_message(update_fields, update_values, transaction);
976
977 if (error) {
978 int mapped_error = map_to_handler_error(error);
979 if (mapped_error == error)
980 error = HA_ERR_UNSUPPORTED;
981 }
982
983 exit:
984
985 if (error != 0 && error != ENOTSUP)
986 print_error(error, MYF(0));
987
988 TOKUDB_HANDLER_DBUG_RETURN(error);
989 }
990
991 // Check if an upsert can be handled by this storage engine.
992 // Return true if it can.
check_upsert(THD * thd,List<Item> & update_fields,List<Item> & update_values)993 bool ha_tokudb::check_upsert(
994 THD* thd,
995 List<Item>& update_fields,
996 List<Item>& update_values) {
997
998 if (!transaction)
999 return false;
1000
1001 // avoid strict mode arithmetic overflow issues
1002 if (is_strict_mode(thd))
1003 return false;
1004
1005 // no triggers
1006 if (table->triggers)
1007 return false;
1008
1009 // primary key must exist
1010 if (table->s->primary_key >= table->s->keys)
1011 return false;
1012
1013 // no secondary keys
1014 if (table->s->keys > 1)
1015 return false;
1016
1017 // no binlog
1018 if (mysql_bin_log.is_open() &&
1019 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1020 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1021 return false;
1022
1023 if (!check_all_update_expressions(
1024 update_fields,
1025 update_values,
1026 table,
1027 true))
1028 return false;
1029
1030 return true;
1031 }
1032
1033 // Generate an upsert message and send it into the primary tree.
1034 // Return 0 if successful.
send_upsert_message(List<Item> & update_fields,List<Item> & update_values,DB_TXN * txn)1035 int ha_tokudb::send_upsert_message(
1036 List<Item>& update_fields,
1037 List<Item>& update_values,
1038 DB_TXN* txn) {
1039 int error = 0;
1040
1041 // generate primary key
1042 DBT key_dbt;
1043 bool has_null;
1044 create_dbt_key_from_table(
1045 &key_dbt,
1046 primary_key,
1047 primary_key_buff,
1048 table->record[0],
1049 &has_null);
1050
1051 // generate packed row
1052 DBT row;
1053 error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1054 if (error)
1055 return error;
1056
1057 tokudb::buffer update_message;
1058
1059 // append the operation
1060 uint8_t op = UPDATE_OP_UPSERT_2;
1061 update_message.append(&op, sizeof op);
1062
1063 // append the row
1064 update_message.append_ui<uint32_t>(row.size);
1065 update_message.append(row.data, row.size);
1066
1067 uint32_t num_updates = update_fields.elements;
1068 uint num_varchars = 0, num_blobs = 0;
1069 if (1) {
1070 List_iterator<Item> lhs_i(update_fields);
1071 Item* lhs_item;
1072 while ((lhs_item = lhs_i++)) {
1073 if (lhs_item == NULL)
1074 break;
1075 Field* lhs_field = find_field_by_name(table, lhs_item);
1076 assert_always(lhs_field); // we found it before, so this should work
1077 count_update_types(lhs_field, &num_varchars, &num_blobs);
1078 }
1079 if (num_varchars > 0 || num_blobs > 0)
1080 num_updates++;
1081 if (num_blobs > 0)
1082 num_updates++;
1083 }
1084
1085 // append the updates
1086 update_message.append_ui<uint32_t>(num_updates);
1087
1088 if (num_varchars > 0 || num_blobs > 0)
1089 marshall_varchar_descriptor(
1090 update_message,
1091 table, &share->kc_info,
1092 table->s->primary_key);
1093 if (num_blobs > 0)
1094 marshall_blobs_descriptor(update_message, table, &share->kc_info);
1095
1096 List_iterator<Item> lhs_i(update_fields);
1097 List_iterator<Item> rhs_i(update_values);
1098 while (1) {
1099 Item* lhs_item = lhs_i++;
1100 if (lhs_item == NULL)
1101 break;
1102 Item* rhs_item = rhs_i++;
1103 assert_always(rhs_item != NULL);
1104 marshall_update(update_message, lhs_item, rhs_item, table, share);
1105 }
1106
1107 rwlock_t_lock_read(share->_num_DBs_lock);
1108
1109 // hot index in progress
1110 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1111 error = ENOTSUP; // run on the slow path
1112 } else {
1113 // send the upsert message
1114 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1115 update_dbt.data = update_message.data();
1116 update_dbt.size = update_message.size();
1117 error =
1118 share->key_file[primary_key]->update(
1119 share->key_file[primary_key],
1120 txn,
1121 &key_dbt,
1122 &update_dbt,
1123 0);
1124 }
1125
1126 share->_num_DBs_lock.unlock();
1127
1128 return error;
1129 }
1130