1 /*
2 Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /* Insert of records */
26
27 #include "sql_insert.h"
28
29 #include "auth_common.h" // check_grant_all_columns
30 #include "debug_sync.h" // DEBUG_SYNC
31 #include "item.h" // Item
32 #include "lock.h" // mysql_unlock_tables
33 #include "opt_explain.h" // Modification_plan
34 #include "opt_explain_format.h" // enum_mod_type
35 #include "rpl_rli.h" // Relay_log_info
36 #include "rpl_slave.h" // rpl_master_has_bug
37 #include "sql_base.h" // setup_fields
38 #include "sql_resolver.h" // Column_privilege_tracker
39 #include "sql_select.h" // free_underlaid_joins
40 #include "sql_show.h" // store_create_info
41 #include "sql_table.h" // quick_rm_table
42 #include "sql_tmp_table.h" // create_tmp_field
43 #include "sql_update.h" // records_are_comparable
44 #include "sql_view.h" // check_key_in_view
45 #include "table_trigger_dispatcher.h" // Table_trigger_dispatcher
46 #include "transaction.h" // trans_commit_stmt
47 #include "sql_resolver.h" // validate_gc_assignment
48 #include "partition_info.h" // partition_info
49 #include "probes_mysql.h" // MYSQL_INSERT_START
50
51 static bool check_view_insertability(THD *thd, TABLE_LIST *view,
52 const TABLE_LIST *insert_table_ref);
53
54 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables);
55
56 /**
57 Check that insert fields are from a single table of a multi-table view.
58
59 @param fields The insert fields to be checked.
60 @param view The view for insert.
61 @param insert_table_ref[out] Reference to table to insert into
62
63 This function is called to check that the fields being inserted into
64 are from a single base table. This must be checked when the table to
65 be inserted into is a multi-table view.
66
67 @return false if success, true if an error was raised.
68 */
69
check_single_table_insert(List<Item> & fields,TABLE_LIST * view,TABLE_LIST ** insert_table_ref)70 static bool check_single_table_insert(List<Item> &fields, TABLE_LIST *view,
71 TABLE_LIST **insert_table_ref)
72 {
73 // It is join view => we need to find the table for insert
74 List_iterator_fast<Item> it(fields);
75 Item *item;
76 *insert_table_ref= NULL; // reset for call to check_single_table()
77 table_map tables= 0;
78
79 while ((item= it++))
80 tables|= item->used_tables();
81
82 if (view->check_single_table(insert_table_ref, tables))
83 {
84 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
85 view->view_db.str, view->view_name.str);
86 return true;
87 }
88 assert(*insert_table_ref && (*insert_table_ref)->is_insertable());
89
90 return false;
91 }
92
93
94 /**
95 Check insert fields.
96
97 @param thd The current thread.
98 @param table_list The table for insert.
99 @param fields The insert fields.
100 @param value_count Number of values supplied
101 @param value_count_known if false, delay field count check
102 @todo: Eliminate this when preparation is properly phased
103 @param check_unique If duplicate values should be rejected.
104
105 @return false if success, true if error
106
107 Resolved reference to base table is returned in lex->insert_table_leaf.
108
109 @todo check_insert_fields() should be refactored as follows:
110 - Remove the argument value_count_known and all predicates involving it.
111 - Rearrange the call to check_insert_fields() from
112 mysql_prepare_insert() so that the value_count is known also when
113 processing a prepared statement.
114 */
115
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,uint value_count,bool value_count_known,bool check_unique)116 static bool check_insert_fields(THD *thd, TABLE_LIST *table_list,
117 List<Item> &fields, uint value_count,
118 bool value_count_known, bool check_unique)
119 {
120 LEX *const lex= thd->lex;
121
122 #ifndef NDEBUG
123 TABLE_LIST *const saved_insert_table_leaf= lex->insert_table_leaf;
124 #endif
125
126 TABLE *table= table_list->table;
127
128 assert(table_list->is_insertable());
129
130 if (fields.elements == 0 && value_count_known && value_count > 0)
131 {
132 /*
133 No field list supplied, but a value list has been supplied.
134 Use field list of table being updated.
135 */
136 assert(table); // This branch is not reached with a view:
137
138 lex->insert_table_leaf= table_list;
139
140 // Values for all fields in table are needed
141 if (value_count != table->s->fields)
142 {
143 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
144 return true;
145 }
146 #ifndef NO_EMBEDDED_ACCESS_CHECKS
147 Field_iterator_table_ref field_it;
148 field_it.set(table_list);
149 if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
150 return true;
151 #endif
152 /*
153 No fields are provided so all fields must be provided in the values.
154 Thus we set all bits in the write set.
155 */
156 bitmap_set_all(table->write_set);
157 }
158 else
159 {
160 // INSERT with explicit field list.
161 SELECT_LEX *select_lex= thd->lex->select_lex;
162 Name_resolution_context *context= &select_lex->context;
163 Name_resolution_context_state ctx_state;
164 int res;
165
166 if (value_count_known && fields.elements != value_count)
167 {
168 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
169 return true;
170 }
171
172 thd->dup_field= 0;
173
174 /* Save the state of the current name resolution context. */
175 ctx_state.save_state(context, table_list);
176
177 /*
178 Perform name resolution only in the first table - 'table_list',
179 which is the table that is inserted into.
180 */
181 table_list->next_local= NULL;
182 context->resolve_in_table_list_only(table_list);
183 res= setup_fields(thd, Ref_ptr_array(), fields, INSERT_ACL, NULL,
184 false, true);
185
186 /* Restore the current context. */
187 ctx_state.restore_state(context, table_list);
188
189 if (res)
190 return true;
191
192 if (table_list->is_merged())
193 {
194 if (check_single_table_insert(fields, table_list,
195 &lex->insert_table_leaf))
196 return true;
197 table= lex->insert_table_leaf->table;
198 }
199 else
200 {
201 lex->insert_table_leaf= table_list;
202 }
203
204 if (check_unique && thd->dup_field)
205 {
206 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
207 return true;
208 }
209 }
210 /* Mark all generated columns for write*/
211 if (table->vfield)
212 table->mark_generated_columns(false);
213
214 if (check_key_in_view(thd, table_list, lex->insert_table_leaf) ||
215 (table_list->is_view() &&
216 check_view_insertability(thd, table_list, lex->insert_table_leaf)))
217 {
218 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
219 return true;
220 }
221
222 assert(saved_insert_table_leaf == NULL ||
223 lex->insert_table_leaf == saved_insert_table_leaf);
224
225 return false;
226 }
227
228
229 /**
230 Check that table references are restricted to the supplied table map.
231 The check can be ignored if the supplied table is a base table.
232
233 @param view Table being specified
234 @param values Values whose used tables are to be matched against table map
235 @param map Table map to match against
236
237 @return false if success, true if error
238 */
239
check_valid_table_refs(const TABLE_LIST * view,List<Item> & values,table_map map)240 static bool check_valid_table_refs(const TABLE_LIST *view, List<Item> &values,
241 table_map map)
242 {
243 List_iterator_fast<Item> it(values);
244 Item *item;
245
246 // A base table will always match the supplied map.
247 assert(view->is_view() || (view->table && map));
248
249 if (!view->is_view()) // Ignore check if called with base table.
250 return false;
251
252 map|= PSEUDO_TABLE_BITS;
253
254 while ((item= it++))
255 {
256 if (item->used_tables() & ~map)
257 {
258 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
259 view->view_db.str, view->view_name.str);
260 return true;
261 }
262 }
263 return false;
264 }
265
266
267 /**
268 Validates default value of fields which are not specified in
269 the column list of INSERT statement.
270
271 @Note table->record[0] should be be populated with default values
272 before calling this function.
273
274 @param thd thread context
275 @param table table to which values are inserted.
276
277 @return
278 @retval false Success.
279 @retval true Failure.
280 */
281
validate_default_values_of_unset_fields(THD * thd,TABLE * table)282 bool validate_default_values_of_unset_fields(THD *thd, TABLE *table)
283 {
284 MY_BITMAP *write_set= table->write_set;
285 DBUG_ENTER("validate_default_values_of_unset_fields");
286
287 for (Field **field= table->field; *field; field++)
288 {
289 if (!bitmap_is_set(write_set, (*field)->field_index) &&
290 !((*field)->flags & NO_DEFAULT_VALUE_FLAG))
291 {
292 if ((*field)->validate_stored_val(thd) && thd->is_error())
293 DBUG_RETURN(true);
294 }
295 }
296
297 DBUG_RETURN(false);
298 }
299
300
301 /*
302 Prepare triggers for INSERT-like statement.
303
304 SYNOPSIS
305 prepare_triggers_for_insert_stmt()
306 table Table to which insert will happen
307
308 NOTE
309 Prepare triggers for INSERT-like statement by marking fields
310 used by triggers and inform handlers that batching of UPDATE/DELETE
311 cannot be done if there are BEFORE UPDATE/DELETE triggers.
312 */
313
prepare_triggers_for_insert_stmt(TABLE * table)314 void prepare_triggers_for_insert_stmt(TABLE *table)
315 {
316 if (table->triggers)
317 {
318 if (table->triggers->has_triggers(TRG_EVENT_DELETE,
319 TRG_ACTION_AFTER))
320 {
321 /*
322 The table has AFTER DELETE triggers that might access to
323 subject table and therefore might need delete to be done
324 immediately. So we turn-off the batching.
325 */
326 (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
327 }
328 if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
329 TRG_ACTION_AFTER))
330 {
331 /*
332 The table has AFTER UPDATE triggers that might access to subject
333 table and therefore might need update to be done immediately.
334 So we turn-off the batching.
335 */
336 (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
337 }
338 }
339 table->mark_columns_needed_for_insert();
340 }
341
342 /**
343 Setup data for field BLOB/GEOMETRY field types for execution of
344 "INSERT...UPDATE" statement. For a expression in 'UPDATE' clause
345 like "a= VALUES(a)", let as call Field* referring 'a' as LHS_FIELD
346 and Field* referring field 'a' in "VALUES(a)" as RHS_FIELD
347
348 This function creates a separate copy of the blob value for RHS_FIELD,
349 if the field is updated as well as accessed through VALUES()
350 function in 'UPDATE' clause of "INSERT...UPDATE" statement.
351
352 @param [in] thd
353 Pointer to THD object.
354
355 @param [in] fields
356 List of fields representing LHS_FIELD of all expressions
357 in 'UPDATE' clause.
358
359 @return - Can fail only when we are out of memory.
360 @retval false Success
361 @retval true Failure
362 */
363
mysql_prepare_blob_values(THD * thd,List<Item> & fields,MEM_ROOT * mem_root)364 bool mysql_prepare_blob_values(THD *thd, List<Item> &fields, MEM_ROOT *mem_root)
365 {
366 DBUG_ENTER("mysql_prepare_blob_values");
367
368 if (fields.elements <= 1)
369 DBUG_RETURN(false);
370
371 // Collect LHS_FIELD's which are updated in a 'set'.
372 // This 'set' helps decide if we need to make copy of BLOB value
373 // or not.
374
375 Prealloced_array<Field_blob *, 16, true>
376 blob_update_field_set(PSI_NOT_INSTRUMENTED);
377 if (blob_update_field_set.reserve(fields.elements))
378 DBUG_RETURN(true);
379
380 List_iterator_fast<Item> f(fields);
381 Item *fld;
382 while ((fld= f++))
383 {
384 Item_field *field= fld->field_for_view_update();
385 Field *lhs_field= field->field;
386
387 if (lhs_field->type() == MYSQL_TYPE_BLOB ||
388 lhs_field->type() == MYSQL_TYPE_GEOMETRY)
389 blob_update_field_set.insert_unique(down_cast<Field_blob *>(lhs_field));
390 }
391
392 // Traverse through thd->lex->insert_update_values_map
393 // and make copy of BLOB values in RHS_FIELD, if the same field is
394 // modified (present in above 'set' prepared).
395 if (thd->lex->has_values_map())
396 {
397 std::map<Field *, Field *>::iterator iter;
398 for(iter= thd->lex->begin_values_map();
399 iter != thd->lex->end_values_map();
400 ++iter)
401 {
402 // Retrieve the Field_blob pointers from the map.
403 // and initialize newly declared variables immediately.
404 Field_blob *lhs_field= down_cast<Field_blob *>(iter->first);
405 Field_blob *rhs_field= down_cast<Field_blob *>(iter->second);
406
407 // Check if the Field_blob object is updated before making a copy.
408 if (blob_update_field_set.count_unique(lhs_field) == 0)
409 continue;
410
411 // Copy blob value
412 if(rhs_field->copy_blob_value(mem_root))
413 DBUG_RETURN(true);
414 }
415 }
416
417 DBUG_RETURN(false);
418 }
419
420 /**
421 INSERT statement implementation
422
423 @note Like implementations of other DDL/DML in MySQL, this function
424 relies on the caller to close the thread tables. This is done in the
425 end of dispatch_command().
426 */
427
mysql_insert(THD * thd,TABLE_LIST * table_list)428 bool Sql_cmd_insert::mysql_insert(THD *thd,TABLE_LIST *table_list)
429 {
430 DBUG_ENTER("mysql_insert");
431
432 LEX *const lex= thd->lex;
433 int error, res;
434 bool err= true;
435 bool transactional_table, joins_freed= FALSE;
436 bool changed;
437 bool is_locked= false;
438 ulong counter= 0;
439 ulonglong id;
440 /*
441 We have three alternative syntax rules for the INSERT statement:
442 1) "INSERT (columns) VALUES ...", so non-listed columns need a default
443 2) "INSERT VALUES (), ..." so all columns need a default;
444 note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
445 emptiness of the first row is enough
446 3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
447 expr_i is "DEFAULT" (in which case the column is set by
448 Item_default_value::save_in_field_inner()).
449 */
450 const bool manage_defaults=
451 insert_field_list.elements != 0 || // 1)
452 insert_many_values.head()->elements == 0; // 2)
453 COPY_INFO info(COPY_INFO::INSERT_OPERATION,
454 &insert_field_list,
455 manage_defaults,
456 duplicates);
457 COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &insert_update_list,
458 &insert_value_list);
459 Name_resolution_context *context;
460 Name_resolution_context_state ctx_state;
461 uint num_partitions= 0;
462 enum partition_info::enum_can_prune can_prune_partitions=
463 partition_info::PRUNE_NO;
464 MY_BITMAP used_partitions;
465 bool prune_needs_default_values;
466
467 SELECT_LEX *const select_lex= lex->select_lex;
468
469 select_lex->make_active_options(0, 0);
470
471 if (open_tables_for_query(thd, table_list, 0))
472 DBUG_RETURN(true);
473
474 if (run_before_dml_hook(thd))
475 DBUG_RETURN(true);
476
477 THD_STAGE_INFO(thd, stage_init);
478 lex->used_tables=0;
479
480 List_iterator_fast<List_item> its(insert_many_values);
481 List_item *values= its++;
482 const uint value_count= values->elements;
483 TABLE *insert_table= NULL;
484 if (mysql_prepare_insert(thd, table_list, values, false))
485 goto exit_without_my_ok;
486
487 insert_table= lex->insert_table_leaf->table;
488
489 if (duplicates == DUP_UPDATE || duplicates == DUP_REPLACE)
490 prepare_for_positional_update(insert_table, table_list);
491
492 /* Must be done before can_prune_insert, due to internal initialization. */
493 if (info.add_function_default_columns(insert_table, insert_table->write_set))
494 goto exit_without_my_ok; /* purecov: inspected */
495 if (duplicates == DUP_UPDATE &&
496 update.add_function_default_columns(insert_table,
497 insert_table->write_set))
498 goto exit_without_my_ok; /* purecov: inspected */
499
500 context= &select_lex->context;
501 /*
502 These three asserts test the hypothesis that the resetting of the name
503 resolution context below is not necessary at all since the list of local
504 tables for INSERT always consists of one table.
505 */
506 assert(!table_list->next_local);
507 assert(!context->table_list->next_local);
508 assert(!context->first_name_resolution_table->next_name_resolution_table);
509
510 /* Save the state of the current name resolution context. */
511 ctx_state.save_state(context, table_list);
512
513 /*
514 Perform name resolution only in the first table - 'table_list',
515 which is the table that is inserted into.
516 */
517 assert(table_list->next_local == 0);
518 context->resolve_in_table_list_only(table_list);
519
520 if (!is_locked && insert_table->part_info)
521 {
522 if (insert_table->part_info->can_prune_insert(thd,
523 duplicates,
524 update,
525 insert_update_list,
526 insert_field_list,
527 !MY_TEST(values->elements),
528 &can_prune_partitions,
529 &prune_needs_default_values,
530 &used_partitions))
531 goto exit_without_my_ok; /* purecov: inspected */
532
533 if (can_prune_partitions != partition_info::PRUNE_NO)
534 {
535 num_partitions= insert_table->part_info->lock_partitions.n_bits;
536 /*
537 Pruning probably possible, all partitions is unmarked for read/lock,
538 and we must now add them on row by row basis.
539
540 Check the first INSERT value.
541 Do not fail here, since that would break MyISAM behavior of inserting
542 all rows before the failing row.
543
544 PRUNE_DEFAULTS means the partitioning fields are only set to DEFAULT
545 values, so we only need to check the first INSERT value, since all the
546 rest will be in the same partition.
547 */
548 if (insert_table->part_info->set_used_partition(insert_field_list,
549 *values,
550 info,
551 prune_needs_default_values,
552 &used_partitions))
553 can_prune_partitions= partition_info::PRUNE_NO;
554 }
555 }
556
557 its.rewind();
558 while ((values= its++))
559 {
560 counter++;
561 /*
562 To make it possible to increase concurrency on table level locking
563 engines such as MyISAM, we check pruning for each row until we will use
564 all partitions, Even if the number of rows is much higher than the
565 number of partitions.
566 TODO: Cache the calculated part_id and reuse in
567 ha_partition::write_row() if possible.
568 */
569 if (can_prune_partitions == partition_info::PRUNE_YES)
570 {
571 if (insert_table->part_info->set_used_partition(insert_field_list,
572 *values,
573 info,
574 prune_needs_default_values,
575 &used_partitions))
576 can_prune_partitions= partition_info::PRUNE_NO;
577 if (!(counter % num_partitions))
578 {
579 /*
580 Check if we using all partitions in table after adding partition
581 for current row to the set of used partitions. Do it only from
582 time to time to avoid overhead from bitmap_is_set_all() call.
583 */
584 if (bitmap_is_set_all(&used_partitions))
585 can_prune_partitions= partition_info::PRUNE_NO;
586 }
587 }
588 }
589 insert_table->auto_increment_field_not_null= false;
590 its.rewind ();
591
592 /* Restore the current context. */
593 ctx_state.restore_state(context, table_list);
594
595 { // Statement plan is available within these braces
596 Modification_plan plan(thd,
597 (lex->sql_command == SQLCOM_INSERT) ?
598 MT_INSERT : MT_REPLACE, insert_table,
599 NULL, false, 0);
600 DEBUG_SYNC(thd, "planned_single_insert");
601
602 if (can_prune_partitions != partition_info::PRUNE_NO)
603 {
604 /*
605 Only lock the partitions we will insert into.
606 And also only read from those partitions (duplicates etc.).
607 If explicit partition selection 'INSERT INTO t PARTITION (p1)' is used,
608 the new set of read/lock partitions is the intersection of read/lock
609 partitions and used partitions, i.e only the partitions that exists in
610 both sets will be marked for read/lock.
611 It is also safe for REPLACE, since all potentially conflicting records
612 always belong to the same partition as the one which we try to
613 insert a row. This is because ALL unique/primary keys must
614 include ALL partitioning columns.
615 */
616 bitmap_intersect(&insert_table->part_info->read_partitions,
617 &used_partitions);
618 bitmap_intersect(&insert_table->part_info->lock_partitions,
619 &used_partitions);
620 }
621
622 // Lock the tables now if not locked already.
623 if (!is_locked &&
624 lock_tables(thd, table_list, lex->table_count, 0))
625 DBUG_RETURN(true);
626
627 if (lex->describe)
628 {
629 err= explain_single_table_modification(thd, &plan, select_lex);
630 goto exit_without_my_ok;
631 }
632
633 /*
634 Count warnings for all inserts.
635 For single line insert, generate an error if try to set a NOT NULL field
636 to NULL.
637 */
638 thd->count_cuted_fields= ((insert_many_values.elements == 1 &&
639 !lex->is_ignore()) ?
640 CHECK_FIELD_ERROR_FOR_NULL :
641 CHECK_FIELD_WARN);
642 thd->cuted_fields = 0L;
643 insert_table->next_number_field= insert_table->found_next_number_field;
644
645 #ifdef HAVE_REPLICATION
646 if (thd->slave_thread)
647 {
648 /* Get SQL thread's rli, even for a slave worker thread */
649 Relay_log_info* c_rli= thd->rli_slave->get_c_rli();
650 assert(c_rli != NULL);
651 if(info.get_duplicate_handling() == DUP_UPDATE &&
652 insert_table->next_number_field != NULL &&
653 rpl_master_has_bug(c_rli, 24432, TRUE, NULL, NULL))
654 goto exit_without_my_ok;
655 }
656 #endif
657
658 error=0;
659 THD_STAGE_INFO(thd, stage_update);
660 if (duplicates == DUP_REPLACE &&
661 (!insert_table->triggers ||
662 !insert_table->triggers->has_delete_triggers()))
663 insert_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
664 if (duplicates == DUP_UPDATE)
665 insert_table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
666 /*
667 let's *try* to start bulk inserts. It won't necessary
668 start them as insert_many_values.elements should be greater than
669 some - handler dependent - threshold.
670 We should not start bulk inserts if this statement uses
671 functions or invokes triggers since they may access
672 to the same table and therefore should not see its
673 inconsistent state created by this optimization.
674 So we call start_bulk_insert to perform nesessary checks on
675 insert_many_values.elements, and - if nothing else - to initialize
676 the code to make the call of end_bulk_insert() below safe.
677 */
678 if (duplicates != DUP_ERROR || lex->is_ignore())
679 insert_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
680 /**
681 This is a simple check for the case when the table has a trigger
682 that reads from it, or when the statement invokes a stored function
683 that reads from the table being inserted to.
684 Engines can't handle a bulk insert in parallel with a read form the
685 same table in the same connection.
686 */
687 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
688 insert_table->file->ha_start_bulk_insert(insert_many_values.elements);
689
690 prepare_triggers_for_insert_stmt(insert_table);
691
692 for (Field** next_field= insert_table->field; *next_field; ++next_field)
693 {
694 (*next_field)->reset_warnings();
695 }
696
697 while ((values= its++))
698 {
699 if (insert_field_list.elements || !value_count)
700 {
701 restore_record(insert_table, s->default_values); // Get empty record
702
703 /*
704 Check whether default values of the insert_field_list not specified in
705 column list are correct or not.
706 */
707 if (validate_default_values_of_unset_fields(thd, insert_table))
708 {
709 error= 1;
710 break;
711 }
712 if (fill_record_n_invoke_before_triggers(thd, &info, insert_field_list,
713 *values, insert_table,
714 TRG_EVENT_INSERT,
715 insert_table->s->fields))
716 {
717 assert(thd->is_error());
718 /*
719 TODO: Convert warnings to errors if values_list.elements == 1
720 and check that all items return warning in case of problem with
721 storing field.
722 */
723 error= 1;
724 break;
725 }
726
727 res= check_that_all_fields_are_given_values(thd, insert_table,
728 table_list);
729 if (res)
730 {
731 assert(thd->is_error());
732 error= 1;
733 break;
734 }
735 }
736 else
737 {
738 if (lex->used_tables) // Column used in values()
739 restore_record(insert_table, s->default_values); // Get empty record
740 else
741 {
742 TABLE_SHARE *share= insert_table->s;
743
744 /*
745 Fix delete marker. No need to restore rest of record since it will
746 be overwritten by fill_record() anyway (and fill_record() does not
747 use default values in this case).
748 */
749 insert_table->record[0][0]= share->default_values[0];
750
751 /* Fix undefined null_bits. */
752 if (share->null_bytes > 1 && share->last_null_bit_pos)
753 {
754 insert_table->record[0][share->null_bytes - 1]=
755 share->default_values[share->null_bytes - 1];
756 }
757 }
758 if (fill_record_n_invoke_before_triggers(thd, insert_table->field,
759 *values, insert_table,
760 TRG_EVENT_INSERT,
761 insert_table->s->fields))
762 {
763 assert(thd->is_error());
764 error= 1;
765 break;
766 }
767 }
768
769 if ((res= table_list->view_check_option(thd)) == VIEW_CHECK_SKIP)
770 continue;
771 else if (res == VIEW_CHECK_ERROR)
772 {
773 error= 1;
774 break;
775 }
776 error= insert_table->file->ha_upsert(thd,
777 insert_update_list,
778 insert_value_list);
779 if (error == ENOTSUP)
780 error= write_record(thd, insert_table, &info, &update);
781 if (error)
782 break;
783 thd->get_stmt_da()->inc_current_row_for_condition();
784 }
785 } // Statement plan is available within these braces
786
787 error= thd->get_stmt_da()->is_error();
788 free_underlaid_joins(thd, select_lex);
789 joins_freed= true;
790
791 /*
792 Now all rows are inserted. Time to update logs and sends response to
793 user
794 */
795 {
796 /* TODO: Only call this if insert_table->found_next_number_field.*/
797 insert_table->file->ha_release_auto_increment();
798 /*
799 Make sure 'end_bulk_insert()' is called regardless of current error
800 */
801 int loc_error= 0;
802 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
803 loc_error= insert_table->file->ha_end_bulk_insert();
804 /*
805 Report error if 'end_bulk_insert()' failed, and set 'error' to 1
806 */
807 if (loc_error && !error)
808 {
809 /* purecov: begin inspected */
810 myf error_flags= MYF(0);
811 if (insert_table->file->is_fatal_error(loc_error))
812 error_flags|= ME_FATALERROR;
813
814 insert_table->file->print_error(loc_error, error_flags);
815 error= 1;
816 /* purecov: end */
817 }
818 if (duplicates != DUP_ERROR || lex->is_ignore())
819 insert_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
820
821 transactional_table= insert_table->file->has_transactions();
822
823 if ((changed= (info.stats.copied || info.stats.deleted || info.stats.updated)))
824 {
825 /*
826 Invalidate the table in the query cache if something changed.
827 For the transactional algorithm to work the invalidation must be
828 before binlog writing and ha_autocommit_or_rollback
829 */
830 query_cache.invalidate_single(thd, lex->insert_table_leaf, true);
831 DEBUG_SYNC(thd, "wait_after_query_cache_invalidate");
832 }
833
834 if (error <= 0 || thd->get_transaction()->cannot_safely_rollback(
835 Transaction_ctx::STMT))
836 {
837 if (mysql_bin_log.is_open())
838 {
839 int errcode= 0;
840 if (error <= 0)
841 {
842 /*
843 [Guilhem wrote] Temporary errors may have filled
844 thd->net.last_error/errno. For example if there has
845 been a disk full error when writing the row, and it was
846 MyISAM, then thd->net.last_error/errno will be set to
847 "disk full"... and the mysql_file_pwrite() will wait until free
848 space appears, and so when it finishes then the
849 write_row() was entirely successful
850 */
851 /* todo: consider removing */
852 thd->clear_error();
853 }
854 else
855 errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
856
857 /* bug#22725:
858
859 A query which per-row-loop can not be interrupted with
860 KILLED, like INSERT, and that does not invoke stored
861 routines can be binlogged with neglecting the KILLED error.
862
863 If there was no error (error == zero) until after the end of
864 inserting loop the KILLED flag that appeared later can be
865 disregarded since previously possible invocation of stored
866 routines did not result in any error due to the KILLED. In
867 such case the flag is ignored for constructing binlog event.
868 */
869 assert(thd->killed != THD::KILL_BAD_DATA || error > 0);
870 if (thd->binlog_query(THD::ROW_QUERY_TYPE,
871 thd->query().str, thd->query().length,
872 transactional_table, FALSE, FALSE,
873 errcode))
874 error= 1;
875 }
876 }
877 assert(transactional_table || !changed ||
878 thd->get_transaction()->cannot_safely_rollback(
879 Transaction_ctx::STMT));
880 }
881 THD_STAGE_INFO(thd, stage_end);
882 /*
883 We'll report to the client this id:
884 - if the table contains an autoincrement column and we successfully
885 inserted an autogenerated value, the autogenerated value.
886 - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
887 called, X.
888 - if the table contains an autoincrement column, and some rows were
889 inserted, the id of the last "inserted" row (if IGNORE, that value may not
890 have been really inserted but ignored).
891 */
892 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
893 thd->first_successful_insert_id_in_cur_stmt :
894 (thd->arg_of_last_insert_id_function ?
895 thd->first_successful_insert_id_in_prev_stmt :
896 ((insert_table->next_number_field && info.stats.copied) ?
897 insert_table->next_number_field->val_int() : 0));
898 insert_table->next_number_field= 0;
899 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
900 insert_table->auto_increment_field_not_null= FALSE;
901 if (duplicates == DUP_REPLACE &&
902 (!insert_table->triggers ||
903 !insert_table->triggers->has_delete_triggers()))
904 insert_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
905
906 if (thd->is_error())
907 goto exit_without_my_ok;
908
909 ha_rows row_count;
910
911 if (insert_many_values.elements == 1 &&
912 (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
913 {
914 row_count= info.stats.copied + info.stats.deleted +
915 (thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
916 info.stats.touched : info.stats.updated);
917 my_ok(thd, row_count, id);
918 }
919 else
920 {
921 char buff[160];
922 ha_rows updated=
923 thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
924 info.stats.touched : info.stats.updated;
925 if (lex->is_ignore())
926 my_snprintf(buff, sizeof(buff),
927 ER(ER_INSERT_INFO), (long) info.stats.records,
928 (long) (info.stats.records - info.stats.copied),
929 (long) thd->get_stmt_da()->current_statement_cond_count());
930 else
931 my_snprintf(buff, sizeof(buff),
932 ER(ER_INSERT_INFO), (long) info.stats.records,
933 (long) (info.stats.deleted + updated),
934 (long) thd->get_stmt_da()->current_statement_cond_count());
935 row_count= info.stats.copied + info.stats.deleted + updated;
936 my_ok(thd, row_count, id, buff);
937 }
938 thd->updated_row_count+= row_count;
939 DBUG_RETURN(FALSE);
940
941 exit_without_my_ok:
942 thd->lex->clear_values_map();
943 if (!joins_freed)
944 free_underlaid_joins(thd, select_lex);
945 DBUG_RETURN(err);
946 }
947
948
949 /**
950 Additional check for insertability for VIEW
951
952 A view is insertable if the following conditions are true:
953 - All columns being inserted into are from a single table.
954 - All not used columns in table have default values.
955 - All columns in view are distinct (not referring to the same column).
956
957 @param thd thread handler
958 @param[in,out] view reference to view being inserted into.
959 view->contain_auto_increment is true if and only if
960 the view contains an auto_increment field.
961 @param insert_table_ref reference to underlying table being inserted into
962
963 @return false if success, true if error
964 */
965
check_view_insertability(THD * thd,TABLE_LIST * view,const TABLE_LIST * insert_table_ref)966 static bool check_view_insertability(THD *thd, TABLE_LIST *view,
967 const TABLE_LIST *insert_table_ref)
968 {
969 DBUG_ENTER("check_view_insertability");
970
971 const uint num= view->view_query()->select_lex->item_list.elements;
972 TABLE *const table= insert_table_ref->table;
973 MY_BITMAP used_fields;
974 enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
975
976 const uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
977 uint32 *const used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
978 if (!used_fields_buff)
979 DBUG_RETURN(true); /* purecov: inspected */
980
981 assert(view->table == NULL &&
982 table != NULL &&
983 view->field_translation != 0);
984
985 (void) bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
986 bitmap_clear_all(&used_fields);
987
988 view->contain_auto_increment= false;
989
990 thd->mark_used_columns= MARK_COLUMNS_NONE;
991
992 // No privilege checking is done for these columns
993 Column_privilege_tracker column_privilege(thd, 0);
994
995 /* check simplicity and prepare unique test of view */
996 Field_translator *const trans_start= view->field_translation;
997 Field_translator *const trans_end= trans_start + num;
998
999 for (Field_translator *trans= trans_start; trans != trans_end; trans++)
1000 {
1001 if (trans->item == NULL)
1002 continue;
1003 /*
1004 @todo
1005 This fix_fields() call is necessary for execution of prepared statements.
1006 When repeated preparation is eliminated the call can be deleted.
1007 */
1008 if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1009 DBUG_RETURN(true); /* purecov: inspected */
1010
1011 Item_field *field;
1012 /* simple SELECT list entry (field without expression) */
1013 if (!(field= trans->item->field_for_view_update()))
1014 DBUG_RETURN(true);
1015
1016 if (field->field->unireg_check == Field::NEXT_NUMBER)
1017 view->contain_auto_increment= true;
1018 /* prepare unique test */
1019 /*
1020 remove collation (or other transparent for update function) if we have
1021 it
1022 */
1023 trans->item= field;
1024 }
1025 thd->mark_used_columns= save_mark_used_columns;
1026
1027 /* unique test */
1028 for (Field_translator *trans= trans_start; trans != trans_end; trans++)
1029 {
1030 if (trans->item == NULL)
1031 continue;
1032 /* Thanks to test above, we know that all columns are of type Item_field */
1033 Item_field *field= (Item_field *)trans->item;
1034 /* check fields belong to table in which we are inserting */
1035 if (field->field->table == table &&
1036 bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1037 DBUG_RETURN(true);
1038 }
1039
1040 DBUG_RETURN(false);
1041 }
1042
1043
1044 /**
1045 Recursive helper function for resolving join conditions for
1046 insertion into view for prepared statements.
1047
1048 @param thd Thread handler
1049 @param tr Table structure which is traversed recursively
1050
1051 @return false if success, true if error
1052 */
fix_join_cond_for_insert(THD * thd,TABLE_LIST * tr)1053 static bool fix_join_cond_for_insert(THD *thd, TABLE_LIST *tr)
1054 {
1055 if (tr->join_cond() && !tr->join_cond()->fixed)
1056 {
1057 Column_privilege_tracker column_privilege(thd, SELECT_ACL);
1058
1059 if (tr->join_cond()->fix_fields(thd, NULL))
1060 return true; /* purecov: inspected */
1061 }
1062
1063 if (tr->nested_join == NULL)
1064 return false;
1065
1066 List_iterator<TABLE_LIST> li(tr->nested_join->join_list);
1067 TABLE_LIST *ti;
1068
1069 while ((ti= li++))
1070 {
1071 if (fix_join_cond_for_insert(thd, ti))
1072 return true; /* purecov: inspected */
1073 }
1074 return false;
1075 }
1076
1077 /**
1078 Check if table can be updated
1079
1080 @param thd Thread handle
1081 @param table_list Table reference
1082 @param fields List of fields to be inserted
1083 @param select_insert True if processing INSERT ... SELECT statement
1084
1085 @return false if success, true if error
1086 */
1087
1088 bool
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1089 Sql_cmd_insert_base::mysql_prepare_insert_check_table(THD *thd,
1090 TABLE_LIST *table_list,
1091 List<Item> &fields,
1092 bool select_insert)
1093 {
1094 DBUG_ENTER("mysql_prepare_insert_check_table");
1095
1096 SELECT_LEX *const select= thd->lex->select_lex;
1097 const bool insert_into_view= table_list->is_view();
1098
1099 if (select->setup_tables(thd, table_list, select_insert))
1100 DBUG_RETURN(true); /* purecov: inspected */
1101
1102 if (insert_into_view)
1103 {
1104 // Allowing semi-join would transform this table into a "join view"
1105 if (table_list->resolve_derived(thd, false))
1106 DBUG_RETURN(true);
1107
1108 if (select->merge_derived(thd, table_list))
1109 DBUG_RETURN(true); /* purecov: inspected */
1110
1111 /*
1112 On second preparation, we may need to resolve view condition generated
1113 when merging the view.
1114 */
1115 if (!select->first_execution && table_list->is_merged() &&
1116 fix_join_cond_for_insert(thd, table_list))
1117 DBUG_RETURN(true); /* purecov: inspected */
1118 }
1119
1120 if (!table_list->is_insertable())
1121 {
1122 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
1123 DBUG_RETURN(true);
1124 }
1125
1126 // Allow semi-join for selected tables containing subqueries
1127 if (select->derived_table_count && select->resolve_derived(thd, true))
1128 DBUG_RETURN(true);
1129
1130 /*
1131 First table in list is the one being inserted into, requires INSERT_ACL.
1132 All other tables require SELECT_ACL only.
1133 */
1134 if (select->derived_table_count &&
1135 select->check_view_privileges(thd, INSERT_ACL, SELECT_ACL))
1136 DBUG_RETURN(true);
1137
1138 // Precompute and store the row types of NATURAL/USING joins.
1139 if (setup_natural_join_row_types(thd, select->join_list, &select->context))
1140 DBUG_RETURN(true);
1141
1142 if (insert_into_view && !fields.elements)
1143 {
1144 empty_field_list_on_rset= true;
1145 if (table_list->is_multiple_tables())
1146 {
1147 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1148 table_list->view_db.str, table_list->view_name.str);
1149 DBUG_RETURN(true);
1150 }
1151 if (insert_view_fields(thd, &fields, table_list))
1152 DBUG_RETURN(true);
1153 /*
1154 Item_fields inserted above from field_translation list have been
1155 already fixed in resolved_derived(), thus setup_fields() in
1156 check_insert_fields() will not process them, not mark them in write_set;
1157 we have to do it:
1158 */
1159 bitmap_set_all(table_list->updatable_base_table()->table->write_set);
1160 }
1161
1162 DBUG_RETURN(false);
1163 }
1164
1165
1166 /**
1167 Get extra info for tables we insert into
1168
1169 @param table table(TABLE object) we insert into,
1170 might be NULL in case of view
1171 @param table(TABLE_LIST object) or view we insert into
1172 */
1173
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1174 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1175 {
1176 if (table)
1177 {
1178 table->prepare_for_position();
1179 return;
1180 }
1181
1182 assert(tables->is_view());
1183 List_iterator<TABLE_LIST> it(*tables->view_tables);
1184 TABLE_LIST *tbl;
1185 while ((tbl= it++))
1186 prepare_for_positional_update(tbl->table, tbl);
1187
1188 return;
1189 }
1190
1191
1192 /**
1193 Prepare items in INSERT statement
1194
1195 @param thd Thread handler
1196 @param table_list Global/local table list
1197 @param values List of values to be inserted
1198 @param duplic What to do on duplicate key error
1199 @param where Where clause (for insert ... select)
1200 @param select_insert TRUE if INSERT ... SELECT statement
1201
1202 @todo (in far future)
1203 In cases of:
1204 INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1205 ON DUPLICATE KEY ...
1206 we should be able to refer to sum1 in the ON DUPLICATE KEY part
1207
1208 WARNING
1209 You MUST set table->insert_values to 0 after calling this function
1210 before releasing the table object.
1211
1212 @return false if success, true if error
1213 */
1214
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,List_item * values,bool select_insert)1215 bool Sql_cmd_insert_base::mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1216 List_item *values,
1217 bool select_insert)
1218 {
1219 DBUG_ENTER("mysql_prepare_insert");
1220
1221 // INSERT should have a SELECT or VALUES clause
1222 assert (!select_insert || !values);
1223
1224 // Number of update fields must match number of update values
1225 assert(insert_update_list.elements == insert_value_list.elements);
1226
1227 LEX * const lex= thd->lex;
1228 SELECT_LEX *const select_lex= lex->select_lex;
1229 Name_resolution_context *const context= &select_lex->context;
1230 Name_resolution_context_state ctx_state;
1231 const bool insert_into_view= table_list->is_view();
1232 bool res= false;
1233
1234 DBUG_PRINT("enter", ("table_list 0x%lx, view %d",
1235 (ulong)table_list,
1236 (int)insert_into_view));
1237 /*
1238 For subqueries in VALUES() we should not see the table in which we are
1239 inserting (for INSERT ... SELECT this is done by changing table_list,
1240 because INSERT ... SELECT share SELECT_LEX it with SELECT.
1241 */
1242 if (!select_insert)
1243 {
1244 for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1245 un;
1246 un= un->next_unit())
1247 {
1248 for (SELECT_LEX *sl= un->first_select();
1249 sl;
1250 sl= sl->next_select())
1251 {
1252 sl->context.outer_context= 0;
1253 }
1254 }
1255 }
1256
1257 if (mysql_prepare_insert_check_table(thd, table_list, insert_field_list,
1258 select_insert))
1259 DBUG_RETURN(true);
1260
1261 // REPLACE for a JOIN view is not permitted.
1262 if (table_list->is_multiple_tables() && duplicates == DUP_REPLACE)
1263 {
1264 my_error(ER_VIEW_DELETE_MERGE_VIEW, MYF(0),
1265 table_list->view_db.str, table_list->view_name.str);
1266 DBUG_RETURN(true);
1267 }
1268
1269 if (duplicates == DUP_UPDATE)
1270 {
1271 /* it should be allocated before Item::fix_fields() */
1272 if (table_list->set_insert_values(thd->mem_root))
1273 DBUG_RETURN(true); /* purecov: inspected */
1274 }
1275
1276 // Save the state of the current name resolution context.
1277 ctx_state.save_state(context, table_list);
1278
1279 // Prepare the lists of columns and values in the statement.
1280 if (values)
1281 {
1282 // if we have INSERT ... VALUES () we cannot have a GROUP BY clause
1283 assert (!select_lex->group_list.elements);
1284
1285 /*
1286 Perform name resolution only in the first table - 'table_list',
1287 which is the table that is inserted into.
1288 */
1289 assert(table_list->next_local == NULL);
1290 table_list->next_local= NULL;
1291 context->resolve_in_table_list_only(table_list);
1292
1293 if (!res)
1294 res= check_insert_fields(thd, context->table_list, insert_field_list,
1295 values->elements, true, !insert_into_view);
1296 table_map map= 0;
1297 if (!res)
1298 map= lex->insert_table_leaf->map();
1299
1300 // values is reset here to cover all the rows in the VALUES-list.
1301 List_iterator_fast<List_item> its(insert_many_values);
1302
1303 // Check whether all rows have the same number of fields.
1304 const uint value_count= values->elements;
1305 ulong counter= 0;
1306 while ((values= its++))
1307 {
1308 counter++;
1309 if (values->elements != value_count)
1310 {
1311 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
1312 DBUG_RETURN(true);
1313 }
1314
1315 if (!res)
1316 res= setup_fields(thd, Ref_ptr_array(), *values, SELECT_ACL, NULL,
1317 false, false);
1318 if (!res)
1319 res= check_valid_table_refs(table_list, *values, map);
1320
1321 if (!res && lex->insert_table_leaf->table->has_gcol())
1322 res= validate_gc_assignment(thd, &insert_field_list, values,
1323 lex->insert_table_leaf->table);
1324 }
1325 its.rewind();
1326 values= its++;
1327
1328 if (!res && duplicates == DUP_UPDATE)
1329 {
1330 #ifndef NO_EMBEDDED_ACCESS_CHECKS
1331 table_list->set_want_privilege(UPDATE_ACL);
1332 #endif
1333 // Setup the columns to be updated
1334 res= setup_fields(thd, Ref_ptr_array(),
1335 insert_update_list, UPDATE_ACL, NULL, false, true);
1336 if (!res)
1337 res= check_valid_table_refs(table_list, insert_update_list, map);
1338
1339 // Setup the corresponding values
1340 thd->lex->in_update_value_clause= true;
1341 if (!res)
1342 res= setup_fields(thd, Ref_ptr_array(), insert_value_list, SELECT_ACL,
1343 NULL, false, false);
1344 thd->lex->in_update_value_clause= false;
1345
1346 if (!res)
1347 res= check_valid_table_refs(table_list, insert_value_list, map);
1348
1349 if (!res && lex->insert_table_leaf->table->has_gcol())
1350 res= validate_gc_assignment(thd, &insert_update_list,
1351 &insert_value_list,
1352 lex->insert_table_leaf->table);
1353 }
1354 }
1355 else if (thd->stmt_arena->is_stmt_prepare())
1356 {
1357 /*
1358 This section of code is more or less a duplicate of the code in
1359 Query_result_insert::prepare, and the 'if' branch above.
1360 @todo Consolidate these three sections into one.
1361 */
1362 /*
1363 Perform name resolution only in the first table - 'table_list',
1364 which is the table that is inserted into.
1365 */
1366 table_list->next_local= NULL;
1367 thd->dup_field= NULL;
1368 context->resolve_in_table_list_only(table_list);
1369
1370 /*
1371 When processing a prepared INSERT ... SELECT statement,
1372 mysql_prepare_insert() is called from
1373 mysql_insert_select_prepare_tester(), when the values list (aka the
1374 SELECT list from the SELECT) is not resolved yet, so pass "false"
1375 for value_count_known.
1376 */
1377 res= check_insert_fields(thd, context->table_list, insert_field_list, 0,
1378 false, !insert_into_view);
1379 table_map map= 0;
1380 if (!res)
1381 map= lex->insert_table_leaf->map();
1382
1383 if (!res && lex->insert_table_leaf->table->vfield)
1384 res= validate_gc_assignment(thd, &insert_field_list, values,
1385 lex->insert_table_leaf->table);
1386
1387 if (!res && duplicates == DUP_UPDATE)
1388 {
1389 #ifndef NO_EMBEDDED_ACCESS_CHECKS
1390 table_list->set_want_privilege(UPDATE_ACL);
1391 #endif
1392 // Setup the columns to be modified
1393 res= setup_fields(thd, Ref_ptr_array(),
1394 insert_update_list, UPDATE_ACL, NULL, false, true);
1395 if (!res)
1396 res= check_valid_table_refs(table_list, insert_update_list, map);
1397
1398 if (!res && lex->insert_table_leaf->table->vfield)
1399 res= validate_gc_assignment(thd, &insert_update_list,
1400 &insert_value_list,
1401 lex->insert_table_leaf->table);
1402 assert(!table_list->next_name_resolution_table);
1403 if (select_lex->group_list.elements == 0 && !select_lex->with_sum_func)
1404 {
1405 /*
1406 There are two separata name resolution contexts:
1407 the INSERT table and the tables in the SELECT expression
1408 Make a single context out of them by concatenating the lists:
1409 */
1410 table_list->next_name_resolution_table=
1411 ctx_state.get_first_name_resolution_table();
1412 }
1413 thd->lex->in_update_value_clause= true;
1414 if (!res)
1415 res= setup_fields(thd, Ref_ptr_array(), insert_value_list,
1416 SELECT_ACL, NULL, false, false);
1417 thd->lex->in_update_value_clause= false;
1418
1419 /*
1420 Notice that there is no need to apply the Item::update_value_transformer
1421 here, as this will be done during EXECUTE in
1422 Query_result_insert::prepare().
1423 */
1424 }
1425 }
1426
1427 // Restore the current name resolution context
1428 ctx_state.restore_state(context, table_list);
1429
1430 if (res)
1431 DBUG_RETURN(res);
1432
1433 if (!select_insert)
1434 {
1435 TABLE_LIST *const duplicate=
1436 unique_table(thd, lex->insert_table_leaf, table_list->next_global, true);
1437 if (duplicate)
1438 {
1439 update_non_unique_table_error(table_list, "INSERT", duplicate);
1440 DBUG_RETURN(true);
1441 }
1442 }
1443
1444 if (table_list->is_merged())
1445 {
1446 Column_privilege_tracker column_privilege(thd, SELECT_ACL);
1447
1448 if (table_list->prepare_check_option(thd))
1449 DBUG_RETURN(true);
1450
1451 if (duplicates == DUP_REPLACE &&
1452 table_list->prepare_replace_filter(thd))
1453 DBUG_RETURN(true);
1454 }
1455
1456 if (!select_insert && select_lex->apply_local_transforms(thd, false))
1457 DBUG_RETURN(true);
1458
1459 DBUG_RETURN(false);
1460 }
1461
1462
1463 /* Check if there is more uniq keys after field */
1464
last_uniq_key(TABLE * table,uint keynr)1465 static int last_uniq_key(TABLE *table,uint keynr)
1466 {
1467 /*
1468 When an underlying storage engine informs that the unique key
1469 conflicts are not reported in the ascending order by setting
1470 the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1471 information to determine the last key conflict.
1472
1473 The information about the last key conflict will be used to
1474 do a replace of the new row on the conflicting row, rather
1475 than doing a delete (of old row) + insert (of new row).
1476
1477 Hence check for this flag and disable replacing the last row
1478 by returning 0 always. Returning 0 will result in doing
1479 a delete + insert always.
1480 */
1481 if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1482 return 0;
1483
1484 while (++keynr < table->s->keys)
1485 if (table->key_info[keynr].flags & HA_NOSAME)
1486 return 0;
1487 return 1;
1488 }
1489
1490
1491 /**
1492 Write a record to table with optional deletion of conflicting records,
1493 invoke proper triggers if needed.
1494
1495 SYNOPSIS
1496 write_record()
1497 thd - thread context
1498 table - table to which record should be written
1499 info - COPY_INFO structure describing handling of duplicates
1500 and which is used for counting number of records inserted
1501 and deleted.
1502 update - COPY_INFO structure describing the UPDATE part (only used for
1503 INSERT ON DUPLICATE KEY UPDATE)
1504
1505 @note
1506
1507 Once this record is written to the table buffer, any AFTER INSERT trigger
1508 will be invoked. If instead of inserting a new record we end up updating an
1509 old one, both ON UPDATE triggers will fire instead. Similarly both ON
1510 DELETE triggers will be invoked if are to delete conflicting records.
1511
1512 Call thd->transaction.stmt.mark_modified_non_trans_table() if table is a
1513 non-transactional table.
1514
1515 RETURN VALUE
1516 0 - success
1517 non-0 - error
1518 */
1519
write_record(THD * thd,TABLE * table,COPY_INFO * info,COPY_INFO * update)1520 int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)
1521 {
1522 int error, trg_error= 0;
1523 char *key=0;
1524 MY_BITMAP *save_read_set, *save_write_set;
1525 ulonglong prev_insert_id= table->file->next_insert_id;
1526 ulonglong insert_id_for_cur_row= 0;
1527 MEM_ROOT mem_root;
1528 DBUG_ENTER("write_record");
1529
1530 /* Here we are using separate MEM_ROOT as this memory should be freed once we
1531 exit write_record() function. This is marked as not instumented as it is
1532 allocated for very short time in a very specific case.
1533 */
1534 init_sql_alloc(PSI_NOT_INSTRUMENTED, &mem_root, 256, 0);
1535 info->stats.records++;
1536 save_read_set= table->read_set;
1537 save_write_set= table->write_set;
1538
1539 info->set_function_defaults(table);
1540
1541 const enum_duplicates duplicate_handling= info->get_duplicate_handling();
1542
1543 if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)
1544 {
1545 assert(duplicate_handling != DUP_UPDATE || update != NULL);
1546 while ((error=table->file->ha_write_row(table->record[0])))
1547 {
1548 uint key_nr;
1549 /*
1550 If we do more than one iteration of this loop, from the second one the
1551 row will have an explicit value in the autoinc field, which was set at
1552 the first call of handler::update_auto_increment(). So we must save
1553 the autogenerated value to avoid thd->insert_id_for_cur_row to become
1554 0.
1555 */
1556 if (table->file->insert_id_for_cur_row > 0)
1557 insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1558 else
1559 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1560 bool is_duplicate_key_error;
1561 if (!table->file->is_ignorable_error(error))
1562 goto err;
1563 is_duplicate_key_error= (error == HA_ERR_FOUND_DUPP_KEY ||
1564 error == HA_ERR_FOUND_DUPP_UNIQUE);
1565 if (!is_duplicate_key_error)
1566 {
1567 /*
1568 We come here when we had an ignorable error which is not a duplicate
1569 key error. In this we ignore error if ignore flag is set, otherwise
1570 report error as usual. We will not do any duplicate key processing.
1571 */
1572 info->last_errno= error;
1573 table->file->print_error(error, MYF(0));
1574 /*
1575 If IGNORE option is used, handler errors will be downgraded
1576 to warnings and don't have to stop the iteration.
1577 */
1578 if (thd->is_error())
1579 goto before_trg_err;
1580 goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1581 }
1582 if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
1583 {
1584 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1585 goto err;
1586 }
1587 /*
1588 key index value is either valid in the range [0-MAX_KEY) or
1589 has value MAX_KEY as a marker for the case when no information
1590 about key can be found. In the last case we have to require
1591 that storage engine has the flag HA_DUPLICATE_POS turned on.
1592 If this invariant is false then assert will crash
1593 the server built in debug mode. For the server that was built
1594 without DEBUG we have additional check for the value of key_nr
1595 in the code below in order to report about error in any case.
1596 */
1597 assert(key_nr != MAX_KEY ||
1598 (key_nr == MAX_KEY &&
1599 (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
1600
1601 DEBUG_SYNC(thd, "write_row_replace");
1602
1603 /* Read all columns for the row we are going to replace */
1604 table->use_all_columns();
1605 /*
1606 Don't allow REPLACE to replace a row when a auto_increment column
1607 was used. This ensures that we don't get a problem when the
1608 whole range of the key has been used.
1609 */
1610 if (duplicate_handling == DUP_REPLACE &&
1611 table->next_number_field &&
1612 key_nr == table->s->next_number_index &&
1613 (insert_id_for_cur_row > 0))
1614 goto err;
1615 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1616 {
1617 if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1618 goto err;
1619 }
1620 /*
1621 If the key index is equal to MAX_KEY it's treated as unknown key case
1622 and we shouldn't try to locate key info.
1623 */
1624 else if (key_nr < MAX_KEY)
1625 {
1626 if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1627 {
1628 error=my_errno();
1629 goto err;
1630 }
1631
1632 if (!key)
1633 {
1634 if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
1635 MAX_KEY_LENGTH)))
1636 {
1637 error=ENOMEM;
1638 goto err;
1639 }
1640 }
1641 /*
1642 If we convert INSERT operation internally to an UPDATE.
1643 An INSERT operation may update table->vfield for BLOB fields,
1644 So here we recalculate data for generated columns.
1645 */
1646 if (table->vfield) {
1647 update_generated_write_fields(table->write_set, table);
1648 }
1649 key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1650 if ((error=(table->file->ha_index_read_idx_map(table->record[1],key_nr,
1651 (uchar*) key, HA_WHOLE_KEY,
1652 HA_READ_KEY_EXACT))))
1653 goto err;
1654 }
1655 else
1656 {
1657 /*
1658 For the server built in non-debug mode returns error if
1659 handler::get_dup_key() returned MAX_KEY as the value of key index.
1660 */
1661 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1662 goto err;
1663 }
1664 if (duplicate_handling == DUP_UPDATE)
1665 {
1666 int res= 0;
1667 /*
1668 We don't check for other UNIQUE keys - the first row
1669 that matches, is updated. If update causes a conflict again,
1670 an error is returned
1671 */
1672 assert(table->insert_values != NULL);
1673 /*
1674 The insert has failed, store the insert_id generated for
1675 this row to be re-used for the next insert.
1676 */
1677 if (insert_id_for_cur_row > 0) prev_insert_id = insert_id_for_cur_row;
1678
1679 store_record(table,insert_values);
1680 /*
1681 Special check for BLOB/GEOMETRY field in statements with
1682 "ON DUPLICATE KEY UPDATE" clause.
1683 See mysql_prepare_blob_values() function for more details.
1684 */
1685 if (mysql_prepare_blob_values(thd,
1686 *update->get_changed_columns(),
1687 &mem_root))
1688 goto before_trg_err;
1689 restore_record(table,record[1]);
1690 assert(update->get_changed_columns()->elements ==
1691 update->update_values->elements);
1692 if (fill_record_n_invoke_before_triggers(thd, update,
1693 *update->get_changed_columns(),
1694 *update->update_values,
1695 table, TRG_EVENT_UPDATE, 0))
1696 goto before_trg_err;
1697
1698 bool insert_id_consumed= false;
1699 if (// UPDATE clause specifies a value for the auto increment field
1700 table->auto_increment_field_not_null &&
1701 // An auto increment value has been generated for this row
1702 (insert_id_for_cur_row > 0))
1703 {
1704 // After-update value:
1705 const ulonglong auto_incr_val= table->next_number_field->val_int();
1706 if (auto_incr_val == insert_id_for_cur_row)
1707 {
1708 // UPDATE wants to use the generated value
1709 insert_id_consumed= true;
1710 }
1711 else if (table->file->auto_inc_interval_for_cur_row.
1712 in_range(auto_incr_val))
1713 {
1714 /*
1715 UPDATE wants to use one auto generated value which we have already
1716 reserved for another (previous or following) row. That may cause
1717 a duplicate key error if we later try to insert the reserved
1718 value. Such conflicts on auto generated values would be strange
1719 behavior, so we return a clear error now.
1720 */
1721 my_error(ER_AUTO_INCREMENT_CONFLICT, MYF(0));
1722 goto before_trg_err;
1723 }
1724 }
1725
1726 if (!insert_id_consumed)
1727 table->file->restore_auto_increment(prev_insert_id);
1728
1729 /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1730 {
1731 const TABLE_LIST *inserted_view=
1732 table->pos_in_table_list->belong_to_view;
1733 if (inserted_view != NULL)
1734 {
1735 res= inserted_view->view_check_option(thd);
1736 if (res == VIEW_CHECK_SKIP)
1737 goto ok_or_after_trg_err;
1738 if (res == VIEW_CHECK_ERROR)
1739 goto before_trg_err;
1740 }
1741 }
1742
1743 info->stats.touched++;
1744 if (!records_are_comparable(table) || compare_records(table))
1745 {
1746 // Handle the INSERT ON DUPLICATE KEY UPDATE operation
1747 update->set_function_defaults(table);
1748
1749 if ((error=table->file->ha_update_row(table->record[1],
1750 table->record[0])) &&
1751 error != HA_ERR_RECORD_IS_THE_SAME)
1752 {
1753 info->last_errno= error;
1754 myf error_flags= MYF(0);
1755 if (table->file->is_fatal_error(error))
1756 error_flags|= ME_FATALERROR;
1757 table->file->print_error(error, error_flags);
1758 /*
1759 If IGNORE option is used, handler errors will be downgraded
1760 to warnings and don't have to stop the iteration.
1761 */
1762 if (thd->is_error())
1763 goto before_trg_err;
1764 goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1765 }
1766
1767 if (error != HA_ERR_RECORD_IS_THE_SAME)
1768 info->stats.updated++;
1769 else
1770 error= 0;
1771 /*
1772 If ON DUP KEY UPDATE updates a row instead of inserting one, it's
1773 like a regular UPDATE statement: it should not affect the value of a
1774 next SELECT LAST_INSERT_ID() or mysql_insert_id().
1775 Except if LAST_INSERT_ID(#) was in the INSERT query, which is
1776 handled separately by THD::arg_of_last_insert_id_function.
1777 */
1778 insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1779 info->stats.copied++;
1780 }
1781
1782 // Execute the 'AFTER, ON UPDATE' trigger
1783 trg_error= (table->triggers &&
1784 table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1785 TRG_ACTION_AFTER, TRUE));
1786 goto ok_or_after_trg_err;
1787 }
1788 else /* DUP_REPLACE */
1789 {
1790 TABLE_LIST *view= table->pos_in_table_list->belong_to_view;
1791
1792 if (view && view->replace_filter)
1793 {
1794 const size_t record_length= table->s->reclength;
1795
1796 void *record0_saved= my_malloc(PSI_NOT_INSTRUMENTED, record_length,
1797 MYF(MY_WME));
1798
1799 if (!record0_saved)
1800 {
1801 error= ENOMEM;
1802 goto err;
1803 }
1804
1805 // Save the record used for comparison.
1806 memcpy(record0_saved, table->record[0], record_length);
1807
1808 // Preparing the record for comparison.
1809 memcpy(table->record[0], table->record[1], record_length);
1810
1811 // Checking if the row being conflicted is visible by the view.
1812 bool found_row_in_view= view->replace_filter->val_int();
1813
1814 // Restoring the record back.
1815 memcpy(table->record[0], record0_saved, record_length);
1816
1817 my_free(record0_saved);
1818
1819 if (!found_row_in_view)
1820 {
1821 my_error(ER_REPLACE_INACCESSIBLE_ROWS, MYF(0));
1822 goto err;
1823 }
1824 }
1825
1826 /*
1827 The manual defines the REPLACE semantics that it is either
1828 an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1829 InnoDB do not function in the defined way if we allow MySQL
1830 to convert the latter operation internally to an UPDATE.
1831 We also should not perform this conversion if we have
1832 timestamp field with ON UPDATE which is different from DEFAULT.
1833 Another case when conversion should not be performed is when
1834 we have ON DELETE trigger on table so user may notice that
1835 we cheat here. Note that it is ok to do such conversion for
1836 tables which have ON UPDATE but have no ON DELETE triggers,
1837 we just should not expose this fact to users by invoking
1838 ON UPDATE triggers.
1839 */
1840
1841 if (last_uniq_key(table,key_nr) &&
1842 !table->file->referenced_by_foreign_key() &&
1843 (!table->triggers || !table->triggers->has_delete_triggers()))
1844 {
1845 if ((error=table->file->ha_update_row(table->record[1],
1846 table->record[0])) &&
1847 error != HA_ERR_RECORD_IS_THE_SAME)
1848 goto err;
1849 if (error != HA_ERR_RECORD_IS_THE_SAME)
1850 info->stats.deleted++;
1851 else
1852 error= 0;
1853 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1854 /*
1855 Since we pretend that we have done insert we should call
1856 its after triggers.
1857 */
1858 goto after_trg_n_copied_inc;
1859 }
1860 else
1861 {
1862 if (table->triggers &&
1863 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1864 TRG_ACTION_BEFORE, TRUE))
1865 goto before_trg_err;
1866 if ((error=table->file->ha_delete_row(table->record[1])))
1867 goto err;
1868 info->stats.deleted++;
1869 if (!table->file->has_transactions())
1870 thd->get_transaction()->mark_modified_non_trans_table(
1871 Transaction_ctx::STMT);
1872 if (table->triggers &&
1873 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1874 TRG_ACTION_AFTER, TRUE))
1875 {
1876 trg_error= 1;
1877 goto ok_or_after_trg_err;
1878 }
1879 /* Let us attempt do write_row() once more */
1880 }
1881 }
1882 }
1883
1884 /*
1885 If more than one iteration of the above while loop is done, from the second
1886 one the row being inserted will have an explicit value in the autoinc field,
1887 which was set at the first call of handler::update_auto_increment(). This
1888 value is saved to avoid thd->insert_id_for_cur_row becoming 0. Use this saved
1889 autoinc value.
1890 */
1891 if (table->file->insert_id_for_cur_row == 0)
1892 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1893
1894 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1895 /*
1896 Restore column maps if they where replaced during an duplicate key
1897 problem.
1898 */
1899 if (table->read_set != save_read_set ||
1900 table->write_set != save_write_set)
1901 table->column_bitmaps_set(save_read_set, save_write_set);
1902 }
1903 else if ((error=table->file->ha_write_row(table->record[0])))
1904 {
1905 DEBUG_SYNC(thd, "write_row_noreplace");
1906 info->last_errno= error;
1907 myf error_flags= MYF(0);
1908 if (table->file->is_fatal_error(error))
1909 error_flags|= ME_FATALERROR;
1910 table->file->print_error(error, error_flags);
1911 /*
1912 If IGNORE option is used, handler errors will be downgraded
1913 to warnings and don't have to stop the iteration.
1914 */
1915 if (thd->is_error())
1916 goto before_trg_err;
1917 table->file->restore_auto_increment(prev_insert_id);
1918 goto ok_or_after_trg_err;
1919 }
1920
1921 after_trg_n_copied_inc:
1922 info->stats.copied++;
1923 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1924 trg_error= (table->triggers &&
1925 table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
1926 TRG_ACTION_AFTER, TRUE));
1927
1928 ok_or_after_trg_err:
1929 if (key)
1930 my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1931 if (!table->file->has_transactions())
1932 thd->get_transaction()->mark_modified_non_trans_table(
1933 Transaction_ctx::STMT);
1934 free_root(&mem_root, MYF(0));
1935 DBUG_RETURN(trg_error);
1936
1937 err:
1938 {
1939 myf error_flags= MYF(0); /**< Flag for fatal errors */
1940 info->last_errno= error;
1941 assert(thd->lex->current_select() != NULL);
1942 if (table->file->is_fatal_error(error))
1943 error_flags|= ME_FATALERROR;
1944
1945 table->file->print_error(error, error_flags);
1946 }
1947
1948 before_trg_err:
1949 table->file->restore_auto_increment(prev_insert_id);
1950 if (key)
1951 my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1952 table->column_bitmaps_set(save_read_set, save_write_set);
1953 free_root(&mem_root, MYF(0));
1954 DBUG_RETURN(1);
1955 }
1956
1957
1958 /******************************************************************************
1959 Check that all fields with arn't null_fields are used
1960 ******************************************************************************/
1961
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)1962 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
1963 TABLE_LIST *table_list)
1964 {
1965 int err= 0;
1966 MY_BITMAP *write_set= entry->fields_set_during_insert;
1967
1968 for (Field **field=entry->field ; *field ; field++)
1969 {
1970 if (!bitmap_is_set(write_set, (*field)->field_index) &&
1971 ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1972 ((*field)->real_type() != MYSQL_TYPE_ENUM))
1973 {
1974 bool view= false;
1975 if (table_list)
1976 {
1977 table_list= table_list->top_table();
1978 view= table_list->is_view();
1979 }
1980 if (view)
1981 (*field)->set_warning(Sql_condition::SL_WARNING,
1982 ER_NO_DEFAULT_FOR_VIEW_FIELD, 1,
1983 table_list->view_db.str,
1984 table_list->view_name.str);
1985 else
1986 (*field)->set_warning(Sql_condition::SL_WARNING,
1987 ER_NO_DEFAULT_FOR_FIELD, 1);
1988 err= 1;
1989 }
1990 }
1991 bitmap_clear_all(write_set);
1992 return (!thd->lex->is_ignore() && thd->is_strict_mode()) ? err : 0;
1993 }
1994
1995
1996 /***************************************************************************
1997 Store records in INSERT ... SELECT *
1998 ***************************************************************************/
1999
2000
2001 /*
2002 make insert specific preparation and checks after opening tables
2003
2004 SYNOPSIS
2005 mysql_insert_select_prepare()
2006 thd thread handler
2007
2008 RETURN
2009 FALSE OK
2010 TRUE Error
2011 */
2012
mysql_insert_select_prepare(THD * thd)2013 bool Sql_cmd_insert_select::mysql_insert_select_prepare(THD *thd)
2014 {
2015 LEX *lex= thd->lex;
2016 SELECT_LEX *select_lex= lex->select_lex;
2017 DBUG_ENTER("mysql_insert_select_prepare");
2018
2019 /*
2020 SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
2021 clause if table is VIEW
2022 */
2023 if (mysql_prepare_insert(thd, lex->query_tables, NULL, true))
2024 DBUG_RETURN(true);
2025
2026 /*
2027 exclude first table from leaf tables list, because it belong to
2028 INSERT
2029 */
2030 assert(select_lex->leaf_tables != NULL);
2031 assert(lex->insert_table == select_lex->leaf_tables->top_table());
2032
2033 select_lex->leaf_tables= lex->insert_table->next_local;
2034 if (select_lex->leaf_tables != NULL)
2035 select_lex->leaf_tables= select_lex->leaf_tables->first_leaf_table();
2036 select_lex->leaf_table_count-=
2037 lex->insert_table->is_view() ? lex->insert_table->leaf_tables_count() : 1;
2038 DBUG_RETURN(false);
2039 }
2040
2041
prepare(List<Item> & values,SELECT_LEX_UNIT * u)2042 int Query_result_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2043 {
2044 DBUG_ENTER("Query_result_insert::prepare");
2045
2046 LEX *const lex= thd->lex;
2047 bool res;
2048 SELECT_LEX *const lex_current_select_save= lex->current_select();
2049 const enum_duplicates duplicate_handling= info.get_duplicate_handling();
2050
2051 unit= u;
2052
2053 /*
2054 Since table in which we are going to insert is added to the first
2055 select, LEX::current_select() should point to the first select while
2056 we are fixing fields from insert list.
2057 */
2058 lex->set_current_select(lex->select_lex);
2059
2060 res= check_insert_fields(thd, table_list, *fields, values.elements, true,
2061 !insert_into_view);
2062 if (!res)
2063 res= setup_fields(thd, Ref_ptr_array(), values, SELECT_ACL, NULL,
2064 false, false);
2065
2066 if (!res && lex->insert_table_leaf->table->has_gcol())
2067 res= validate_gc_assignment(thd, fields, &values,
2068 lex->insert_table_leaf->table);
2069
2070 if (duplicate_handling == DUP_UPDATE && !res)
2071 {
2072 Name_resolution_context *const context= &lex->select_lex->context;
2073 Name_resolution_context_state ctx_state;
2074
2075 /* Save the state of the current name resolution context. */
2076 ctx_state.save_state(context, table_list);
2077
2078 /* Perform name resolution only in the first table - 'table_list'. */
2079 table_list->next_local= NULL;
2080 context->resolve_in_table_list_only(table_list);
2081
2082 #ifndef NO_EMBEDDED_ACCESS_CHECKS
2083 table_list->set_want_privilege(UPDATE_ACL);
2084 #endif
2085 if (!res)
2086 res= setup_fields(thd, Ref_ptr_array(), *update.get_changed_columns(),
2087 UPDATE_ACL, NULL, false, true);
2088
2089 if (!res && lex->insert_table_leaf->table->has_gcol())
2090 res= validate_gc_assignment(thd, update.get_changed_columns(),
2091 update.update_values,
2092 lex->insert_table_leaf->table);
2093 /*
2094 When we are not using GROUP BY and there are no ungrouped aggregate
2095 functions
2096 we can refer to other tables in the ON DUPLICATE KEY part.
2097 We use next_name_resolution_table destructively, so check it first
2098 (views?).
2099 */
2100 assert (!table_list->next_name_resolution_table);
2101 if (lex->select_lex->group_list.elements == 0 &&
2102 !lex->select_lex->with_sum_func)
2103 {
2104 /*
2105 We must make a single context out of the two separate name resolution
2106 contexts:
2107 the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
2108 To do that we must concatenate the two lists
2109 */
2110 table_list->next_name_resolution_table=
2111 ctx_state.get_first_name_resolution_table();
2112 }
2113 lex->in_update_value_clause= true;
2114 if (!res)
2115 res= setup_fields(thd, Ref_ptr_array(), *update.update_values,
2116 SELECT_ACL, NULL, false, false);
2117 lex->in_update_value_clause= false;
2118 if (!res)
2119 {
2120 /*
2121 Traverse the update values list and substitute fields from the
2122 select for references (Item_ref objects) to them. This is done in
2123 order to get correct values from those fields when the select
2124 employs a temporary table.
2125 */
2126 List_iterator<Item> li(*update.update_values);
2127 Item *item;
2128
2129 while ((item= li++))
2130 {
2131 item->transform(&Item::update_value_transformer,
2132 (uchar*)lex->current_select());
2133 }
2134 }
2135
2136 /* Restore the current context. */
2137 ctx_state.restore_state(context, table_list);
2138 }
2139
2140 lex->set_current_select(lex_current_select_save);
2141 if (res)
2142 DBUG_RETURN(1);
2143 /*
2144 if it is INSERT into join view then check_insert_fields already found
2145 real table for insert
2146 */
2147 table= lex->insert_table_leaf->table;
2148
2149 if (duplicate_handling == DUP_UPDATE || duplicate_handling == DUP_REPLACE)
2150 prepare_for_positional_update(table, table_list);
2151
2152 if (info.add_function_default_columns(table, table->write_set))
2153 DBUG_RETURN(1);
2154 if ((duplicate_handling == DUP_UPDATE) &&
2155 update.add_function_default_columns(table, table->write_set))
2156 DBUG_RETURN(1);
2157
2158 /*
2159 Is table which we are changing used somewhere in other parts of
2160 query
2161 */
2162 if (unique_table(thd, lex->insert_table_leaf, table_list->next_global, 0))
2163 {
2164 // Using same table for INSERT and SELECT
2165 /*
2166 @todo: Use add_base_options instead of add_active_options, and only
2167 if first_execution is true; but this can be implemented only when this
2168 function is called before first_execution is set to true.
2169 if (lex->current_select()->first_execution)
2170 lex->current_select()->add_base_options(OPTION_BUFFER_RESULT);
2171 */
2172 lex->current_select()->add_active_options(OPTION_BUFFER_RESULT);
2173 }
2174 restore_record(table,s->default_values); // Get empty record
2175 table->next_number_field=table->found_next_number_field;
2176
2177 #ifdef HAVE_REPLICATION
2178 if (thd->slave_thread)
2179 {
2180 /* Get SQL thread's rli, even for a slave worker thread */
2181 Relay_log_info *c_rli= thd->rli_slave->get_c_rli();
2182 assert(c_rli != NULL);
2183 if (duplicate_handling == DUP_UPDATE &&
2184 table->next_number_field != NULL &&
2185 rpl_master_has_bug(c_rli, 24432, TRUE, NULL, NULL))
2186 DBUG_RETURN(1);
2187 }
2188 #endif
2189
2190 thd->cuted_fields=0;
2191 if (thd->lex->is_ignore() || duplicate_handling != DUP_ERROR)
2192 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2193 if (duplicate_handling == DUP_REPLACE &&
2194 (!table->triggers || !table->triggers->has_delete_triggers()))
2195 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2196 if (duplicate_handling == DUP_UPDATE)
2197 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2198
2199 /* Decide the logging format prior to preparing table/record metadata */
2200 res= res || thd->decide_logging_format(table_list, false);
2201 if (!res)
2202 {
2203 prepare_triggers_for_insert_stmt(table);
2204 }
2205
2206 for (Field** next_field= table->field; *next_field; ++next_field)
2207 {
2208 (*next_field)->reset_warnings();
2209 (*next_field)->reset_tmp_null();
2210 }
2211
2212 DBUG_RETURN(res ? -1 : 0);
2213 }
2214
2215
2216 /*
2217 Finish the preparation of the result table.
2218
2219 SYNOPSIS
2220 Query_result_insert::prepare2()
2221 void
2222
2223 DESCRIPTION
2224 If the result table is the same as one of the source tables (INSERT SELECT),
2225 the result table is not finally prepared at the join prepair phase.
2226 Do the final preparation now.
2227
2228 RETURN
2229 0 OK
2230 */
2231
prepare2()2232 int Query_result_insert::prepare2()
2233 {
2234 DBUG_ENTER("Query_result_insert::prepare2");
2235 if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
2236 !thd->lex->describe)
2237 {
2238 assert(!bulk_insert_started);
2239 // TODO: Is there no better estimation than 0 == Unknown number of rows?
2240 table->file->ha_start_bulk_insert((ha_rows) 0);
2241 bulk_insert_started= true;
2242 }
2243 DBUG_RETURN(0);
2244 }
2245
2246
cleanup()2247 void Query_result_insert::cleanup()
2248 {
2249 /*
2250 Query_result_insert/Query_result_create are never re-used
2251 in prepared statement
2252 */
2253 assert(0);
2254 }
2255
2256
~Query_result_insert()2257 Query_result_insert::~Query_result_insert()
2258 {
2259 DBUG_ENTER("~Query_result_insert");
2260 if (table)
2261 {
2262 table->next_number_field=0;
2263 table->auto_increment_field_not_null= FALSE;
2264 table->file->ha_reset();
2265 }
2266 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
2267 DBUG_VOID_RETURN;
2268 }
2269
2270
send_data(List<Item> & values)2271 bool Query_result_insert::send_data(List<Item> &values)
2272 {
2273 DBUG_ENTER("Query_result_insert::send_data");
2274 bool error=0;
2275
2276 if (unit->offset_limit_cnt)
2277 { // using limit offset,count
2278 unit->offset_limit_cnt--;
2279 DBUG_RETURN(0);
2280 }
2281
2282 thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
2283 store_values(values);
2284 thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
2285 if (thd->is_error())
2286 {
2287 table->auto_increment_field_not_null= FALSE;
2288 DBUG_RETURN(1);
2289 }
2290 if (table_list) // Not CREATE ... SELECT
2291 {
2292 switch (table_list->view_check_option(thd)) {
2293 case VIEW_CHECK_SKIP:
2294 DBUG_RETURN(0);
2295 case VIEW_CHECK_ERROR:
2296 DBUG_RETURN(1);
2297 }
2298 }
2299
2300 // Release latches in case bulk insert takes a long time
2301 ha_release_temporary_latches(thd);
2302
2303 error= write_record(thd, table, &info, &update);
2304 table->auto_increment_field_not_null= FALSE;
2305
2306 DEBUG_SYNC(thd, "create_select_after_write_rows_event");
2307
2308 if (!error &&
2309 (table->triggers || info.get_duplicate_handling() == DUP_UPDATE))
2310 {
2311 /*
2312 Restore fields of the record since it is possible that they were
2313 changed by ON DUPLICATE KEY UPDATE clause.
2314 If triggers exist then whey can modify some fields which were not
2315 originally touched by INSERT ... SELECT, so we have to restore
2316 their original values for the next row.
2317 */
2318 restore_record(table, s->default_values);
2319 }
2320 if (!error && table->next_number_field)
2321 {
2322 /*
2323 If no value has been autogenerated so far, we need to remember the
2324 value we just saw, we may need to send it to client in the end.
2325 */
2326 if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
2327 autoinc_value_of_last_inserted_row= table->next_number_field->val_int();
2328 /*
2329 Clear auto-increment field for the next record, if triggers are used
2330 we will clear it twice, but this should be cheap.
2331 */
2332 table->next_number_field->reset();
2333 }
2334
2335 DBUG_RETURN(error);
2336 }
2337
2338
store_values(List<Item> & values)2339 void Query_result_insert::store_values(List<Item> &values)
2340 {
2341 if (fields->elements)
2342 {
2343 restore_record(table, s->default_values);
2344 if (!validate_default_values_of_unset_fields(thd, table))
2345 fill_record_n_invoke_before_triggers(thd, &info, *fields, values,
2346 table, TRG_EVENT_INSERT,
2347 table->s->fields);
2348 }
2349 else
2350 fill_record_n_invoke_before_triggers(thd, table->field, values,
2351 table, TRG_EVENT_INSERT,
2352 table->s->fields);
2353
2354 check_that_all_fields_are_given_values(thd, table, table_list);
2355 }
2356
send_error(uint errcode,const char * err)2357 void Query_result_insert::send_error(uint errcode,const char *err)
2358 {
2359 DBUG_ENTER("Query_result_insert::send_error");
2360
2361 my_message(errcode, err, MYF(0));
2362
2363 DBUG_VOID_RETURN;
2364 }
2365
2366
send_eof()2367 bool Query_result_insert::send_eof()
2368 {
2369 int error;
2370 bool const trans_table= table->file->has_transactions();
2371 ulonglong id, row_count;
2372 bool changed;
2373 THD::killed_state killed_status= thd->killed;
2374 DBUG_ENTER("Query_result_insert::send_eof");
2375 DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
2376 trans_table, table->file->table_type()));
2377
2378 error= (bulk_insert_started ?
2379 table->file->ha_end_bulk_insert() : 0);
2380 bulk_insert_started= false;
2381 if (!error && thd->is_error())
2382 error= thd->get_stmt_da()->mysql_errno();
2383
2384 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2385 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2386
2387 changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
2388 if (changed)
2389 {
2390 /*
2391 We must invalidate the table in the query cache before binlog writing
2392 and ha_autocommit_or_rollback.
2393 */
2394 query_cache.invalidate(thd, table, TRUE);
2395 }
2396
2397 assert(trans_table || !changed ||
2398 thd->get_transaction()->cannot_safely_rollback(
2399 Transaction_ctx::STMT));
2400
2401 /*
2402 Write to binlog before commiting transaction. No statement will
2403 be written by the binlog_query() below in RBR mode. All the
2404 events are in the transaction cache and will be written when
2405 ha_autocommit_or_rollback() is issued below.
2406 */
2407 if (mysql_bin_log.is_open() &&
2408 (!error || thd->get_transaction()->cannot_safely_rollback(
2409 Transaction_ctx::STMT)))
2410 {
2411 int errcode= 0;
2412 if (!error)
2413 thd->clear_error();
2414 else
2415 errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
2416 if (thd->binlog_query(THD::ROW_QUERY_TYPE,
2417 thd->query().str, thd->query().length,
2418 trans_table, false, false, errcode))
2419 {
2420 table->file->ha_release_auto_increment();
2421 DBUG_RETURN(1);
2422 }
2423 }
2424 table->file->ha_release_auto_increment();
2425
2426 if (error)
2427 {
2428 myf error_flags= MYF(0);
2429 if (table->file->is_fatal_error(my_errno()))
2430 error_flags|= ME_FATALERROR;
2431
2432 table->file->print_error(my_errno(), error_flags);
2433 DBUG_RETURN(1);
2434 }
2435
2436 /*
2437 For the strict_mode call of push_warning above results to set
2438 error in Diagnostic_area. Therefore it is necessary to check whether
2439 the error was set and leave method if it is true. If we didn't do
2440 so we would failed later when my_ok is called.
2441 */
2442 if (thd->get_stmt_da()->is_error())
2443 DBUG_RETURN(true);
2444
2445 char buff[160];
2446 if (thd->lex->is_ignore())
2447 my_snprintf(buff, sizeof(buff),
2448 ER(ER_INSERT_INFO), (long) info.stats.records,
2449 (long) (info.stats.records - info.stats.copied),
2450 (long) thd->get_stmt_da()->current_statement_cond_count());
2451 else
2452 my_snprintf(buff, sizeof(buff),
2453 ER(ER_INSERT_INFO), (long) info.stats.records,
2454 (long) (info.stats.deleted+info.stats.updated),
2455 (long) thd->get_stmt_da()->current_statement_cond_count());
2456 row_count= info.stats.copied + info.stats.deleted +
2457 (thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
2458 info.stats.touched : info.stats.updated);
2459 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
2460 thd->first_successful_insert_id_in_cur_stmt :
2461 (thd->arg_of_last_insert_id_function ?
2462 thd->first_successful_insert_id_in_prev_stmt :
2463 (info.stats.copied ? autoinc_value_of_last_inserted_row : 0));
2464 my_ok(thd, row_count, id, buff);
2465 thd->updated_row_count+= row_count;
2466 DBUG_RETURN(0);
2467 }
2468
2469
abort_result_set()2470 void Query_result_insert::abort_result_set()
2471 {
2472 DBUG_ENTER("Query_result_insert::abort_result_set");
2473 /*
2474 If the creation of the table failed (due to a syntax error, for
2475 example), no table will have been opened and therefore 'table'
2476 will be NULL. In that case, we still need to execute the rollback
2477 and the end of the function.
2478 */
2479 if (table)
2480 {
2481 bool changed, transactional_table;
2482 /*
2483 Try to end the bulk insert which might have been started before.
2484 We don't need to do this if we are in prelocked mode (since we
2485 don't use bulk insert in this case). Also we should not do this
2486 if tables are not locked yet (bulk insert is not started yet
2487 in this case).
2488 */
2489 if (bulk_insert_started) {
2490 table->file->ha_end_bulk_insert();
2491 bulk_insert_started= false;
2492 }
2493
2494 /*
2495 If at least one row has been inserted/modified and will stay in
2496 the table (the table doesn't have transactions) we must write to
2497 the binlog (and the error code will make the slave stop).
2498
2499 For many errors (example: we got a duplicate key error while
2500 inserting into a MyISAM table), no row will be added to the table,
2501 so passing the error to the slave will not help since there will
2502 be an error code mismatch (the inserts will succeed on the slave
2503 with no error).
2504
2505 If table creation failed, the number of rows modified will also be
2506 zero, so no check for that is made.
2507 */
2508 changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
2509 transactional_table= table->file->has_transactions();
2510 if (thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::STMT))
2511 {
2512 if (mysql_bin_log.is_open())
2513 {
2514 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
2515 /* error of writing binary log is ignored */
2516 (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query().str,
2517 thd->query().length,
2518 transactional_table, FALSE, FALSE, errcode);
2519 }
2520 if (changed)
2521 query_cache.invalidate(thd, table, TRUE);
2522 }
2523 assert(transactional_table || !changed ||
2524 thd->get_transaction()->cannot_safely_rollback(
2525 Transaction_ctx::STMT));
2526 table->file->ha_release_auto_increment();
2527 }
2528
2529 DBUG_VOID_RETURN;
2530 }
2531
2532
2533 /***************************************************************************
2534 CREATE TABLE (SELECT) ...
2535 ***************************************************************************/
2536
2537 /**
2538 Create table from lists of fields and items (or just return TABLE
2539 object for pre-opened existing table). Used by CREATE SELECT.
2540
2541 Let "source table" be the table in the SELECT part.
2542
2543 Let "source table columns" be the set of columns in the SELECT list.
2544
2545 An interesting peculiarity in the syntax CREATE TABLE (<columns>) SELECT is
2546 that function defaults are stripped from the the source table columns, but
2547 not from the additional columns defined in the CREATE TABLE part. The first
2548 @c TIMESTAMP column there is also subject to promotion to @c TIMESTAMP @c
2549 DEFAULT @c CURRENT_TIMESTAMP @c ON @c UPDATE @c CURRENT_TIMESTAMP, as usual.
2550
2551
2552 @param thd [in] Thread object
2553 @param create_info [in] Create information (like MAX_ROWS, ENGINE or
2554 temporary table flag)
2555 @param create_table [in] Pointer to TABLE_LIST object providing database
2556 and name for table to be created or to be open
2557 @param alter_info [in/out] Initial list of columns and indexes for the
2558 table to be created
2559 @param items [in] The source table columns. Corresponding column
2560 definitions (Create_field's) will be added to
2561 the end of alter_info->create_list.
2562 @param lock [out] Pointer to the MYSQL_LOCK object for table
2563 created will be returned in this parameter.
2564 Since this table is not included in THD::lock
2565 caller is responsible for explicitly unlocking
2566 this table.
2567 @param hooks [in] Hooks to be invoked before and after obtaining
2568 table lock on the table being created.
2569
2570 @note
2571 This function assumes that either table exists and was pre-opened and
2572 locked at open_and_lock_tables() stage (and in this case we just emit
2573 error or warning and return pre-opened TABLE object) or an exclusive
2574 metadata lock was acquired on table so we can safely create, open and
2575 lock table in it (we don't acquire metadata lock if this create is
2576 for temporary table).
2577
2578 @note
2579 Since this function contains some logic specific to CREATE TABLE ...
2580 SELECT it should be changed before it can be used in other contexts.
2581
2582 @retval non-zero Pointer to TABLE object for table created or opened
2583 @retval 0 Error
2584 */
2585
create_table_from_items(THD * thd,HA_CREATE_INFO * create_info,TABLE_LIST * create_table,Alter_info * alter_info,List<Item> * items)2586 static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
2587 TABLE_LIST *create_table,
2588 Alter_info *alter_info,
2589 List<Item> *items)
2590 {
2591 TABLE tmp_table; // Used during 'Create_field()'
2592 TABLE_SHARE share;
2593 TABLE *table= 0;
2594 uint select_field_count= items->elements;
2595 /* Add selected items to field list */
2596 List_iterator_fast<Item> it(*items);
2597 Item *item;
2598
2599 DBUG_ENTER("create_table_from_items");
2600
2601 tmp_table.alias= 0;
2602 tmp_table.s= &share;
2603 init_tmp_table_share(thd, &share, "", 0, "", "");
2604
2605 tmp_table.s->db_create_options=0;
2606 tmp_table.s->db_low_byte_first=
2607 MY_TEST(create_info->db_type == myisam_hton ||
2608 create_info->db_type == heap_hton);
2609 tmp_table.reset_null_row();
2610
2611 if (!thd->variables.explicit_defaults_for_timestamp)
2612 promote_first_timestamp_column(&alter_info->create_list);
2613
2614 while ((item=it++))
2615 {
2616 Field *tmp_table_field;
2617 if (item->type() == Item::FUNC_ITEM)
2618 {
2619 if (item->result_type() != STRING_RESULT)
2620 tmp_table_field= item->tmp_table_field(&tmp_table);
2621 else
2622 tmp_table_field= item->tmp_table_field_from_field_type(&tmp_table,
2623 false);
2624 }
2625 else
2626 {
2627 Field *from_field, *default_field;
2628 tmp_table_field= create_tmp_field(thd, &tmp_table, item, item->type(),
2629 NULL,
2630 &from_field, &default_field,
2631 false, false, false, false);
2632 }
2633
2634 if (!tmp_table_field)
2635 DBUG_RETURN(NULL);
2636
2637 Field *table_field;
2638
2639 switch (item->type())
2640 {
2641 /*
2642 We have to take into account both the real table's fields and
2643 pseudo-fields used in trigger's body. These fields are used
2644 to copy defaults values later inside constructor of
2645 the class Create_field.
2646 */
2647 case Item::FIELD_ITEM:
2648 case Item::TRIGGER_FIELD_ITEM:
2649 table_field= ((Item_field *) item)->field;
2650 break;
2651 default:
2652 {
2653 /*
2654 If the expression is of temporal type having date and non-nullable,
2655 a zero date is generated. If in strict mode, then zero date is
2656 invalid. For such cases no default is generated.
2657 */
2658 table_field= NULL;
2659 if (tmp_table_field->is_temporal_with_date() &&
2660 thd->is_strict_mode() && !item->maybe_null)
2661 tmp_table_field->flags|= NO_DEFAULT_VALUE_FLAG;
2662 }
2663 }
2664
2665 assert(tmp_table_field->gcol_info== NULL && tmp_table_field->stored_in_db);
2666 Create_field *cr_field= new Create_field(tmp_table_field, table_field);
2667
2668 if (!cr_field)
2669 DBUG_RETURN(NULL);
2670
2671 if (item->maybe_null)
2672 cr_field->flags &= ~NOT_NULL_FLAG;
2673 alter_info->create_list.push_back(cr_field);
2674 }
2675
2676 DEBUG_SYNC(thd,"create_table_select_before_create");
2677
2678 /*
2679 Create and lock table.
2680
2681 Note that we either creating (or opening existing) temporary table or
2682 creating base table on which name we have exclusive lock. So code below
2683 should not cause deadlocks or races.
2684
2685 We don't log the statement, it will be logged later.
2686
2687 If this is a HEAP table, the automatic DELETE FROM which is written to the
2688 binlog when a HEAP table is opened for the first time since startup, must
2689 not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
2690 don't want to delete from it) 2) it would be written before the CREATE
2691 TABLE, which is a wrong order. So we keep binary logging disabled when we
2692 open_table().
2693 */
2694 {
2695 if (!mysql_create_table_no_lock(thd, create_table->db,
2696 create_table->table_name,
2697 create_info, alter_info,
2698 select_field_count, NULL))
2699 {
2700 DEBUG_SYNC(thd,"create_table_select_before_open");
2701
2702 if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
2703 {
2704 Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
2705 /*
2706 Here we open the destination table, on which we already have
2707 an exclusive metadata lock.
2708 */
2709 if (open_table(thd, create_table, &ot_ctx))
2710 {
2711 quick_rm_table(thd, create_info->db_type, create_table->db,
2712 table_case_name(create_info, create_table->table_name),
2713 0);
2714 }
2715 else
2716 table= create_table->table;
2717 }
2718 else
2719 {
2720 if (open_temporary_table(thd, create_table))
2721 {
2722 /*
2723 This shouldn't happen as creation of temporary table should make
2724 it preparable for open. Anyway we can't drop temporary table if
2725 we are unable to fint it.
2726 */
2727 assert(0);
2728 }
2729 else
2730 {
2731 table= create_table->table;
2732 }
2733 }
2734 }
2735 if (!table) // open failed
2736 DBUG_RETURN(NULL);
2737 }
2738 DBUG_RETURN(table);
2739 }
2740
2741
2742 /**
2743 Create the new table from the selected items.
2744
2745 @param values List of items to be used as new columns
2746 @param u Select
2747
2748 @return Operation status.
2749 @retval 0 Success
2750 @retval !=0 Failure
2751 */
2752
prepare(List<Item> & values,SELECT_LEX_UNIT * u)2753 int Query_result_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2754 {
2755 DBUG_ENTER("Query_result_create::prepare");
2756
2757 unit= u;
2758 assert(create_table->table == NULL);
2759
2760 DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
2761
2762 if (!(table= create_table_from_items(thd, create_info, create_table,
2763 alter_info, &values)))
2764 /* abort() deletes table */
2765 DBUG_RETURN(-1);
2766
2767 if (table->s->fields < values.elements)
2768 {
2769 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
2770 DBUG_RETURN(-1);
2771 }
2772 /* First field to copy */
2773 field= table->field+table->s->fields - values.elements;
2774 for (Field **f= field ; *f ; f++)
2775 {
2776 if ((*f)->gcol_info)
2777 {
2778 /*
2779 Generated columns are not allowed to be given a value for CREATE TABLE ..
2780 SELECT statment.
2781 */
2782 my_error(ER_NON_DEFAULT_VALUE_FOR_GENERATED_COLUMN, MYF(0),
2783 (*f)->field_name, (*f)->table->s->table_name.str);
2784 DBUG_RETURN(true);
2785 }
2786 }
2787
2788 // Turn off function defaults for columns filled from SELECT list:
2789 const bool retval= info.ignore_last_columns(table, values.elements);
2790 DBUG_RETURN(retval);
2791 }
2792
2793
2794 /**
2795 Lock the newly created table and prepare it for insertion.
2796
2797 @return Operation status.
2798 @retval 0 Success
2799 @retval !=0 Failure
2800 */
2801
prepare2()2802 int Query_result_create::prepare2()
2803 {
2804 DBUG_ENTER("Query_result_create::prepare2");
2805 DEBUG_SYNC(thd,"create_table_select_before_lock");
2806
2807 MYSQL_LOCK *extra_lock= NULL;
2808 /*
2809 For row-based replication, the CREATE-SELECT statement is written
2810 in two pieces: the first one contain the CREATE TABLE statement
2811 necessary to create the table and the second part contain the rows
2812 that should go into the table.
2813
2814 For non-temporary tables, the start of the CREATE-SELECT
2815 implicitly commits the previous transaction, and all events
2816 forming the statement will be stored the transaction cache. At end
2817 of the statement, the entire statement is committed as a
2818 transaction, and all events are written to the binary log.
2819
2820 On the master, the table is locked for the duration of the
2821 statement, but since the CREATE part is replicated as a simple
2822 statement, there is no way to lock the table for accesses on the
2823 slave. Hence, we have to hold on to the CREATE part of the
2824 statement until the statement has finished.
2825 */
2826 class MY_HOOKS : public TABLEOP_HOOKS {
2827 public:
2828 MY_HOOKS(Query_result_create *x, TABLE_LIST *create_table_arg,
2829 TABLE_LIST *select_tables_arg)
2830 : ptr(x),
2831 create_table(create_table_arg),
2832 select_tables(select_tables_arg)
2833 {
2834 }
2835
2836 private:
2837 virtual int do_postlock(TABLE **tables, uint count)
2838 {
2839 int error;
2840 THD *thd= const_cast<THD*>(ptr->get_thd());
2841 TABLE_LIST *save_next_global= create_table->next_global;
2842
2843 create_table->next_global= select_tables;
2844
2845 error= thd->decide_logging_format(create_table);
2846
2847 create_table->next_global= save_next_global;
2848
2849 if (error)
2850 return error;
2851
2852 create_table->table->set_binlog_drop_if_temp(
2853 !thd->is_current_stmt_binlog_disabled()
2854 && !thd->is_current_stmt_binlog_format_row());
2855
2856 TABLE const *const table = *tables;
2857 create_table->table->set_binlog_drop_if_temp(
2858 !thd->is_current_stmt_binlog_disabled()
2859 && !thd->is_current_stmt_binlog_format_row());
2860
2861 if (thd->is_current_stmt_binlog_format_row() &&
2862 !table->s->tmp_table)
2863 {
2864 if (int error= ptr->binlog_show_create_table(tables, count))
2865 return error;
2866 }
2867 return 0;
2868 }
2869 Query_result_create *ptr;
2870 TABLE_LIST *create_table;
2871 TABLE_LIST *select_tables;
2872 };
2873
2874 MY_HOOKS hooks(this, create_table, select_tables);
2875
2876 table->reginfo.lock_type=TL_WRITE;
2877 hooks.prelock(&table, 1); // Call prelock hooks
2878 /*
2879 mysql_lock_tables() below should never fail with request to reopen table
2880 since it won't wait for the table lock (we have exclusive metadata lock on
2881 the table) and thus can't get aborted.
2882 */
2883 if (! (extra_lock= mysql_lock_tables(thd, &table, 1, 0)) ||
2884 hooks.postlock(&table, 1))
2885 {
2886 if (extra_lock)
2887 {
2888 mysql_unlock_tables(thd, extra_lock);
2889 extra_lock= 0;
2890 }
2891 drop_open_table(thd, table, create_table->db, create_table->table_name);
2892 table= 0;
2893 DBUG_RETURN(1);
2894 }
2895 if (extra_lock)
2896 {
2897 assert(m_plock == NULL);
2898
2899 if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
2900 m_plock= &m_lock;
2901 else
2902 m_plock= &thd->extra_lock;
2903
2904 *m_plock= extra_lock;
2905 }
2906 /* Mark all fields that are given values */
2907 for (Field **f= field ; *f ; f++)
2908 {
2909 bitmap_set_bit(table->write_set, (*f)->field_index);
2910 bitmap_set_bit(table->fields_set_during_insert, (*f)->field_index);
2911 }
2912
2913 // Set up an empty bitmap of function defaults
2914 if (info.add_function_default_columns(table, table->write_set))
2915 DBUG_RETURN(1);
2916
2917 if (info.add_function_default_columns(table,
2918 table->fields_set_during_insert))
2919 DBUG_RETURN(1);
2920
2921 table->next_number_field=table->found_next_number_field;
2922
2923 restore_record(table,s->default_values); // Get empty record
2924 thd->cuted_fields=0;
2925
2926 const enum_duplicates duplicate_handling= info.get_duplicate_handling();
2927
2928 if (thd->lex->is_ignore() || duplicate_handling != DUP_ERROR)
2929 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2930 if (duplicate_handling == DUP_REPLACE &&
2931 (!table->triggers || !table->triggers->has_delete_triggers()))
2932 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2933 if (duplicate_handling == DUP_UPDATE)
2934 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2935 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
2936 {
2937 table->file->ha_start_bulk_insert((ha_rows) 0);
2938 bulk_insert_started= true;
2939 }
2940
2941 enum_check_fields save_count_cuted_fields= thd->count_cuted_fields;
2942 thd->count_cuted_fields= CHECK_FIELD_WARN;
2943
2944 if (check_that_all_fields_are_given_values(thd, table, table_list))
2945 DBUG_RETURN(1);
2946
2947 thd->count_cuted_fields= save_count_cuted_fields;
2948
2949 table->mark_columns_needed_for_insert();
2950 table->file->extra(HA_EXTRA_WRITE_CACHE);
2951 DBUG_RETURN(0);
2952 }
2953
2954
binlog_show_create_table(TABLE ** tables,uint count)2955 int Query_result_create::binlog_show_create_table(TABLE **tables, uint count)
2956 {
2957 DBUG_ENTER("select_create::binlog_show_create_table");
2958 /*
2959 Note 1: In RBR mode, we generate a CREATE TABLE statement for the
2960 created table by calling store_create_info() (behaves as SHOW
2961 CREATE TABLE). The 'CREATE TABLE' event will be put in the
2962 binlog statement cache with an Anonymous_gtid_log_event, and
2963 any subsequent events (e.g., table-map events and rows event)
2964 will be put in the binlog transaction cache with an
2965 Anonymous_gtid_log_event. So that the 'CREATE...SELECT'
2966 statement is logged as:
2967 Anonymous_gtid_log_event
2968 CREATE TABLE event
2969 Anonymous_gtid_log_event
2970 BEGIN
2971 rows event
2972 COMMIT
2973
2974 We write the CREATE TABLE statement here and not in prepare()
2975 since there potentially are sub-selects or accesses to information
2976 schema that will do a close_thread_tables(), destroying the
2977 statement transaction cache.
2978 */
2979 assert(thd->is_current_stmt_binlog_format_row());
2980 assert(tables && *tables && count > 0);
2981
2982 char buf[2048];
2983 String query(buf, sizeof(buf), system_charset_info);
2984 int result;
2985 TABLE_LIST tmp_table_list;
2986
2987 tmp_table_list.table = *tables;
2988 query.length(0); // Have to zero it since constructor doesn't
2989
2990 result= store_create_info(thd, &tmp_table_list, &query, create_info,
2991 /* show_database */ TRUE);
2992 assert(result == 0); /* store_create_info() always return 0 */
2993
2994 if (mysql_bin_log.is_open())
2995 {
2996 DEBUG_SYNC(thd, "create_select_before_write_create_event");
2997 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
2998 result= thd->binlog_query(THD::STMT_QUERY_TYPE,
2999 query.ptr(), query.length(),
3000 /* is_trans */ false,
3001 /* direct */ true,
3002 /* suppress_use */ FALSE,
3003 errcode);
3004 DEBUG_SYNC(thd, "create_select_after_write_create_event");
3005 }
3006 DBUG_RETURN(result);
3007 }
3008
3009
store_values(List<Item> & values)3010 void Query_result_create::store_values(List<Item> &values)
3011 {
3012 fill_record_n_invoke_before_triggers(thd, field, values,
3013 table, TRG_EVENT_INSERT,
3014 table->s->fields);
3015 }
3016
3017
send_error(uint errcode,const char * err)3018 void Query_result_create::send_error(uint errcode,const char *err)
3019 {
3020 DBUG_ENTER("Query_result_create::send_error");
3021
3022 DBUG_PRINT("info",
3023 ("Current statement %s row-based",
3024 thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
3025 DBUG_PRINT("info",
3026 ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
3027 (ulong) table,
3028 table && !table->s->tmp_table ? "is NOT" : "is"));
3029 /*
3030 This will execute any rollbacks that are necessary before writing
3031 the transcation cache.
3032
3033 We disable the binary log since nothing should be written to the
3034 binary log. This disabling is important, since we potentially do
3035 a "roll back" of non-transactional tables by removing the table,
3036 and the actual rollback might generate events that should not be
3037 written to the binary log.
3038
3039 */
3040 tmp_disable_binlog(thd);
3041 Query_result_insert::send_error(errcode, err);
3042 reenable_binlog(thd);
3043
3044 DBUG_VOID_RETURN;
3045 }
3046
3047
send_eof()3048 bool Query_result_create::send_eof()
3049 {
3050 /*
3051 The routine that writes the statement in the binary log
3052 is in Query_result_insert::send_eof(). For that reason, we
3053 mark the flag at this point.
3054 */
3055 if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
3056 thd->get_transaction()->mark_created_temp_table(Transaction_ctx::STMT);
3057
3058 bool tmp= Query_result_insert::send_eof();
3059 if (tmp)
3060 abort_result_set();
3061 else
3062 {
3063 /*
3064 Do an implicit commit at end of statement for non-temporary
3065 tables. This can fail, but we should unlock the table
3066 nevertheless.
3067 */
3068 if (!table->s->tmp_table)
3069 {
3070 trans_commit_stmt(thd);
3071 trans_commit_implicit(thd);
3072 }
3073
3074 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3075 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3076 if (m_plock)
3077 {
3078 mysql_unlock_tables(thd, *m_plock);
3079 *m_plock= NULL;
3080 m_plock= NULL;
3081 }
3082 }
3083 return tmp;
3084 }
3085
3086
abort_result_set()3087 void Query_result_create::abort_result_set()
3088 {
3089 DBUG_ENTER("Query_result_create::abort_result_set");
3090
3091 /*
3092 In Query_result_insert::abort_result_set() we roll back the statement, including
3093 truncating the transaction cache of the binary log. To do this, we
3094 pretend that the statement is transactional, even though it might
3095 be the case that it was not.
3096
3097 We roll back the statement prior to deleting the table and prior
3098 to releasing the lock on the table, since there might be potential
3099 for failure if the rollback is executed after the drop or after
3100 unlocking the table.
3101
3102 We also roll back the statement regardless of whether the creation
3103 of the table succeeded or not, since we need to reset the binary
3104 log state.
3105 */
3106 tmp_disable_binlog(thd);
3107 Query_result_insert::abort_result_set();
3108 thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::STMT);
3109 reenable_binlog(thd);
3110 /* possible error of writing binary log is ignored deliberately */
3111 (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
3112
3113 if (m_plock)
3114 {
3115 mysql_unlock_tables(thd, *m_plock);
3116 *m_plock= NULL;
3117 m_plock= NULL;
3118 }
3119
3120 if (table)
3121 {
3122 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3123 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3124 table->auto_increment_field_not_null= FALSE;
3125 drop_open_table(thd, table, create_table->db, create_table->table_name);
3126 table=0; // Safety
3127 }
3128 DBUG_VOID_RETURN;
3129 }
3130
3131
execute(THD * thd)3132 bool Sql_cmd_insert::execute(THD *thd)
3133 {
3134 assert(thd->lex->sql_command == SQLCOM_REPLACE ||
3135 thd->lex->sql_command == SQLCOM_INSERT);
3136
3137 bool res= false;
3138 LEX *const lex= thd->lex;
3139 SELECT_LEX *const select_lex= lex->select_lex;
3140 TABLE_LIST *const first_table= select_lex->get_table_list();
3141 TABLE_LIST *const all_tables= first_table;
3142
3143 if (open_temporary_tables(thd, all_tables))
3144 return true;
3145
3146 if (insert_precheck(thd, all_tables))
3147 return true;
3148
3149 /* Push ignore / strict error handler */
3150 Ignore_error_handler ignore_handler;
3151 Strict_error_handler strict_handler;
3152 if (thd->lex->is_ignore())
3153 thd->push_internal_handler(&ignore_handler);
3154 else if (thd->is_strict_mode())
3155 thd->push_internal_handler(&strict_handler);
3156
3157 MYSQL_INSERT_START(const_cast<char*>(thd->query().str));
3158 res= mysql_insert(thd, all_tables);
3159 MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func());
3160
3161 /* Pop ignore / strict error handler */
3162 if (thd->lex->is_ignore() || thd->is_strict_mode())
3163 thd->pop_internal_handler();
3164
3165 /*
3166 If we have inserted into a VIEW, and the base table has
3167 AUTO_INCREMENT column, but this column is not accessible through
3168 a view, then we should restore LAST_INSERT_ID to the value it
3169 had before the statement.
3170 */
3171 if (first_table->is_view() && !first_table->contain_auto_increment)
3172 thd->first_successful_insert_id_in_cur_stmt=
3173 thd->first_successful_insert_id_in_prev_stmt;
3174
3175 DBUG_EXECUTE_IF("after_mysql_insert",
3176 {
3177 const char act[]=
3178 "now "
3179 "wait_for signal.continue";
3180 assert(opt_debug_sync_timeout > 0);
3181 assert(!debug_sync_set_action(current_thd,
3182 STRING_WITH_LEN(act)));
3183 };);
3184
3185 thd->lex->clear_values_map();
3186 DEBUG_SYNC(thd, "after_mysql_insert");
3187 return res;
3188 }
3189
3190
execute(THD * thd)3191 bool Sql_cmd_insert_select::execute(THD *thd)
3192 {
3193 assert(thd->lex->sql_command == SQLCOM_REPLACE_SELECT ||
3194 thd->lex->sql_command == SQLCOM_INSERT_SELECT);
3195
3196 bool res= false;
3197 LEX *const lex= thd->lex;
3198 SELECT_LEX *const select_lex= lex->select_lex;
3199 SELECT_LEX_UNIT *const unit= lex->unit;
3200 TABLE_LIST *const first_table= select_lex->get_table_list();
3201 TABLE_LIST *const all_tables= first_table;
3202
3203 Query_result_insert *sel_result;
3204 if (insert_precheck(thd, all_tables))
3205 return true;
3206 /*
3207 INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/
3208 INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY
3209 clause is used in SELECT statement. We therefore use row based
3210 logging if mixed or row based logging is available.
3211 TODO: Check if the order of the output of the select statement is
3212 deterministic. Waiting for BUG#42415
3213 */
3214 if (lex->sql_command == SQLCOM_INSERT_SELECT &&
3215 lex->duplicates == DUP_UPDATE)
3216 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_SELECT_UPDATE);
3217
3218 if (lex->sql_command == SQLCOM_INSERT_SELECT && lex->is_ignore())
3219 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_IGNORE_SELECT);
3220
3221 if (lex->sql_command == SQLCOM_REPLACE_SELECT)
3222 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_REPLACE_SELECT);
3223
3224 unit->set_limit(select_lex);
3225
3226 if (!(res= open_tables_for_query(thd, all_tables, 0)))
3227 {
3228 if (run_before_dml_hook(thd))
3229 return true;
3230
3231 MYSQL_INSERT_SELECT_START(const_cast<char*>(thd->query().str));
3232
3233 /* Skip first table, which is the table we are inserting in */
3234 TABLE_LIST *second_table= first_table->next_local;
3235 select_lex->table_list.first= second_table;
3236 select_lex->context.table_list=
3237 select_lex->context.first_name_resolution_table= second_table;
3238
3239 res= mysql_insert_select_prepare(thd);
3240 if (!res && (sel_result= new Query_result_insert(first_table,
3241 first_table->table,
3242 &insert_field_list,
3243 &insert_field_list,
3244 &insert_update_list,
3245 &insert_value_list,
3246 lex->duplicates)))
3247 {
3248 Ignore_error_handler ignore_handler;
3249 Strict_error_handler strict_handler;
3250 if (thd->lex->is_ignore())
3251 thd->push_internal_handler(&ignore_handler);
3252 else if (thd->is_strict_mode())
3253 thd->push_internal_handler(&strict_handler);
3254
3255 res= handle_query(thd, lex, sel_result,
3256 // Don't unlock tables until command is written
3257 // to binary log
3258 OPTION_SETUP_TABLES_DONE | SELECT_NO_UNLOCK,
3259 0);
3260
3261 if (thd->lex->is_ignore() || thd->is_strict_mode())
3262 thd->pop_internal_handler();
3263
3264 delete sel_result;
3265 }
3266 /* revert changes for SP */
3267 MYSQL_INSERT_SELECT_DONE(res, (ulong) thd->get_row_count_func());
3268 select_lex->table_list.first= first_table;
3269 }
3270 /*
3271 If we have inserted into a VIEW, and the base table has
3272 AUTO_INCREMENT column, but this column is not accessible through
3273 a view, then we should restore LAST_INSERT_ID to the value it
3274 had before the statement.
3275 */
3276 if (first_table->is_view() && !first_table->contain_auto_increment)
3277 thd->first_successful_insert_id_in_cur_stmt=
3278 thd->first_successful_insert_id_in_prev_stmt;
3279
3280 thd->lex->clear_values_map();
3281 return res;
3282 }
3283
3284
prepared_statement_test(THD * thd)3285 bool Sql_cmd_insert::prepared_statement_test(THD *thd)
3286 {
3287 LEX *lex= thd->lex;
3288 return mysql_test_insert(thd, lex->query_tables);
3289 }
3290