1 /*
2 Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /* Insert of records */
26
27 #include "sql_insert.h"
28
29 #include "auth_common.h" // check_grant_all_columns
30 #include "debug_sync.h" // DEBUG_SYNC
31 #include "item.h" // Item
32 #include "lock.h" // mysql_unlock_tables
33 #include "opt_explain.h" // Modification_plan
34 #include "opt_explain_format.h" // enum_mod_type
35 #include "rpl_rli.h" // Relay_log_info
36 #include "rpl_slave.h" // rpl_master_has_bug
37 #include "sql_base.h" // setup_fields
38 #include "sql_resolver.h" // Column_privilege_tracker
39 #include "sql_select.h" // free_underlaid_joins
40 #include "sql_show.h" // store_create_info
41 #include "sql_table.h" // quick_rm_table
42 #include "sql_tmp_table.h" // create_tmp_field
43 #include "sql_update.h" // records_are_comparable
44 #include "sql_view.h" // check_key_in_view
45 #include "table_trigger_dispatcher.h" // Table_trigger_dispatcher
46 #include "transaction.h" // trans_commit_stmt
47 #include "sql_resolver.h" // validate_gc_assignment
48 #include "partition_info.h" // partition_info
49 #include "probes_mysql.h" // MYSQL_INSERT_START
50
51 static bool check_view_insertability(THD *thd, TABLE_LIST *view,
52 const TABLE_LIST *insert_table_ref);
53
54 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables);
55
56 /**
57 Check that insert fields are from a single table of a multi-table view.
58
59 @param fields The insert fields to be checked.
60 @param view The view for insert.
61 @param insert_table_ref[out] Reference to table to insert into
62
63 This function is called to check that the fields being inserted into
64 are from a single base table. This must be checked when the table to
65 be inserted into is a multi-table view.
66
67 @return false if success, true if an error was raised.
68 */
69
check_single_table_insert(List<Item> & fields,TABLE_LIST * view,TABLE_LIST ** insert_table_ref)70 static bool check_single_table_insert(List<Item> &fields, TABLE_LIST *view,
71 TABLE_LIST **insert_table_ref)
72 {
73 // It is join view => we need to find the table for insert
74 List_iterator_fast<Item> it(fields);
75 Item *item;
76 *insert_table_ref= NULL; // reset for call to check_single_table()
77 table_map tables= 0;
78
79 while ((item= it++))
80 tables|= item->used_tables();
81
82 if (view->check_single_table(insert_table_ref, tables))
83 {
84 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
85 view->view_db.str, view->view_name.str);
86 return true;
87 }
88 assert(*insert_table_ref && (*insert_table_ref)->is_insertable());
89
90 return false;
91 }
92
93
94 /**
95 Check insert fields.
96
97 @param thd The current thread.
98 @param table_list The table for insert.
99 @param fields The insert fields.
100 @param value_count Number of values supplied
101 @param value_count_known if false, delay field count check
102 @todo: Eliminate this when preparation is properly phased
103 @param check_unique If duplicate values should be rejected.
104
105 @return false if success, true if error
106
107 Resolved reference to base table is returned in lex->insert_table_leaf.
108
109 @todo check_insert_fields() should be refactored as follows:
110 - Remove the argument value_count_known and all predicates involving it.
111 - Rearrange the call to check_insert_fields() from
112 mysql_prepare_insert() so that the value_count is known also when
113 processing a prepared statement.
114 */
115
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,uint value_count,bool value_count_known,bool check_unique)116 static bool check_insert_fields(THD *thd, TABLE_LIST *table_list,
117 List<Item> &fields, uint value_count,
118 bool value_count_known, bool check_unique)
119 {
120 LEX *const lex= thd->lex;
121
122 #ifndef NDEBUG
123 TABLE_LIST *const saved_insert_table_leaf= lex->insert_table_leaf;
124 #endif
125
126 TABLE *table= table_list->table;
127
128 assert(table_list->is_insertable());
129
130 if (fields.elements == 0 && value_count_known && value_count > 0)
131 {
132 /*
133 No field list supplied, but a value list has been supplied.
134 Use field list of table being updated.
135 */
136 assert(table); // This branch is not reached with a view:
137
138 lex->insert_table_leaf= table_list;
139
140 // Values for all fields in table are needed
141 if (value_count != table->s->fields)
142 {
143 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
144 return true;
145 }
146 #ifndef NO_EMBEDDED_ACCESS_CHECKS
147 Field_iterator_table_ref field_it;
148 field_it.set(table_list);
149 if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
150 return true;
151 #endif
152 /*
153 No fields are provided so all fields must be provided in the values.
154 Thus we set all bits in the write set.
155 */
156 bitmap_set_all(table->write_set);
157 }
158 else
159 {
160 // INSERT with explicit field list.
161 SELECT_LEX *select_lex= thd->lex->select_lex;
162 Name_resolution_context *context= &select_lex->context;
163 Name_resolution_context_state ctx_state;
164 int res;
165
166 if (value_count_known && fields.elements != value_count)
167 {
168 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
169 return true;
170 }
171
172 thd->dup_field= 0;
173
174 /* Save the state of the current name resolution context. */
175 ctx_state.save_state(context, table_list);
176
177 /*
178 Perform name resolution only in the first table - 'table_list',
179 which is the table that is inserted into.
180 */
181 table_list->next_local= NULL;
182 context->resolve_in_table_list_only(table_list);
183 res= setup_fields(thd, Ref_ptr_array(), fields, INSERT_ACL, NULL,
184 false, true);
185
186 /* Restore the current context. */
187 ctx_state.restore_state(context, table_list);
188
189 if (res)
190 return true;
191
192 if (table_list->is_merged())
193 {
194 if (check_single_table_insert(fields, table_list,
195 &lex->insert_table_leaf))
196 return true;
197 table= lex->insert_table_leaf->table;
198 }
199 else
200 {
201 lex->insert_table_leaf= table_list;
202 }
203
204 if (check_unique && thd->dup_field)
205 {
206 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
207 return true;
208 }
209 }
210 /* Mark all generated columns for write*/
211 if (table->vfield)
212 table->mark_generated_columns(false);
213
214 if (check_key_in_view(thd, table_list, lex->insert_table_leaf) ||
215 (table_list->is_view() &&
216 check_view_insertability(thd, table_list, lex->insert_table_leaf)))
217 {
218 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
219 return true;
220 }
221
222 assert(saved_insert_table_leaf == NULL ||
223 lex->insert_table_leaf == saved_insert_table_leaf);
224
225 return false;
226 }
227
228
229 /**
230 Check that table references are restricted to the supplied table map.
231 The check can be ignored if the supplied table is a base table.
232
233 @param view Table being specified
234 @param values Values whose used tables are to be matched against table map
235 @param map Table map to match against
236
237 @return false if success, true if error
238 */
239
check_valid_table_refs(const TABLE_LIST * view,List<Item> & values,table_map map)240 static bool check_valid_table_refs(const TABLE_LIST *view, List<Item> &values,
241 table_map map)
242 {
243 List_iterator_fast<Item> it(values);
244 Item *item;
245
246 // A base table will always match the supplied map.
247 assert(view->is_view() || (view->table && map));
248
249 if (!view->is_view()) // Ignore check if called with base table.
250 return false;
251
252 map|= PSEUDO_TABLE_BITS;
253
254 while ((item= it++))
255 {
256 if (item->used_tables() & ~map)
257 {
258 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
259 view->view_db.str, view->view_name.str);
260 return true;
261 }
262 }
263 return false;
264 }
265
266
267 /**
268 Validates default value of fields which are not specified in
269 the column list of INSERT statement.
270
271 @Note table->record[0] should be be populated with default values
272 before calling this function.
273
274 @param thd thread context
275 @param table table to which values are inserted.
276
277 @return
278 @retval false Success.
279 @retval true Failure.
280 */
281
validate_default_values_of_unset_fields(THD * thd,TABLE * table)282 bool validate_default_values_of_unset_fields(THD *thd, TABLE *table)
283 {
284 MY_BITMAP *write_set= table->write_set;
285 DBUG_ENTER("validate_default_values_of_unset_fields");
286
287 for (Field **field= table->field; *field; field++)
288 {
289 if (!bitmap_is_set(write_set, (*field)->field_index) &&
290 !((*field)->flags & NO_DEFAULT_VALUE_FLAG))
291 {
292 if ((*field)->validate_stored_val(thd) && thd->is_error())
293 DBUG_RETURN(true);
294 }
295 }
296
297 DBUG_RETURN(false);
298 }
299
300
301 /*
302 Prepare triggers for INSERT-like statement.
303
304 SYNOPSIS
305 prepare_triggers_for_insert_stmt()
306 table Table to which insert will happen
307
308 NOTE
309 Prepare triggers for INSERT-like statement by marking fields
310 used by triggers and inform handlers that batching of UPDATE/DELETE
311 cannot be done if there are BEFORE UPDATE/DELETE triggers.
312 */
313
prepare_triggers_for_insert_stmt(TABLE * table)314 void prepare_triggers_for_insert_stmt(TABLE *table)
315 {
316 if (table->triggers)
317 {
318 if (table->triggers->has_triggers(TRG_EVENT_DELETE,
319 TRG_ACTION_AFTER))
320 {
321 /*
322 The table has AFTER DELETE triggers that might access to
323 subject table and therefore might need delete to be done
324 immediately. So we turn-off the batching.
325 */
326 (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
327 }
328 if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
329 TRG_ACTION_AFTER))
330 {
331 /*
332 The table has AFTER UPDATE triggers that might access to subject
333 table and therefore might need update to be done immediately.
334 So we turn-off the batching.
335 */
336 (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
337 }
338 }
339 table->mark_columns_needed_for_insert();
340 }
341
342 /**
343 Setup data for field BLOB/GEOMETRY field types for execution of
344 "INSERT...UPDATE" statement. For a expression in 'UPDATE' clause
345 like "a= VALUES(a)", let as call Field* referring 'a' as LHS_FIELD
346 and Field* referring field 'a' in "VALUES(a)" as RHS_FIELD
347
348 This function creates a separate copy of the blob value for RHS_FIELD,
349 if the field is updated as well as accessed through VALUES()
350 function in 'UPDATE' clause of "INSERT...UPDATE" statement.
351
352 @param [in] thd
353 Pointer to THD object.
354
355 @param [in] fields
356 List of fields representing LHS_FIELD of all expressions
357 in 'UPDATE' clause.
358
359 @return - Can fail only when we are out of memory.
360 @retval false Success
361 @retval true Failure
362 */
363
mysql_prepare_blob_values(THD * thd,List<Item> & fields,MEM_ROOT * mem_root)364 bool mysql_prepare_blob_values(THD *thd, List<Item> &fields, MEM_ROOT *mem_root)
365 {
366 DBUG_ENTER("mysql_prepare_blob_values");
367
368 if (fields.elements <= 1)
369 DBUG_RETURN(false);
370
371 // Collect LHS_FIELD's which are updated in a 'set'.
372 // This 'set' helps decide if we need to make copy of BLOB value
373 // or not.
374
375 Prealloced_array<Field_blob *, 16, true>
376 blob_update_field_set(PSI_NOT_INSTRUMENTED);
377 if (blob_update_field_set.reserve(fields.elements))
378 DBUG_RETURN(true);
379
380 List_iterator_fast<Item> f(fields);
381 Item *fld;
382 while ((fld= f++))
383 {
384 Item_field *field= fld->field_for_view_update();
385 Field *lhs_field= field->field;
386
387 if (lhs_field->type() == MYSQL_TYPE_BLOB ||
388 lhs_field->type() == MYSQL_TYPE_GEOMETRY)
389 blob_update_field_set.insert_unique(down_cast<Field_blob *>(lhs_field));
390 }
391
392 // Traverse through thd->lex->insert_update_values_map
393 // and make copy of BLOB values in RHS_FIELD, if the same field is
394 // modified (present in above 'set' prepared).
395 if (thd->lex->has_values_map())
396 {
397 std::map<Field *, Field *>::iterator iter;
398 for(iter= thd->lex->begin_values_map();
399 iter != thd->lex->end_values_map();
400 ++iter)
401 {
402 // Retrieve the Field_blob pointers from the map.
403 // and initialize newly declared variables immediately.
404 Field_blob *lhs_field= down_cast<Field_blob *>(iter->first);
405 Field_blob *rhs_field= down_cast<Field_blob *>(iter->second);
406
407 // Check if the Field_blob object is updated before making a copy.
408 if (blob_update_field_set.count_unique(lhs_field) == 0)
409 continue;
410
411 // Copy blob value
412 if(rhs_field->copy_blob_value(mem_root))
413 DBUG_RETURN(true);
414 }
415 }
416
417 DBUG_RETURN(false);
418 }
419
420 /**
421 INSERT statement implementation
422
423 @note Like implementations of other DDL/DML in MySQL, this function
424 relies on the caller to close the thread tables. This is done in the
425 end of dispatch_command().
426 */
427
mysql_insert(THD * thd,TABLE_LIST * table_list)428 bool Sql_cmd_insert::mysql_insert(THD *thd,TABLE_LIST *table_list)
429 {
430 DBUG_ENTER("mysql_insert");
431
432 LEX *const lex= thd->lex;
433 int error, res;
434 bool err= true;
435 bool transactional_table, joins_freed= FALSE;
436 bool changed;
437 bool is_locked= false;
438 ulong counter= 0;
439 ulonglong id;
440 /*
441 We have three alternative syntax rules for the INSERT statement:
442 1) "INSERT (columns) VALUES ...", so non-listed columns need a default
443 2) "INSERT VALUES (), ..." so all columns need a default;
444 note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
445 emptiness of the first row is enough
446 3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
447 expr_i is "DEFAULT" (in which case the column is set by
448 Item_default_value::save_in_field_inner()).
449 */
450 const bool manage_defaults=
451 insert_field_list.elements != 0 || // 1)
452 insert_many_values.head()->elements == 0; // 2)
453 COPY_INFO info(COPY_INFO::INSERT_OPERATION,
454 &insert_field_list,
455 manage_defaults,
456 duplicates);
457 COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &insert_update_list,
458 &insert_value_list);
459 Name_resolution_context *context;
460 Name_resolution_context_state ctx_state;
461 uint num_partitions= 0;
462 enum partition_info::enum_can_prune can_prune_partitions=
463 partition_info::PRUNE_NO;
464 MY_BITMAP used_partitions;
465 bool prune_needs_default_values;
466
467 SELECT_LEX *const select_lex= lex->select_lex;
468
469 select_lex->make_active_options(0, 0);
470
471 if (open_tables_for_query(thd, table_list, 0))
472 DBUG_RETURN(true);
473
474 if (run_before_dml_hook(thd))
475 DBUG_RETURN(true);
476
477 THD_STAGE_INFO(thd, stage_init);
478 lex->used_tables=0;
479
480 List_iterator_fast<List_item> its(insert_many_values);
481 List_item *values= its++;
482 const uint value_count= values->elements;
483 TABLE *insert_table= NULL;
484 if (mysql_prepare_insert(thd, table_list, values, false))
485 goto exit_without_my_ok;
486
487 insert_table= lex->insert_table_leaf->table;
488
489 if (duplicates == DUP_UPDATE || duplicates == DUP_REPLACE)
490 prepare_for_positional_update(insert_table, table_list);
491
492 /* Must be done before can_prune_insert, due to internal initialization. */
493 if (info.add_function_default_columns(insert_table, insert_table->write_set))
494 goto exit_without_my_ok; /* purecov: inspected */
495 if (duplicates == DUP_UPDATE &&
496 update.add_function_default_columns(insert_table,
497 insert_table->write_set))
498 goto exit_without_my_ok; /* purecov: inspected */
499
500 context= &select_lex->context;
501 /*
502 These three asserts test the hypothesis that the resetting of the name
503 resolution context below is not necessary at all since the list of local
504 tables for INSERT always consists of one table.
505 */
506 assert(!table_list->next_local);
507 assert(!context->table_list->next_local);
508 assert(!context->first_name_resolution_table->next_name_resolution_table);
509
510 /* Save the state of the current name resolution context. */
511 ctx_state.save_state(context, table_list);
512
513 /*
514 Perform name resolution only in the first table - 'table_list',
515 which is the table that is inserted into.
516 */
517 assert(table_list->next_local == 0);
518 context->resolve_in_table_list_only(table_list);
519
520 if (!is_locked && insert_table->part_info)
521 {
522 if (insert_table->part_info->can_prune_insert(thd,
523 duplicates,
524 update,
525 insert_update_list,
526 insert_field_list,
527 !MY_TEST(values->elements),
528 &can_prune_partitions,
529 &prune_needs_default_values,
530 &used_partitions))
531 goto exit_without_my_ok; /* purecov: inspected */
532
533 if (can_prune_partitions != partition_info::PRUNE_NO)
534 {
535 num_partitions= insert_table->part_info->lock_partitions.n_bits;
536 /*
537 Pruning probably possible, all partitions is unmarked for read/lock,
538 and we must now add them on row by row basis.
539
540 Check the first INSERT value.
541 Do not fail here, since that would break MyISAM behavior of inserting
542 all rows before the failing row.
543
544 PRUNE_DEFAULTS means the partitioning fields are only set to DEFAULT
545 values, so we only need to check the first INSERT value, since all the
546 rest will be in the same partition.
547 */
548 if (insert_table->part_info->set_used_partition(insert_field_list,
549 *values,
550 info,
551 prune_needs_default_values,
552 &used_partitions))
553 can_prune_partitions= partition_info::PRUNE_NO;
554 }
555 }
556
557 its.rewind();
558 while ((values= its++))
559 {
560 counter++;
561 /*
562 To make it possible to increase concurrency on table level locking
563 engines such as MyISAM, we check pruning for each row until we will use
564 all partitions, Even if the number of rows is much higher than the
565 number of partitions.
566 TODO: Cache the calculated part_id and reuse in
567 ha_partition::write_row() if possible.
568 */
569 if (can_prune_partitions == partition_info::PRUNE_YES)
570 {
571 if (insert_table->part_info->set_used_partition(insert_field_list,
572 *values,
573 info,
574 prune_needs_default_values,
575 &used_partitions))
576 can_prune_partitions= partition_info::PRUNE_NO;
577 if (!(counter % num_partitions))
578 {
579 /*
580 Check if we using all partitions in table after adding partition
581 for current row to the set of used partitions. Do it only from
582 time to time to avoid overhead from bitmap_is_set_all() call.
583 */
584 if (bitmap_is_set_all(&used_partitions))
585 can_prune_partitions= partition_info::PRUNE_NO;
586 }
587 }
588 }
589 insert_table->auto_increment_field_not_null= false;
590 its.rewind ();
591
592 /* Restore the current context. */
593 ctx_state.restore_state(context, table_list);
594
595 { // Statement plan is available within these braces
596 Modification_plan plan(thd,
597 (lex->sql_command == SQLCOM_INSERT) ?
598 MT_INSERT : MT_REPLACE, insert_table,
599 NULL, false, 0);
600 DEBUG_SYNC(thd, "planned_single_insert");
601
602 if (can_prune_partitions != partition_info::PRUNE_NO)
603 {
604 /*
605 Only lock the partitions we will insert into.
606 And also only read from those partitions (duplicates etc.).
607 If explicit partition selection 'INSERT INTO t PARTITION (p1)' is used,
608 the new set of read/lock partitions is the intersection of read/lock
609 partitions and used partitions, i.e only the partitions that exists in
610 both sets will be marked for read/lock.
611 It is also safe for REPLACE, since all potentially conflicting records
612 always belong to the same partition as the one which we try to
613 insert a row. This is because ALL unique/primary keys must
614 include ALL partitioning columns.
615 */
616 bitmap_intersect(&insert_table->part_info->read_partitions,
617 &used_partitions);
618 bitmap_intersect(&insert_table->part_info->lock_partitions,
619 &used_partitions);
620 }
621
622 // Lock the tables now if not locked already.
623 if (!is_locked &&
624 lock_tables(thd, table_list, lex->table_count, 0))
625 DBUG_RETURN(true);
626
627 if (lex->describe)
628 {
629 err= explain_single_table_modification(thd, &plan, select_lex);
630 goto exit_without_my_ok;
631 }
632
633 /*
634 Count warnings for all inserts.
635 For single line insert, generate an error if try to set a NOT NULL field
636 to NULL.
637 */
638 thd->count_cuted_fields= ((insert_many_values.elements == 1 &&
639 !lex->is_ignore()) ?
640 CHECK_FIELD_ERROR_FOR_NULL :
641 CHECK_FIELD_WARN);
642 thd->cuted_fields = 0L;
643 insert_table->next_number_field= insert_table->found_next_number_field;
644
645 #ifdef HAVE_REPLICATION
646 if (thd->slave_thread)
647 {
648 /* Get SQL thread's rli, even for a slave worker thread */
649 Relay_log_info* c_rli= thd->rli_slave->get_c_rli();
650 assert(c_rli != NULL);
651 if(info.get_duplicate_handling() == DUP_UPDATE &&
652 insert_table->next_number_field != NULL &&
653 rpl_master_has_bug(c_rli, 24432, TRUE, NULL, NULL))
654 goto exit_without_my_ok;
655 }
656 #endif
657
658 error=0;
659 THD_STAGE_INFO(thd, stage_update);
660 if (duplicates == DUP_REPLACE &&
661 (!insert_table->triggers ||
662 !insert_table->triggers->has_delete_triggers()))
663 insert_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
664 if (duplicates == DUP_UPDATE)
665 insert_table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
666 /*
667 let's *try* to start bulk inserts. It won't necessary
668 start them as insert_many_values.elements should be greater than
669 some - handler dependent - threshold.
670 We should not start bulk inserts if this statement uses
671 functions or invokes triggers since they may access
672 to the same table and therefore should not see its
673 inconsistent state created by this optimization.
674 So we call start_bulk_insert to perform nesessary checks on
675 insert_many_values.elements, and - if nothing else - to initialize
676 the code to make the call of end_bulk_insert() below safe.
677 */
678 if (duplicates != DUP_ERROR || lex->is_ignore())
679 insert_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
680 /**
681 This is a simple check for the case when the table has a trigger
682 that reads from it, or when the statement invokes a stored function
683 that reads from the table being inserted to.
684 Engines can't handle a bulk insert in parallel with a read form the
685 same table in the same connection.
686 */
687 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
688 insert_table->file->ha_start_bulk_insert(insert_many_values.elements);
689
690 prepare_triggers_for_insert_stmt(insert_table);
691
692 for (Field** next_field= insert_table->field; *next_field; ++next_field)
693 {
694 (*next_field)->reset_warnings();
695 }
696
697 while ((values= its++))
698 {
699 if (insert_field_list.elements || !value_count)
700 {
701 restore_record(insert_table, s->default_values); // Get empty record
702
703 /*
704 Check whether default values of the insert_field_list not specified in
705 column list are correct or not.
706 */
707 if (validate_default_values_of_unset_fields(thd, insert_table))
708 {
709 error= 1;
710 break;
711 }
712 if (fill_record_n_invoke_before_triggers(thd, &info, insert_field_list,
713 *values, insert_table,
714 TRG_EVENT_INSERT,
715 insert_table->s->fields))
716 {
717 assert(thd->is_error());
718 /*
719 TODO: Convert warnings to errors if values_list.elements == 1
720 and check that all items return warning in case of problem with
721 storing field.
722 */
723 error= 1;
724 break;
725 }
726
727 res= check_that_all_fields_are_given_values(thd, insert_table,
728 table_list);
729 if (res)
730 {
731 assert(thd->is_error());
732 error= 1;
733 break;
734 }
735 }
736 else
737 {
738 if (lex->used_tables) // Column used in values()
739 restore_record(insert_table, s->default_values); // Get empty record
740 else
741 {
742 TABLE_SHARE *share= insert_table->s;
743
744 /*
745 Fix delete marker. No need to restore rest of record since it will
746 be overwritten by fill_record() anyway (and fill_record() does not
747 use default values in this case).
748 */
749 insert_table->record[0][0]= share->default_values[0];
750
751 /* Fix undefined null_bits. */
752 if (share->null_bytes > 1 && share->last_null_bit_pos)
753 {
754 insert_table->record[0][share->null_bytes - 1]=
755 share->default_values[share->null_bytes - 1];
756 }
757 }
758 if (fill_record_n_invoke_before_triggers(thd, insert_table->field,
759 *values, insert_table,
760 TRG_EVENT_INSERT,
761 insert_table->s->fields))
762 {
763 assert(thd->is_error());
764 error= 1;
765 break;
766 }
767 }
768
769 if ((res= table_list->view_check_option(thd)) == VIEW_CHECK_SKIP)
770 continue;
771 else if (res == VIEW_CHECK_ERROR)
772 {
773 error= 1;
774 break;
775 }
776 error= write_record(thd, insert_table, &info, &update);
777 if (error)
778 break;
779 thd->get_stmt_da()->inc_current_row_for_condition();
780 }
781 } // Statement plan is available within these braces
782
783 error= thd->get_stmt_da()->is_error();
784 free_underlaid_joins(thd, select_lex);
785 joins_freed= true;
786
787 /*
788 Now all rows are inserted. Time to update logs and sends response to
789 user
790 */
791 {
792 /* TODO: Only call this if insert_table->found_next_number_field.*/
793 insert_table->file->ha_release_auto_increment();
794 /*
795 Make sure 'end_bulk_insert()' is called regardless of current error
796 */
797 int loc_error= 0;
798 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
799 loc_error= insert_table->file->ha_end_bulk_insert();
800 /*
801 Report error if 'end_bulk_insert()' failed, and set 'error' to 1
802 */
803 if (loc_error && !error)
804 {
805 /* purecov: begin inspected */
806 myf error_flags= MYF(0);
807 if (insert_table->file->is_fatal_error(loc_error))
808 error_flags|= ME_FATALERROR;
809
810 insert_table->file->print_error(loc_error, error_flags);
811 error= 1;
812 /* purecov: end */
813 }
814 if (duplicates != DUP_ERROR || lex->is_ignore())
815 insert_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
816
817 transactional_table= insert_table->file->has_transactions();
818
819 if ((changed= (info.stats.copied || info.stats.deleted || info.stats.updated)))
820 {
821 /*
822 Invalidate the table in the query cache if something changed.
823 For the transactional algorithm to work the invalidation must be
824 before binlog writing and ha_autocommit_or_rollback
825 */
826 query_cache.invalidate_single(thd, lex->insert_table_leaf, true);
827 DEBUG_SYNC(thd, "wait_after_query_cache_invalidate");
828 }
829
830 if (error <= 0 || thd->get_transaction()->cannot_safely_rollback(
831 Transaction_ctx::STMT))
832 {
833 if (mysql_bin_log.is_open())
834 {
835 int errcode= 0;
836 if (error <= 0)
837 {
838 /*
839 [Guilhem wrote] Temporary errors may have filled
840 thd->net.last_error/errno. For example if there has
841 been a disk full error when writing the row, and it was
842 MyISAM, then thd->net.last_error/errno will be set to
843 "disk full"... and the mysql_file_pwrite() will wait until free
844 space appears, and so when it finishes then the
845 write_row() was entirely successful
846 */
847 /* todo: consider removing */
848 thd->clear_error();
849 }
850 else
851 errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
852
853 /* bug#22725:
854
855 A query which per-row-loop can not be interrupted with
856 KILLED, like INSERT, and that does not invoke stored
857 routines can be binlogged with neglecting the KILLED error.
858
859 If there was no error (error == zero) until after the end of
860 inserting loop the KILLED flag that appeared later can be
861 disregarded since previously possible invocation of stored
862 routines did not result in any error due to the KILLED. In
863 such case the flag is ignored for constructing binlog event.
864 */
865 assert(thd->killed != THD::KILL_BAD_DATA || error > 0);
866 if (thd->binlog_query(THD::ROW_QUERY_TYPE,
867 thd->query().str, thd->query().length,
868 transactional_table, FALSE, FALSE,
869 errcode))
870 error= 1;
871 }
872 }
873 assert(transactional_table || !changed ||
874 thd->get_transaction()->cannot_safely_rollback(
875 Transaction_ctx::STMT));
876 }
877 THD_STAGE_INFO(thd, stage_end);
878 /*
879 We'll report to the client this id:
880 - if the table contains an autoincrement column and we successfully
881 inserted an autogenerated value, the autogenerated value.
882 - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
883 called, X.
884 - if the table contains an autoincrement column, and some rows were
885 inserted, the id of the last "inserted" row (if IGNORE, that value may not
886 have been really inserted but ignored).
887 */
888 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
889 thd->first_successful_insert_id_in_cur_stmt :
890 (thd->arg_of_last_insert_id_function ?
891 thd->first_successful_insert_id_in_prev_stmt :
892 ((insert_table->next_number_field && info.stats.copied) ?
893 insert_table->next_number_field->val_int() : 0));
894 insert_table->next_number_field= 0;
895 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
896 insert_table->auto_increment_field_not_null= FALSE;
897 if (duplicates == DUP_REPLACE &&
898 (!insert_table->triggers ||
899 !insert_table->triggers->has_delete_triggers()))
900 insert_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
901
902 if (thd->is_error())
903 goto exit_without_my_ok;
904
905 if (insert_many_values.elements == 1 &&
906 (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
907 {
908 my_ok(thd, info.stats.copied + info.stats.deleted +
909 (thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
910 info.stats.touched : info.stats.updated),
911 id);
912 }
913 else
914 {
915 char buff[160];
916 ha_rows updated=
917 thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
918 info.stats.touched : info.stats.updated;
919 if (lex->is_ignore())
920 my_snprintf(buff, sizeof(buff),
921 ER(ER_INSERT_INFO), (long) info.stats.records,
922 (long) (info.stats.records - info.stats.copied),
923 (long) thd->get_stmt_da()->current_statement_cond_count());
924 else
925 my_snprintf(buff, sizeof(buff),
926 ER(ER_INSERT_INFO), (long) info.stats.records,
927 (long) (info.stats.deleted + updated),
928 (long) thd->get_stmt_da()->current_statement_cond_count());
929 my_ok(thd, info.stats.copied + info.stats.deleted + updated, id, buff);
930 }
931 DBUG_RETURN(FALSE);
932
933 exit_without_my_ok:
934 thd->lex->clear_values_map();
935 if (!joins_freed)
936 free_underlaid_joins(thd, select_lex);
937 DBUG_RETURN(err);
938 }
939
940
941 /**
942 Additional check for insertability for VIEW
943
944 A view is insertable if the following conditions are true:
945 - All columns being inserted into are from a single table.
946 - All not used columns in table have default values.
947 - All columns in view are distinct (not referring to the same column).
948
949 @param thd thread handler
950 @param[in,out] view reference to view being inserted into.
951 view->contain_auto_increment is true if and only if
952 the view contains an auto_increment field.
953 @param insert_table_ref reference to underlying table being inserted into
954
955 @return false if success, true if error
956 */
957
check_view_insertability(THD * thd,TABLE_LIST * view,const TABLE_LIST * insert_table_ref)958 static bool check_view_insertability(THD *thd, TABLE_LIST *view,
959 const TABLE_LIST *insert_table_ref)
960 {
961 DBUG_ENTER("check_view_insertability");
962
963 const uint num= view->view_query()->select_lex->item_list.elements;
964 TABLE *const table= insert_table_ref->table;
965 MY_BITMAP used_fields;
966 enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
967
968 const uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
969 uint32 *const used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
970 if (!used_fields_buff)
971 DBUG_RETURN(true); /* purecov: inspected */
972
973 assert(view->table == NULL &&
974 table != NULL &&
975 view->field_translation != 0);
976
977 (void) bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
978 bitmap_clear_all(&used_fields);
979
980 view->contain_auto_increment= false;
981
982 thd->mark_used_columns= MARK_COLUMNS_NONE;
983
984 // No privilege checking is done for these columns
985 Column_privilege_tracker column_privilege(thd, 0);
986
987 /* check simplicity and prepare unique test of view */
988 Field_translator *const trans_start= view->field_translation;
989 Field_translator *const trans_end= trans_start + num;
990
991 for (Field_translator *trans= trans_start; trans != trans_end; trans++)
992 {
993 if (trans->item == NULL)
994 continue;
995 /*
996 @todo
997 This fix_fields() call is necessary for execution of prepared statements.
998 When repeated preparation is eliminated the call can be deleted.
999 */
1000 if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1001 DBUG_RETURN(true); /* purecov: inspected */
1002
1003 Item_field *field;
1004 /* simple SELECT list entry (field without expression) */
1005 if (!(field= trans->item->field_for_view_update()))
1006 DBUG_RETURN(true);
1007
1008 if (field->field->unireg_check == Field::NEXT_NUMBER)
1009 view->contain_auto_increment= true;
1010 /* prepare unique test */
1011 /*
1012 remove collation (or other transparent for update function) if we have
1013 it
1014 */
1015 trans->item= field;
1016 }
1017 thd->mark_used_columns= save_mark_used_columns;
1018
1019 /* unique test */
1020 for (Field_translator *trans= trans_start; trans != trans_end; trans++)
1021 {
1022 if (trans->item == NULL)
1023 continue;
1024 /* Thanks to test above, we know that all columns are of type Item_field */
1025 Item_field *field= (Item_field *)trans->item;
1026 /* check fields belong to table in which we are inserting */
1027 if (field->field->table == table &&
1028 bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1029 DBUG_RETURN(true);
1030 }
1031
1032 DBUG_RETURN(false);
1033 }
1034
1035
1036 /**
1037 Recursive helper function for resolving join conditions for
1038 insertion into view for prepared statements.
1039
1040 @param thd Thread handler
1041 @param tr Table structure which is traversed recursively
1042
1043 @return false if success, true if error
1044 */
fix_join_cond_for_insert(THD * thd,TABLE_LIST * tr)1045 static bool fix_join_cond_for_insert(THD *thd, TABLE_LIST *tr)
1046 {
1047 if (tr->join_cond() && !tr->join_cond()->fixed)
1048 {
1049 Column_privilege_tracker column_privilege(thd, SELECT_ACL);
1050
1051 if (tr->join_cond()->fix_fields(thd, NULL))
1052 return true; /* purecov: inspected */
1053 }
1054
1055 if (tr->nested_join == NULL)
1056 return false;
1057
1058 List_iterator<TABLE_LIST> li(tr->nested_join->join_list);
1059 TABLE_LIST *ti;
1060
1061 while ((ti= li++))
1062 {
1063 if (fix_join_cond_for_insert(thd, ti))
1064 return true; /* purecov: inspected */
1065 }
1066 return false;
1067 }
1068
1069 /**
1070 Check if table can be updated
1071
1072 @param thd Thread handle
1073 @param table_list Table reference
1074 @param fields List of fields to be inserted
1075 @param select_insert True if processing INSERT ... SELECT statement
1076
1077 @return false if success, true if error
1078 */
1079
1080 bool
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1081 Sql_cmd_insert_base::mysql_prepare_insert_check_table(THD *thd,
1082 TABLE_LIST *table_list,
1083 List<Item> &fields,
1084 bool select_insert)
1085 {
1086 DBUG_ENTER("mysql_prepare_insert_check_table");
1087
1088 SELECT_LEX *const select= thd->lex->select_lex;
1089 const bool insert_into_view= table_list->is_view();
1090
1091 if (select->setup_tables(thd, table_list, select_insert))
1092 DBUG_RETURN(true); /* purecov: inspected */
1093
1094 if (insert_into_view)
1095 {
1096 // Allowing semi-join would transform this table into a "join view"
1097 if (table_list->resolve_derived(thd, false))
1098 DBUG_RETURN(true);
1099
1100 if (select->merge_derived(thd, table_list))
1101 DBUG_RETURN(true); /* purecov: inspected */
1102
1103 /*
1104 On second preparation, we may need to resolve view condition generated
1105 when merging the view.
1106 */
1107 if (!select->first_execution && table_list->is_merged() &&
1108 fix_join_cond_for_insert(thd, table_list))
1109 DBUG_RETURN(true); /* purecov: inspected */
1110 }
1111
1112 if (!table_list->is_insertable())
1113 {
1114 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
1115 DBUG_RETURN(true);
1116 }
1117
1118 // Allow semi-join for selected tables containing subqueries
1119 if (select->derived_table_count && select->resolve_derived(thd, true))
1120 DBUG_RETURN(true);
1121
1122 /*
1123 First table in list is the one being inserted into, requires INSERT_ACL.
1124 All other tables require SELECT_ACL only.
1125 */
1126 if (select->derived_table_count &&
1127 select->check_view_privileges(thd, INSERT_ACL, SELECT_ACL))
1128 DBUG_RETURN(true);
1129
1130 // Precompute and store the row types of NATURAL/USING joins.
1131 if (setup_natural_join_row_types(thd, select->join_list, &select->context))
1132 DBUG_RETURN(true);
1133
1134 if (insert_into_view && !fields.elements)
1135 {
1136 empty_field_list_on_rset= true;
1137 if (table_list->is_multiple_tables())
1138 {
1139 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1140 table_list->view_db.str, table_list->view_name.str);
1141 DBUG_RETURN(true);
1142 }
1143 if (insert_view_fields(thd, &fields, table_list))
1144 DBUG_RETURN(true);
1145 /*
1146 Item_fields inserted above from field_translation list have been
1147 already fixed in resolved_derived(), thus setup_fields() in
1148 check_insert_fields() will not process them, not mark them in write_set;
1149 we have to do it:
1150 */
1151 bitmap_set_all(table_list->updatable_base_table()->table->write_set);
1152 }
1153
1154 DBUG_RETURN(false);
1155 }
1156
1157
1158 /**
1159 Get extra info for tables we insert into
1160
1161 @param table table(TABLE object) we insert into,
1162 might be NULL in case of view
1163 @param table(TABLE_LIST object) or view we insert into
1164 */
1165
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1166 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1167 {
1168 if (table)
1169 {
1170 table->prepare_for_position();
1171 return;
1172 }
1173
1174 assert(tables->is_view());
1175 List_iterator<TABLE_LIST> it(*tables->view_tables);
1176 TABLE_LIST *tbl;
1177 while ((tbl= it++))
1178 prepare_for_positional_update(tbl->table, tbl);
1179
1180 return;
1181 }
1182
1183
1184 /**
1185 Prepare items in INSERT statement
1186
1187 @param thd Thread handler
1188 @param table_list Global/local table list
1189 @param values List of values to be inserted
1190 @param duplic What to do on duplicate key error
1191 @param where Where clause (for insert ... select)
1192 @param select_insert TRUE if INSERT ... SELECT statement
1193
1194 @todo (in far future)
1195 In cases of:
1196 INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1197 ON DUPLICATE KEY ...
1198 we should be able to refer to sum1 in the ON DUPLICATE KEY part
1199
1200 WARNING
1201 You MUST set table->insert_values to 0 after calling this function
1202 before releasing the table object.
1203
1204 @return false if success, true if error
1205 */
1206
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,List_item * values,bool select_insert)1207 bool Sql_cmd_insert_base::mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1208 List_item *values,
1209 bool select_insert)
1210 {
1211 DBUG_ENTER("mysql_prepare_insert");
1212
1213 // INSERT should have a SELECT or VALUES clause
1214 assert (!select_insert || !values);
1215
1216 // Number of update fields must match number of update values
1217 assert(insert_update_list.elements == insert_value_list.elements);
1218
1219 LEX * const lex= thd->lex;
1220 SELECT_LEX *const select_lex= lex->select_lex;
1221 Name_resolution_context *const context= &select_lex->context;
1222 Name_resolution_context_state ctx_state;
1223 const bool insert_into_view= table_list->is_view();
1224 bool res= false;
1225
1226 DBUG_PRINT("enter", ("table_list 0x%lx, view %d",
1227 (ulong)table_list,
1228 (int)insert_into_view));
1229 /*
1230 For subqueries in VALUES() we should not see the table in which we are
1231 inserting (for INSERT ... SELECT this is done by changing table_list,
1232 because INSERT ... SELECT share SELECT_LEX it with SELECT.
1233 */
1234 if (!select_insert)
1235 {
1236 for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1237 un;
1238 un= un->next_unit())
1239 {
1240 for (SELECT_LEX *sl= un->first_select();
1241 sl;
1242 sl= sl->next_select())
1243 {
1244 sl->context.outer_context= 0;
1245 }
1246 }
1247 }
1248
1249 if (mysql_prepare_insert_check_table(thd, table_list, insert_field_list,
1250 select_insert))
1251 DBUG_RETURN(true);
1252
1253 // REPLACE for a JOIN view is not permitted.
1254 if (table_list->is_multiple_tables() && duplicates == DUP_REPLACE)
1255 {
1256 my_error(ER_VIEW_DELETE_MERGE_VIEW, MYF(0),
1257 table_list->view_db.str, table_list->view_name.str);
1258 DBUG_RETURN(true);
1259 }
1260
1261 if (duplicates == DUP_UPDATE)
1262 {
1263 /* it should be allocated before Item::fix_fields() */
1264 if (table_list->set_insert_values(thd->mem_root))
1265 DBUG_RETURN(true); /* purecov: inspected */
1266 }
1267
1268 // Save the state of the current name resolution context.
1269 ctx_state.save_state(context, table_list);
1270
1271 // Prepare the lists of columns and values in the statement.
1272 if (values)
1273 {
1274 // if we have INSERT ... VALUES () we cannot have a GROUP BY clause
1275 assert (!select_lex->group_list.elements);
1276
1277 /*
1278 Perform name resolution only in the first table - 'table_list',
1279 which is the table that is inserted into.
1280 */
1281 assert(table_list->next_local == NULL);
1282 table_list->next_local= NULL;
1283 context->resolve_in_table_list_only(table_list);
1284
1285 if (!res)
1286 res= check_insert_fields(thd, context->table_list, insert_field_list,
1287 values->elements, true, !insert_into_view);
1288 table_map map= 0;
1289 if (!res)
1290 map= lex->insert_table_leaf->map();
1291
1292 // values is reset here to cover all the rows in the VALUES-list.
1293 List_iterator_fast<List_item> its(insert_many_values);
1294
1295 // Check whether all rows have the same number of fields.
1296 const uint value_count= values->elements;
1297 ulong counter= 0;
1298 while ((values= its++))
1299 {
1300 counter++;
1301 if (values->elements != value_count)
1302 {
1303 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
1304 DBUG_RETURN(true);
1305 }
1306
1307 if (!res)
1308 res= setup_fields(thd, Ref_ptr_array(), *values, SELECT_ACL, NULL,
1309 false, false);
1310 if (!res)
1311 res= check_valid_table_refs(table_list, *values, map);
1312
1313 if (!res && lex->insert_table_leaf->table->has_gcol())
1314 res= validate_gc_assignment(thd, &insert_field_list, values,
1315 lex->insert_table_leaf->table);
1316 }
1317 its.rewind();
1318 values= its++;
1319
1320 if (!res && duplicates == DUP_UPDATE)
1321 {
1322 #ifndef NO_EMBEDDED_ACCESS_CHECKS
1323 table_list->set_want_privilege(UPDATE_ACL);
1324 #endif
1325 // Setup the columns to be updated
1326 res= setup_fields(thd, Ref_ptr_array(),
1327 insert_update_list, UPDATE_ACL, NULL, false, true);
1328 if (!res)
1329 res= check_valid_table_refs(table_list, insert_update_list, map);
1330
1331 // Setup the corresponding values
1332 thd->lex->in_update_value_clause= true;
1333 if (!res)
1334 res= setup_fields(thd, Ref_ptr_array(), insert_value_list, SELECT_ACL,
1335 NULL, false, false);
1336 thd->lex->in_update_value_clause= false;
1337
1338 if (!res)
1339 res= check_valid_table_refs(table_list, insert_value_list, map);
1340
1341 if (!res && lex->insert_table_leaf->table->has_gcol())
1342 res= validate_gc_assignment(thd, &insert_update_list,
1343 &insert_value_list,
1344 lex->insert_table_leaf->table);
1345 }
1346 }
1347 else if (thd->stmt_arena->is_stmt_prepare())
1348 {
1349 /*
1350 This section of code is more or less a duplicate of the code in
1351 Query_result_insert::prepare, and the 'if' branch above.
1352 @todo Consolidate these three sections into one.
1353 */
1354 /*
1355 Perform name resolution only in the first table - 'table_list',
1356 which is the table that is inserted into.
1357 */
1358 table_list->next_local= NULL;
1359 thd->dup_field= NULL;
1360 context->resolve_in_table_list_only(table_list);
1361
1362 /*
1363 When processing a prepared INSERT ... SELECT statement,
1364 mysql_prepare_insert() is called from
1365 mysql_insert_select_prepare_tester(), when the values list (aka the
1366 SELECT list from the SELECT) is not resolved yet, so pass "false"
1367 for value_count_known.
1368 */
1369 res= check_insert_fields(thd, context->table_list, insert_field_list, 0,
1370 false, !insert_into_view);
1371 table_map map= 0;
1372 if (!res)
1373 map= lex->insert_table_leaf->map();
1374
1375 if (!res && lex->insert_table_leaf->table->vfield)
1376 res= validate_gc_assignment(thd, &insert_field_list, values,
1377 lex->insert_table_leaf->table);
1378
1379 if (!res && duplicates == DUP_UPDATE)
1380 {
1381 #ifndef NO_EMBEDDED_ACCESS_CHECKS
1382 table_list->set_want_privilege(UPDATE_ACL);
1383 #endif
1384 // Setup the columns to be modified
1385 res= setup_fields(thd, Ref_ptr_array(),
1386 insert_update_list, UPDATE_ACL, NULL, false, true);
1387 if (!res)
1388 res= check_valid_table_refs(table_list, insert_update_list, map);
1389
1390 if (!res && lex->insert_table_leaf->table->vfield)
1391 res= validate_gc_assignment(thd, &insert_update_list,
1392 &insert_value_list,
1393 lex->insert_table_leaf->table);
1394 assert(!table_list->next_name_resolution_table);
1395 if (select_lex->group_list.elements == 0 && !select_lex->with_sum_func)
1396 {
1397 /*
1398 There are two separata name resolution contexts:
1399 the INSERT table and the tables in the SELECT expression
1400 Make a single context out of them by concatenating the lists:
1401 */
1402 table_list->next_name_resolution_table=
1403 ctx_state.get_first_name_resolution_table();
1404 }
1405 thd->lex->in_update_value_clause= true;
1406 if (!res)
1407 res= setup_fields(thd, Ref_ptr_array(), insert_value_list,
1408 SELECT_ACL, NULL, false, false);
1409 thd->lex->in_update_value_clause= false;
1410
1411 /*
1412 Notice that there is no need to apply the Item::update_value_transformer
1413 here, as this will be done during EXECUTE in
1414 Query_result_insert::prepare().
1415 */
1416 }
1417 }
1418
1419 // Restore the current name resolution context
1420 ctx_state.restore_state(context, table_list);
1421
1422 if (res)
1423 DBUG_RETURN(res);
1424
1425 if (!select_insert)
1426 {
1427 TABLE_LIST *const duplicate=
1428 unique_table(thd, lex->insert_table_leaf, table_list->next_global, true);
1429 if (duplicate)
1430 {
1431 update_non_unique_table_error(table_list, "INSERT", duplicate);
1432 DBUG_RETURN(true);
1433 }
1434 }
1435
1436 if (table_list->is_merged())
1437 {
1438 Column_privilege_tracker column_privilege(thd, SELECT_ACL);
1439
1440 if (table_list->prepare_check_option(thd))
1441 DBUG_RETURN(true);
1442
1443 if (duplicates == DUP_REPLACE &&
1444 table_list->prepare_replace_filter(thd))
1445 DBUG_RETURN(true);
1446 }
1447
1448 if (!select_insert && select_lex->apply_local_transforms(thd, false))
1449 DBUG_RETURN(true);
1450
1451 DBUG_RETURN(false);
1452 }
1453
1454
1455 /* Check if there is more uniq keys after field */
1456
last_uniq_key(TABLE * table,uint keynr)1457 static int last_uniq_key(TABLE *table,uint keynr)
1458 {
1459 /*
1460 When an underlying storage engine informs that the unique key
1461 conflicts are not reported in the ascending order by setting
1462 the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1463 information to determine the last key conflict.
1464
1465 The information about the last key conflict will be used to
1466 do a replace of the new row on the conflicting row, rather
1467 than doing a delete (of old row) + insert (of new row).
1468
1469 Hence check for this flag and disable replacing the last row
1470 by returning 0 always. Returning 0 will result in doing
1471 a delete + insert always.
1472 */
1473 if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1474 return 0;
1475
1476 while (++keynr < table->s->keys)
1477 if (table->key_info[keynr].flags & HA_NOSAME)
1478 return 0;
1479 return 1;
1480 }
1481
1482
1483 /**
1484 Write a record to table with optional deletion of conflicting records,
1485 invoke proper triggers if needed.
1486
1487 SYNOPSIS
1488 write_record()
1489 thd - thread context
1490 table - table to which record should be written
1491 info - COPY_INFO structure describing handling of duplicates
1492 and which is used for counting number of records inserted
1493 and deleted.
1494 update - COPY_INFO structure describing the UPDATE part (only used for
1495 INSERT ON DUPLICATE KEY UPDATE)
1496
1497 @note
1498
1499 Once this record is written to the table buffer, any AFTER INSERT trigger
1500 will be invoked. If instead of inserting a new record we end up updating an
1501 old one, both ON UPDATE triggers will fire instead. Similarly both ON
1502 DELETE triggers will be invoked if are to delete conflicting records.
1503
1504 Call thd->transaction.stmt.mark_modified_non_trans_table() if table is a
1505 non-transactional table.
1506
1507 RETURN VALUE
1508 0 - success
1509 non-0 - error
1510 */
1511
write_record(THD * thd,TABLE * table,COPY_INFO * info,COPY_INFO * update)1512 int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)
1513 {
1514 int error, trg_error= 0;
1515 char *key=0;
1516 MY_BITMAP *save_read_set, *save_write_set;
1517 ulonglong prev_insert_id= table->file->next_insert_id;
1518 ulonglong insert_id_for_cur_row= 0;
1519 MEM_ROOT mem_root;
1520 DBUG_ENTER("write_record");
1521
1522 /* Here we are using separate MEM_ROOT as this memory should be freed once we
1523 exit write_record() function. This is marked as not instumented as it is
1524 allocated for very short time in a very specific case.
1525 */
1526 init_sql_alloc(PSI_NOT_INSTRUMENTED, &mem_root, 256, 0);
1527 info->stats.records++;
1528 save_read_set= table->read_set;
1529 save_write_set= table->write_set;
1530
1531 info->set_function_defaults(table);
1532
1533 const enum_duplicates duplicate_handling= info->get_duplicate_handling();
1534
1535 if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)
1536 {
1537 assert(duplicate_handling != DUP_UPDATE || update != NULL);
1538 while ((error=table->file->ha_write_row(table->record[0])))
1539 {
1540 uint key_nr;
1541 /*
1542 If we do more than one iteration of this loop, from the second one the
1543 row will have an explicit value in the autoinc field, which was set at
1544 the first call of handler::update_auto_increment(). So we must save
1545 the autogenerated value to avoid thd->insert_id_for_cur_row to become
1546 0.
1547 */
1548 if (table->file->insert_id_for_cur_row > 0)
1549 insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1550 else
1551 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1552 bool is_duplicate_key_error;
1553 if (!table->file->is_ignorable_error(error))
1554 goto err;
1555 is_duplicate_key_error= (error == HA_ERR_FOUND_DUPP_KEY ||
1556 error == HA_ERR_FOUND_DUPP_UNIQUE);
1557 if (!is_duplicate_key_error)
1558 {
1559 /*
1560 We come here when we had an ignorable error which is not a duplicate
1561 key error. In this we ignore error if ignore flag is set, otherwise
1562 report error as usual. We will not do any duplicate key processing.
1563 */
1564 info->last_errno= error;
1565 table->file->print_error(error, MYF(0));
1566 /*
1567 If IGNORE option is used, handler errors will be downgraded
1568 to warnings and don't have to stop the iteration.
1569 */
1570 if (thd->is_error())
1571 goto before_trg_err;
1572 goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1573 }
1574 if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
1575 {
1576 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1577 goto err;
1578 }
1579 /*
1580 key index value is either valid in the range [0-MAX_KEY) or
1581 has value MAX_KEY as a marker for the case when no information
1582 about key can be found. In the last case we have to require
1583 that storage engine has the flag HA_DUPLICATE_POS turned on.
1584 If this invariant is false then assert will crash
1585 the server built in debug mode. For the server that was built
1586 without DEBUG we have additional check for the value of key_nr
1587 in the code below in order to report about error in any case.
1588 */
1589 assert(key_nr != MAX_KEY ||
1590 (key_nr == MAX_KEY &&
1591 (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
1592
1593 DEBUG_SYNC(thd, "write_row_replace");
1594
1595 /* Read all columns for the row we are going to replace */
1596 table->use_all_columns();
1597 /*
1598 Don't allow REPLACE to replace a row when a auto_increment column
1599 was used. This ensures that we don't get a problem when the
1600 whole range of the key has been used.
1601 */
1602 if (duplicate_handling == DUP_REPLACE &&
1603 table->next_number_field &&
1604 key_nr == table->s->next_number_index &&
1605 (insert_id_for_cur_row > 0))
1606 goto err;
1607 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1608 {
1609 if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1610 goto err;
1611 }
1612 /*
1613 If the key index is equal to MAX_KEY it's treated as unknown key case
1614 and we shouldn't try to locate key info.
1615 */
1616 else if (key_nr < MAX_KEY)
1617 {
1618 if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1619 {
1620 error=my_errno();
1621 goto err;
1622 }
1623
1624 if (!key)
1625 {
1626 if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
1627 MAX_KEY_LENGTH)))
1628 {
1629 error=ENOMEM;
1630 goto err;
1631 }
1632 }
1633 /*
1634 If we convert INSERT operation internally to an UPDATE.
1635 An INSERT operation may update table->vfield for BLOB fields,
1636 So here we recalculate data for generated columns.
1637 */
1638 if (table->vfield) {
1639 update_generated_write_fields(table->write_set, table);
1640 }
1641 key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1642 if ((error=(table->file->ha_index_read_idx_map(table->record[1],key_nr,
1643 (uchar*) key, HA_WHOLE_KEY,
1644 HA_READ_KEY_EXACT))))
1645 goto err;
1646 }
1647 else
1648 {
1649 /*
1650 For the server built in non-debug mode returns error if
1651 handler::get_dup_key() returned MAX_KEY as the value of key index.
1652 */
1653 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1654 goto err;
1655 }
1656 if (duplicate_handling == DUP_UPDATE)
1657 {
1658 int res= 0;
1659 /*
1660 We don't check for other UNIQUE keys - the first row
1661 that matches, is updated. If update causes a conflict again,
1662 an error is returned
1663 */
1664 assert(table->insert_values != NULL);
1665 /*
1666 The insert has failed, store the insert_id generated for
1667 this row to be re-used for the next insert.
1668 */
1669 if (insert_id_for_cur_row > 0) prev_insert_id = insert_id_for_cur_row;
1670
1671 store_record(table,insert_values);
1672 /*
1673 Special check for BLOB/GEOMETRY field in statements with
1674 "ON DUPLICATE KEY UPDATE" clause.
1675 See mysql_prepare_blob_values() function for more details.
1676 */
1677 if (mysql_prepare_blob_values(thd,
1678 *update->get_changed_columns(),
1679 &mem_root))
1680 goto before_trg_err;
1681 restore_record(table,record[1]);
1682 assert(update->get_changed_columns()->elements ==
1683 update->update_values->elements);
1684 if (fill_record_n_invoke_before_triggers(thd, update,
1685 *update->get_changed_columns(),
1686 *update->update_values,
1687 table, TRG_EVENT_UPDATE, 0))
1688 goto before_trg_err;
1689
1690 bool insert_id_consumed= false;
1691 if (// UPDATE clause specifies a value for the auto increment field
1692 table->auto_increment_field_not_null &&
1693 // An auto increment value has been generated for this row
1694 (insert_id_for_cur_row > 0))
1695 {
1696 // After-update value:
1697 const ulonglong auto_incr_val= table->next_number_field->val_int();
1698 if (auto_incr_val == insert_id_for_cur_row)
1699 {
1700 // UPDATE wants to use the generated value
1701 insert_id_consumed= true;
1702 }
1703 else if (table->file->auto_inc_interval_for_cur_row.
1704 in_range(auto_incr_val))
1705 {
1706 /*
1707 UPDATE wants to use one auto generated value which we have already
1708 reserved for another (previous or following) row. That may cause
1709 a duplicate key error if we later try to insert the reserved
1710 value. Such conflicts on auto generated values would be strange
1711 behavior, so we return a clear error now.
1712 */
1713 my_error(ER_AUTO_INCREMENT_CONFLICT, MYF(0));
1714 goto before_trg_err;
1715 }
1716 }
1717
1718 if (!insert_id_consumed)
1719 table->file->restore_auto_increment(prev_insert_id);
1720
1721 /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1722 {
1723 const TABLE_LIST *inserted_view=
1724 table->pos_in_table_list->belong_to_view;
1725 if (inserted_view != NULL)
1726 {
1727 res= inserted_view->view_check_option(thd);
1728 if (res == VIEW_CHECK_SKIP)
1729 goto ok_or_after_trg_err;
1730 if (res == VIEW_CHECK_ERROR)
1731 goto before_trg_err;
1732 }
1733 }
1734
1735 info->stats.touched++;
1736 if (!records_are_comparable(table) || compare_records(table))
1737 {
1738 // Handle the INSERT ON DUPLICATE KEY UPDATE operation
1739 update->set_function_defaults(table);
1740
1741 if ((error=table->file->ha_update_row(table->record[1],
1742 table->record[0])) &&
1743 error != HA_ERR_RECORD_IS_THE_SAME)
1744 {
1745 info->last_errno= error;
1746 myf error_flags= MYF(0);
1747 if (table->file->is_fatal_error(error))
1748 error_flags|= ME_FATALERROR;
1749 table->file->print_error(error, error_flags);
1750 /*
1751 If IGNORE option is used, handler errors will be downgraded
1752 to warnings and don't have to stop the iteration.
1753 */
1754 if (thd->is_error())
1755 goto before_trg_err;
1756 goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1757 }
1758
1759 if (error != HA_ERR_RECORD_IS_THE_SAME)
1760 info->stats.updated++;
1761 else
1762 error= 0;
1763 /*
1764 If ON DUP KEY UPDATE updates a row instead of inserting one, it's
1765 like a regular UPDATE statement: it should not affect the value of a
1766 next SELECT LAST_INSERT_ID() or mysql_insert_id().
1767 Except if LAST_INSERT_ID(#) was in the INSERT query, which is
1768 handled separately by THD::arg_of_last_insert_id_function.
1769 */
1770 insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1771 info->stats.copied++;
1772 }
1773
1774 // Execute the 'AFTER, ON UPDATE' trigger
1775 trg_error= (table->triggers &&
1776 table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1777 TRG_ACTION_AFTER, TRUE));
1778 goto ok_or_after_trg_err;
1779 }
1780 else /* DUP_REPLACE */
1781 {
1782 TABLE_LIST *view= table->pos_in_table_list->belong_to_view;
1783
1784 if (view && view->replace_filter)
1785 {
1786 const size_t record_length= table->s->reclength;
1787
1788 void *record0_saved= my_malloc(PSI_NOT_INSTRUMENTED, record_length,
1789 MYF(MY_WME));
1790
1791 if (!record0_saved)
1792 {
1793 error= ENOMEM;
1794 goto err;
1795 }
1796
1797 // Save the record used for comparison.
1798 memcpy(record0_saved, table->record[0], record_length);
1799
1800 // Preparing the record for comparison.
1801 memcpy(table->record[0], table->record[1], record_length);
1802
1803 // Checking if the row being conflicted is visible by the view.
1804 bool found_row_in_view= view->replace_filter->val_int();
1805
1806 // Restoring the record back.
1807 memcpy(table->record[0], record0_saved, record_length);
1808
1809 my_free(record0_saved);
1810
1811 if (!found_row_in_view)
1812 {
1813 my_error(ER_REPLACE_INACCESSIBLE_ROWS, MYF(0));
1814 goto err;
1815 }
1816 }
1817
1818 /*
1819 The manual defines the REPLACE semantics that it is either
1820 an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1821 InnoDB do not function in the defined way if we allow MySQL
1822 to convert the latter operation internally to an UPDATE.
1823 We also should not perform this conversion if we have
1824 timestamp field with ON UPDATE which is different from DEFAULT.
1825 Another case when conversion should not be performed is when
1826 we have ON DELETE trigger on table so user may notice that
1827 we cheat here. Note that it is ok to do such conversion for
1828 tables which have ON UPDATE but have no ON DELETE triggers,
1829 we just should not expose this fact to users by invoking
1830 ON UPDATE triggers.
1831 */
1832
1833 if (last_uniq_key(table,key_nr) &&
1834 !table->file->referenced_by_foreign_key() &&
1835 (!table->triggers || !table->triggers->has_delete_triggers()))
1836 {
1837 if ((error=table->file->ha_update_row(table->record[1],
1838 table->record[0])) &&
1839 error != HA_ERR_RECORD_IS_THE_SAME)
1840 goto err;
1841 if (error != HA_ERR_RECORD_IS_THE_SAME)
1842 info->stats.deleted++;
1843 else
1844 error= 0;
1845 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1846 /*
1847 Since we pretend that we have done insert we should call
1848 its after triggers.
1849 */
1850 goto after_trg_n_copied_inc;
1851 }
1852 else
1853 {
1854 if (table->triggers &&
1855 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1856 TRG_ACTION_BEFORE, TRUE))
1857 goto before_trg_err;
1858 if ((error=table->file->ha_delete_row(table->record[1])))
1859 goto err;
1860 info->stats.deleted++;
1861 if (!table->file->has_transactions())
1862 thd->get_transaction()->mark_modified_non_trans_table(
1863 Transaction_ctx::STMT);
1864 if (table->triggers &&
1865 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1866 TRG_ACTION_AFTER, TRUE))
1867 {
1868 trg_error= 1;
1869 goto ok_or_after_trg_err;
1870 }
1871 /* Let us attempt do write_row() once more */
1872 }
1873 }
1874 }
1875
1876 /*
1877 If more than one iteration of the above while loop is done, from the second
1878 one the row being inserted will have an explicit value in the autoinc field,
1879 which was set at the first call of handler::update_auto_increment(). This
1880 value is saved to avoid thd->insert_id_for_cur_row becoming 0. Use this saved
1881 autoinc value.
1882 */
1883 if (table->file->insert_id_for_cur_row == 0)
1884 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1885
1886 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1887 /*
1888 Restore column maps if they where replaced during an duplicate key
1889 problem.
1890 */
1891 if (table->read_set != save_read_set ||
1892 table->write_set != save_write_set)
1893 table->column_bitmaps_set(save_read_set, save_write_set);
1894 }
1895 else if ((error=table->file->ha_write_row(table->record[0])))
1896 {
1897 DEBUG_SYNC(thd, "write_row_noreplace");
1898 info->last_errno= error;
1899 myf error_flags= MYF(0);
1900 if (table->file->is_fatal_error(error))
1901 error_flags|= ME_FATALERROR;
1902 table->file->print_error(error, error_flags);
1903 /*
1904 If IGNORE option is used, handler errors will be downgraded
1905 to warnings and don't have to stop the iteration.
1906 */
1907 if (thd->is_error())
1908 goto before_trg_err;
1909 table->file->restore_auto_increment(prev_insert_id);
1910 goto ok_or_after_trg_err;
1911 }
1912
1913 after_trg_n_copied_inc:
1914 info->stats.copied++;
1915 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1916 trg_error= (table->triggers &&
1917 table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
1918 TRG_ACTION_AFTER, TRUE));
1919
1920 ok_or_after_trg_err:
1921 if (key)
1922 my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1923 if (!table->file->has_transactions())
1924 thd->get_transaction()->mark_modified_non_trans_table(
1925 Transaction_ctx::STMT);
1926 free_root(&mem_root, MYF(0));
1927 DBUG_RETURN(trg_error);
1928
1929 err:
1930 {
1931 myf error_flags= MYF(0); /**< Flag for fatal errors */
1932 info->last_errno= error;
1933 assert(thd->lex->current_select() != NULL);
1934 if (table->file->is_fatal_error(error))
1935 error_flags|= ME_FATALERROR;
1936
1937 table->file->print_error(error, error_flags);
1938 }
1939
1940 before_trg_err:
1941 table->file->restore_auto_increment(prev_insert_id);
1942 if (key)
1943 my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1944 table->column_bitmaps_set(save_read_set, save_write_set);
1945 free_root(&mem_root, MYF(0));
1946 DBUG_RETURN(1);
1947 }
1948
1949
1950 /******************************************************************************
1951 Check that all fields with arn't null_fields are used
1952 ******************************************************************************/
1953
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)1954 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
1955 TABLE_LIST *table_list)
1956 {
1957 int err= 0;
1958 MY_BITMAP *write_set= entry->fields_set_during_insert;
1959
1960 for (Field **field=entry->field ; *field ; field++)
1961 {
1962 if (!bitmap_is_set(write_set, (*field)->field_index) &&
1963 ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1964 ((*field)->real_type() != MYSQL_TYPE_ENUM))
1965 {
1966 bool view= false;
1967 if (table_list)
1968 {
1969 table_list= table_list->top_table();
1970 view= table_list->is_view();
1971 }
1972 if (view)
1973 (*field)->set_warning(Sql_condition::SL_WARNING,
1974 ER_NO_DEFAULT_FOR_VIEW_FIELD, 1,
1975 table_list->view_db.str,
1976 table_list->view_name.str);
1977 else
1978 (*field)->set_warning(Sql_condition::SL_WARNING,
1979 ER_NO_DEFAULT_FOR_FIELD, 1);
1980 err= 1;
1981 }
1982 }
1983 bitmap_clear_all(write_set);
1984 return (!thd->lex->is_ignore() && thd->is_strict_mode()) ? err : 0;
1985 }
1986
1987
1988 /***************************************************************************
1989 Store records in INSERT ... SELECT *
1990 ***************************************************************************/
1991
1992
1993 /*
1994 make insert specific preparation and checks after opening tables
1995
1996 SYNOPSIS
1997 mysql_insert_select_prepare()
1998 thd thread handler
1999
2000 RETURN
2001 FALSE OK
2002 TRUE Error
2003 */
2004
mysql_insert_select_prepare(THD * thd)2005 bool Sql_cmd_insert_select::mysql_insert_select_prepare(THD *thd)
2006 {
2007 LEX *lex= thd->lex;
2008 SELECT_LEX *select_lex= lex->select_lex;
2009 DBUG_ENTER("mysql_insert_select_prepare");
2010
2011 /*
2012 SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
2013 clause if table is VIEW
2014 */
2015 if (mysql_prepare_insert(thd, lex->query_tables, NULL, true))
2016 DBUG_RETURN(true);
2017
2018 /*
2019 exclude first table from leaf tables list, because it belong to
2020 INSERT
2021 */
2022 assert(select_lex->leaf_tables != NULL);
2023 assert(lex->insert_table == select_lex->leaf_tables->top_table());
2024
2025 select_lex->leaf_tables= lex->insert_table->next_local;
2026 if (select_lex->leaf_tables != NULL)
2027 select_lex->leaf_tables= select_lex->leaf_tables->first_leaf_table();
2028 select_lex->leaf_table_count-=
2029 lex->insert_table->is_view() ? lex->insert_table->leaf_tables_count() : 1;
2030 DBUG_RETURN(false);
2031 }
2032
2033
prepare(List<Item> & values,SELECT_LEX_UNIT * u)2034 int Query_result_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2035 {
2036 DBUG_ENTER("Query_result_insert::prepare");
2037
2038 LEX *const lex= thd->lex;
2039 bool res;
2040 SELECT_LEX *const lex_current_select_save= lex->current_select();
2041 const enum_duplicates duplicate_handling= info.get_duplicate_handling();
2042
2043 unit= u;
2044
2045 /*
2046 Since table in which we are going to insert is added to the first
2047 select, LEX::current_select() should point to the first select while
2048 we are fixing fields from insert list.
2049 */
2050 lex->set_current_select(lex->select_lex);
2051
2052 res= check_insert_fields(thd, table_list, *fields, values.elements, true,
2053 !insert_into_view);
2054 if (!res)
2055 res= setup_fields(thd, Ref_ptr_array(), values, SELECT_ACL, NULL,
2056 false, false);
2057
2058 if (!res && lex->insert_table_leaf->table->has_gcol())
2059 res= validate_gc_assignment(thd, fields, &values,
2060 lex->insert_table_leaf->table);
2061
2062 if (duplicate_handling == DUP_UPDATE && !res)
2063 {
2064 Name_resolution_context *const context= &lex->select_lex->context;
2065 Name_resolution_context_state ctx_state;
2066
2067 /* Save the state of the current name resolution context. */
2068 ctx_state.save_state(context, table_list);
2069
2070 /* Perform name resolution only in the first table - 'table_list'. */
2071 table_list->next_local= NULL;
2072 context->resolve_in_table_list_only(table_list);
2073
2074 #ifndef NO_EMBEDDED_ACCESS_CHECKS
2075 table_list->set_want_privilege(UPDATE_ACL);
2076 #endif
2077 if (!res)
2078 res= setup_fields(thd, Ref_ptr_array(), *update.get_changed_columns(),
2079 UPDATE_ACL, NULL, false, true);
2080
2081 if (!res && lex->insert_table_leaf->table->has_gcol())
2082 res= validate_gc_assignment(thd, update.get_changed_columns(),
2083 update.update_values,
2084 lex->insert_table_leaf->table);
2085 /*
2086 When we are not using GROUP BY and there are no ungrouped aggregate
2087 functions
2088 we can refer to other tables in the ON DUPLICATE KEY part.
2089 We use next_name_resolution_table destructively, so check it first
2090 (views?).
2091 */
2092 assert (!table_list->next_name_resolution_table);
2093 if (lex->select_lex->group_list.elements == 0 &&
2094 !lex->select_lex->with_sum_func)
2095 {
2096 /*
2097 We must make a single context out of the two separate name resolution
2098 contexts:
2099 the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
2100 To do that we must concatenate the two lists
2101 */
2102 table_list->next_name_resolution_table=
2103 ctx_state.get_first_name_resolution_table();
2104 }
2105 lex->in_update_value_clause= true;
2106 if (!res)
2107 res= setup_fields(thd, Ref_ptr_array(), *update.update_values,
2108 SELECT_ACL, NULL, false, false);
2109 lex->in_update_value_clause= false;
2110 if (!res)
2111 {
2112 /*
2113 Traverse the update values list and substitute fields from the
2114 select for references (Item_ref objects) to them. This is done in
2115 order to get correct values from those fields when the select
2116 employs a temporary table.
2117 */
2118 List_iterator<Item> li(*update.update_values);
2119 Item *item;
2120
2121 while ((item= li++))
2122 {
2123 item->transform(&Item::update_value_transformer,
2124 (uchar*)lex->current_select());
2125 }
2126 }
2127
2128 /* Restore the current context. */
2129 ctx_state.restore_state(context, table_list);
2130 }
2131
2132 lex->set_current_select(lex_current_select_save);
2133 if (res)
2134 DBUG_RETURN(1);
2135 /*
2136 if it is INSERT into join view then check_insert_fields already found
2137 real table for insert
2138 */
2139 table= lex->insert_table_leaf->table;
2140
2141 if (duplicate_handling == DUP_UPDATE || duplicate_handling == DUP_REPLACE)
2142 prepare_for_positional_update(table, table_list);
2143
2144 if (info.add_function_default_columns(table, table->write_set))
2145 DBUG_RETURN(1);
2146 if ((duplicate_handling == DUP_UPDATE) &&
2147 update.add_function_default_columns(table, table->write_set))
2148 DBUG_RETURN(1);
2149
2150 /*
2151 Is table which we are changing used somewhere in other parts of
2152 query
2153 */
2154 if (unique_table(thd, lex->insert_table_leaf, table_list->next_global, 0))
2155 {
2156 // Using same table for INSERT and SELECT
2157 /*
2158 @todo: Use add_base_options instead of add_active_options, and only
2159 if first_execution is true; but this can be implemented only when this
2160 function is called before first_execution is set to true.
2161 if (lex->current_select()->first_execution)
2162 lex->current_select()->add_base_options(OPTION_BUFFER_RESULT);
2163 */
2164 lex->current_select()->add_active_options(OPTION_BUFFER_RESULT);
2165 }
2166 restore_record(table,s->default_values); // Get empty record
2167 table->next_number_field=table->found_next_number_field;
2168
2169 #ifdef HAVE_REPLICATION
2170 if (thd->slave_thread)
2171 {
2172 /* Get SQL thread's rli, even for a slave worker thread */
2173 Relay_log_info *c_rli= thd->rli_slave->get_c_rli();
2174 assert(c_rli != NULL);
2175 if (duplicate_handling == DUP_UPDATE &&
2176 table->next_number_field != NULL &&
2177 rpl_master_has_bug(c_rli, 24432, TRUE, NULL, NULL))
2178 DBUG_RETURN(1);
2179 }
2180 #endif
2181
2182 thd->cuted_fields=0;
2183 if (thd->lex->is_ignore() || duplicate_handling != DUP_ERROR)
2184 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2185 if (duplicate_handling == DUP_REPLACE &&
2186 (!table->triggers || !table->triggers->has_delete_triggers()))
2187 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2188 if (duplicate_handling == DUP_UPDATE)
2189 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2190
2191 /* Decide the logging format prior to preparing table/record metadata */
2192 res= res || thd->decide_logging_format(table_list);
2193 if (!res)
2194 {
2195 prepare_triggers_for_insert_stmt(table);
2196 }
2197
2198 for (Field** next_field= table->field; *next_field; ++next_field)
2199 {
2200 (*next_field)->reset_warnings();
2201 (*next_field)->reset_tmp_null();
2202 }
2203
2204 DBUG_RETURN(res ? -1 : 0);
2205 }
2206
2207
2208 /*
2209 Finish the preparation of the result table.
2210
2211 SYNOPSIS
2212 Query_result_insert::prepare2()
2213 void
2214
2215 DESCRIPTION
2216 If the result table is the same as one of the source tables (INSERT SELECT),
2217 the result table is not finally prepared at the join prepair phase.
2218 Do the final preparation now.
2219
2220 RETURN
2221 0 OK
2222 */
2223
prepare2()2224 int Query_result_insert::prepare2()
2225 {
2226 DBUG_ENTER("Query_result_insert::prepare2");
2227 if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
2228 !thd->lex->describe)
2229 {
2230 assert(!bulk_insert_started);
2231 // TODO: Is there no better estimation than 0 == Unknown number of rows?
2232 table->file->ha_start_bulk_insert((ha_rows) 0);
2233 bulk_insert_started= true;
2234 }
2235 DBUG_RETURN(0);
2236 }
2237
2238
cleanup()2239 void Query_result_insert::cleanup()
2240 {
2241 /*
2242 Query_result_insert/Query_result_create are never re-used
2243 in prepared statement
2244 */
2245 assert(0);
2246 }
2247
2248
~Query_result_insert()2249 Query_result_insert::~Query_result_insert()
2250 {
2251 DBUG_ENTER("~Query_result_insert");
2252 if (table)
2253 {
2254 table->next_number_field=0;
2255 table->auto_increment_field_not_null= FALSE;
2256 table->file->ha_reset();
2257 }
2258 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
2259 DBUG_VOID_RETURN;
2260 }
2261
2262
send_data(List<Item> & values)2263 bool Query_result_insert::send_data(List<Item> &values)
2264 {
2265 DBUG_ENTER("Query_result_insert::send_data");
2266 bool error=0;
2267
2268 if (unit->offset_limit_cnt)
2269 { // using limit offset,count
2270 unit->offset_limit_cnt--;
2271 DBUG_RETURN(0);
2272 }
2273
2274 thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
2275 store_values(values);
2276 thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
2277 if (thd->is_error())
2278 {
2279 table->auto_increment_field_not_null= FALSE;
2280 DBUG_RETURN(1);
2281 }
2282 if (table_list) // Not CREATE ... SELECT
2283 {
2284 switch (table_list->view_check_option(thd)) {
2285 case VIEW_CHECK_SKIP:
2286 DBUG_RETURN(0);
2287 case VIEW_CHECK_ERROR:
2288 DBUG_RETURN(1);
2289 }
2290 }
2291
2292 // Release latches in case bulk insert takes a long time
2293 ha_release_temporary_latches(thd);
2294
2295 error= write_record(thd, table, &info, &update);
2296 table->auto_increment_field_not_null= FALSE;
2297
2298 DEBUG_SYNC(thd, "create_select_after_write_rows_event");
2299
2300 if (!error &&
2301 (table->triggers || info.get_duplicate_handling() == DUP_UPDATE))
2302 {
2303 /*
2304 Restore fields of the record since it is possible that they were
2305 changed by ON DUPLICATE KEY UPDATE clause.
2306 If triggers exist then whey can modify some fields which were not
2307 originally touched by INSERT ... SELECT, so we have to restore
2308 their original values for the next row.
2309 */
2310 restore_record(table, s->default_values);
2311 }
2312 if (!error && table->next_number_field)
2313 {
2314 /*
2315 If no value has been autogenerated so far, we need to remember the
2316 value we just saw, we may need to send it to client in the end.
2317 */
2318 if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
2319 autoinc_value_of_last_inserted_row= table->next_number_field->val_int();
2320 /*
2321 Clear auto-increment field for the next record, if triggers are used
2322 we will clear it twice, but this should be cheap.
2323 */
2324 table->next_number_field->reset();
2325 }
2326
2327 DBUG_RETURN(error);
2328 }
2329
2330
store_values(List<Item> & values)2331 void Query_result_insert::store_values(List<Item> &values)
2332 {
2333 if (fields->elements)
2334 {
2335 restore_record(table, s->default_values);
2336 if (!validate_default_values_of_unset_fields(thd, table))
2337 fill_record_n_invoke_before_triggers(thd, &info, *fields, values,
2338 table, TRG_EVENT_INSERT,
2339 table->s->fields);
2340 }
2341 else
2342 fill_record_n_invoke_before_triggers(thd, table->field, values,
2343 table, TRG_EVENT_INSERT,
2344 table->s->fields);
2345
2346 check_that_all_fields_are_given_values(thd, table, table_list);
2347 }
2348
send_error(uint errcode,const char * err)2349 void Query_result_insert::send_error(uint errcode,const char *err)
2350 {
2351 DBUG_ENTER("Query_result_insert::send_error");
2352
2353 my_message(errcode, err, MYF(0));
2354
2355 DBUG_VOID_RETURN;
2356 }
2357
2358
send_eof()2359 bool Query_result_insert::send_eof()
2360 {
2361 int error;
2362 bool const trans_table= table->file->has_transactions();
2363 ulonglong id, row_count;
2364 bool changed;
2365 THD::killed_state killed_status= thd->killed;
2366 DBUG_ENTER("Query_result_insert::send_eof");
2367 DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
2368 trans_table, table->file->table_type()));
2369
2370 error= (bulk_insert_started ?
2371 table->file->ha_end_bulk_insert() : 0);
2372 if (!error && thd->is_error())
2373 error= thd->get_stmt_da()->mysql_errno();
2374
2375 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2376 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2377
2378 changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
2379 if (changed)
2380 {
2381 /*
2382 We must invalidate the table in the query cache before binlog writing
2383 and ha_autocommit_or_rollback.
2384 */
2385 query_cache.invalidate(thd, table, TRUE);
2386 }
2387
2388 assert(trans_table || !changed ||
2389 thd->get_transaction()->cannot_safely_rollback(
2390 Transaction_ctx::STMT));
2391
2392 /*
2393 Write to binlog before commiting transaction. No statement will
2394 be written by the binlog_query() below in RBR mode. All the
2395 events are in the transaction cache and will be written when
2396 ha_autocommit_or_rollback() is issued below.
2397 */
2398 if (mysql_bin_log.is_open() &&
2399 (!error || thd->get_transaction()->cannot_safely_rollback(
2400 Transaction_ctx::STMT)))
2401 {
2402 int errcode= 0;
2403 if (!error)
2404 thd->clear_error();
2405 else
2406 errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
2407 if (thd->binlog_query(THD::ROW_QUERY_TYPE,
2408 thd->query().str, thd->query().length,
2409 trans_table, false, false, errcode))
2410 {
2411 table->file->ha_release_auto_increment();
2412 DBUG_RETURN(1);
2413 }
2414 }
2415 table->file->ha_release_auto_increment();
2416
2417 if (error)
2418 {
2419 myf error_flags= MYF(0);
2420 if (table->file->is_fatal_error(my_errno()))
2421 error_flags|= ME_FATALERROR;
2422
2423 table->file->print_error(my_errno(), error_flags);
2424 DBUG_RETURN(1);
2425 }
2426
2427 /*
2428 For the strict_mode call of push_warning above results to set
2429 error in Diagnostic_area. Therefore it is necessary to check whether
2430 the error was set and leave method if it is true. If we didn't do
2431 so we would failed later when my_ok is called.
2432 */
2433 if (thd->get_stmt_da()->is_error())
2434 DBUG_RETURN(true);
2435
2436 char buff[160];
2437 if (thd->lex->is_ignore())
2438 my_snprintf(buff, sizeof(buff),
2439 ER(ER_INSERT_INFO), (long) info.stats.records,
2440 (long) (info.stats.records - info.stats.copied),
2441 (long) thd->get_stmt_da()->current_statement_cond_count());
2442 else
2443 my_snprintf(buff, sizeof(buff),
2444 ER(ER_INSERT_INFO), (long) info.stats.records,
2445 (long) (info.stats.deleted+info.stats.updated),
2446 (long) thd->get_stmt_da()->current_statement_cond_count());
2447 row_count= info.stats.copied + info.stats.deleted +
2448 (thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
2449 info.stats.touched : info.stats.updated);
2450 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
2451 thd->first_successful_insert_id_in_cur_stmt :
2452 (thd->arg_of_last_insert_id_function ?
2453 thd->first_successful_insert_id_in_prev_stmt :
2454 (info.stats.copied ? autoinc_value_of_last_inserted_row : 0));
2455 my_ok(thd, row_count, id, buff);
2456 DBUG_RETURN(0);
2457 }
2458
2459
abort_result_set()2460 void Query_result_insert::abort_result_set()
2461 {
2462 DBUG_ENTER("Query_result_insert::abort_result_set");
2463 /*
2464 If the creation of the table failed (due to a syntax error, for
2465 example), no table will have been opened and therefore 'table'
2466 will be NULL. In that case, we still need to execute the rollback
2467 and the end of the function.
2468 */
2469 if (table)
2470 {
2471 bool changed, transactional_table;
2472 /*
2473 Try to end the bulk insert which might have been started before.
2474 We don't need to do this if we are in prelocked mode (since we
2475 don't use bulk insert in this case). Also we should not do this
2476 if tables are not locked yet (bulk insert is not started yet
2477 in this case).
2478 */
2479 if (bulk_insert_started)
2480 table->file->ha_end_bulk_insert();
2481
2482 /*
2483 If at least one row has been inserted/modified and will stay in
2484 the table (the table doesn't have transactions) we must write to
2485 the binlog (and the error code will make the slave stop).
2486
2487 For many errors (example: we got a duplicate key error while
2488 inserting into a MyISAM table), no row will be added to the table,
2489 so passing the error to the slave will not help since there will
2490 be an error code mismatch (the inserts will succeed on the slave
2491 with no error).
2492
2493 If table creation failed, the number of rows modified will also be
2494 zero, so no check for that is made.
2495 */
2496 changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
2497 transactional_table= table->file->has_transactions();
2498 if (thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::STMT))
2499 {
2500 if (mysql_bin_log.is_open())
2501 {
2502 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
2503 /* error of writing binary log is ignored */
2504 (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query().str,
2505 thd->query().length,
2506 transactional_table, FALSE, FALSE, errcode);
2507 }
2508 if (changed)
2509 query_cache.invalidate(thd, table, TRUE);
2510 }
2511 assert(transactional_table || !changed ||
2512 thd->get_transaction()->cannot_safely_rollback(
2513 Transaction_ctx::STMT));
2514 table->file->ha_release_auto_increment();
2515 }
2516
2517 DBUG_VOID_RETURN;
2518 }
2519
2520
2521 /***************************************************************************
2522 CREATE TABLE (SELECT) ...
2523 ***************************************************************************/
2524
2525 /**
2526 Create table from lists of fields and items (or just return TABLE
2527 object for pre-opened existing table). Used by CREATE SELECT.
2528
2529 Let "source table" be the table in the SELECT part.
2530
2531 Let "source table columns" be the set of columns in the SELECT list.
2532
2533 An interesting peculiarity in the syntax CREATE TABLE (<columns>) SELECT is
2534 that function defaults are stripped from the the source table columns, but
2535 not from the additional columns defined in the CREATE TABLE part. The first
2536 @c TIMESTAMP column there is also subject to promotion to @c TIMESTAMP @c
2537 DEFAULT @c CURRENT_TIMESTAMP @c ON @c UPDATE @c CURRENT_TIMESTAMP, as usual.
2538
2539
2540 @param thd [in] Thread object
2541 @param create_info [in] Create information (like MAX_ROWS, ENGINE or
2542 temporary table flag)
2543 @param create_table [in] Pointer to TABLE_LIST object providing database
2544 and name for table to be created or to be open
2545 @param alter_info [in/out] Initial list of columns and indexes for the
2546 table to be created
2547 @param items [in] The source table columns. Corresponding column
2548 definitions (Create_field's) will be added to
2549 the end of alter_info->create_list.
2550 @param lock [out] Pointer to the MYSQL_LOCK object for table
2551 created will be returned in this parameter.
2552 Since this table is not included in THD::lock
2553 caller is responsible for explicitly unlocking
2554 this table.
2555 @param hooks [in] Hooks to be invoked before and after obtaining
2556 table lock on the table being created.
2557
2558 @note
2559 This function assumes that either table exists and was pre-opened and
2560 locked at open_and_lock_tables() stage (and in this case we just emit
2561 error or warning and return pre-opened TABLE object) or an exclusive
2562 metadata lock was acquired on table so we can safely create, open and
2563 lock table in it (we don't acquire metadata lock if this create is
2564 for temporary table).
2565
2566 @note
2567 Since this function contains some logic specific to CREATE TABLE ...
2568 SELECT it should be changed before it can be used in other contexts.
2569
2570 @retval non-zero Pointer to TABLE object for table created or opened
2571 @retval 0 Error
2572 */
2573
create_table_from_items(THD * thd,HA_CREATE_INFO * create_info,TABLE_LIST * create_table,Alter_info * alter_info,List<Item> * items)2574 static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
2575 TABLE_LIST *create_table,
2576 Alter_info *alter_info,
2577 List<Item> *items)
2578 {
2579 TABLE tmp_table; // Used during 'Create_field()'
2580 TABLE_SHARE share;
2581 TABLE *table= 0;
2582 uint select_field_count= items->elements;
2583 /* Add selected items to field list */
2584 List_iterator_fast<Item> it(*items);
2585 Item *item;
2586
2587 DBUG_ENTER("create_table_from_items");
2588
2589 tmp_table.alias= 0;
2590 tmp_table.s= &share;
2591 init_tmp_table_share(thd, &share, "", 0, "", "");
2592
2593 tmp_table.s->db_create_options=0;
2594 tmp_table.s->db_low_byte_first=
2595 MY_TEST(create_info->db_type == myisam_hton ||
2596 create_info->db_type == heap_hton);
2597 tmp_table.reset_null_row();
2598
2599 if (!thd->variables.explicit_defaults_for_timestamp)
2600 promote_first_timestamp_column(&alter_info->create_list);
2601
2602 while ((item=it++))
2603 {
2604 Field *tmp_table_field;
2605 if (item->type() == Item::FUNC_ITEM)
2606 {
2607 if (item->result_type() != STRING_RESULT)
2608 tmp_table_field= item->tmp_table_field(&tmp_table);
2609 else
2610 tmp_table_field= item->tmp_table_field_from_field_type(&tmp_table,
2611 false);
2612 }
2613 else
2614 {
2615 Field *from_field, *default_field;
2616 tmp_table_field= create_tmp_field(thd, &tmp_table, item, item->type(),
2617 NULL,
2618 &from_field, &default_field,
2619 false, false, false, false);
2620 }
2621
2622 if (!tmp_table_field)
2623 DBUG_RETURN(NULL);
2624
2625 Field *table_field;
2626
2627 switch (item->type())
2628 {
2629 /*
2630 We have to take into account both the real table's fields and
2631 pseudo-fields used in trigger's body. These fields are used
2632 to copy defaults values later inside constructor of
2633 the class Create_field.
2634 */
2635 case Item::FIELD_ITEM:
2636 case Item::TRIGGER_FIELD_ITEM:
2637 table_field= ((Item_field *) item)->field;
2638 break;
2639 default:
2640 {
2641 /*
2642 If the expression is of temporal type having date and non-nullable,
2643 a zero date is generated. If in strict mode, then zero date is
2644 invalid. For such cases no default is generated.
2645 */
2646 table_field= NULL;
2647 if (tmp_table_field->is_temporal_with_date() &&
2648 thd->is_strict_mode() && !item->maybe_null)
2649 tmp_table_field->flags|= NO_DEFAULT_VALUE_FLAG;
2650 }
2651 }
2652
2653 assert(tmp_table_field->gcol_info== NULL && tmp_table_field->stored_in_db);
2654 Create_field *cr_field= new Create_field(tmp_table_field, table_field);
2655
2656 if (!cr_field)
2657 DBUG_RETURN(NULL);
2658
2659 if (item->maybe_null)
2660 cr_field->flags &= ~NOT_NULL_FLAG;
2661 alter_info->create_list.push_back(cr_field);
2662 }
2663
2664 DEBUG_SYNC(thd,"create_table_select_before_create");
2665
2666 /*
2667 Create and lock table.
2668
2669 Note that we either creating (or opening existing) temporary table or
2670 creating base table on which name we have exclusive lock. So code below
2671 should not cause deadlocks or races.
2672
2673 We don't log the statement, it will be logged later.
2674
2675 If this is a HEAP table, the automatic DELETE FROM which is written to the
2676 binlog when a HEAP table is opened for the first time since startup, must
2677 not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
2678 don't want to delete from it) 2) it would be written before the CREATE
2679 TABLE, which is a wrong order. So we keep binary logging disabled when we
2680 open_table().
2681 */
2682 {
2683 if (!mysql_create_table_no_lock(thd, create_table->db,
2684 create_table->table_name,
2685 create_info, alter_info,
2686 select_field_count, NULL))
2687 {
2688 DEBUG_SYNC(thd,"create_table_select_before_open");
2689
2690 if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
2691 {
2692 Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
2693 /*
2694 Here we open the destination table, on which we already have
2695 an exclusive metadata lock.
2696 */
2697 if (open_table(thd, create_table, &ot_ctx))
2698 {
2699 quick_rm_table(thd, create_info->db_type, create_table->db,
2700 table_case_name(create_info, create_table->table_name),
2701 0);
2702 }
2703 else
2704 table= create_table->table;
2705 }
2706 else
2707 {
2708 if (open_temporary_table(thd, create_table))
2709 {
2710 /*
2711 This shouldn't happen as creation of temporary table should make
2712 it preparable for open. Anyway we can't drop temporary table if
2713 we are unable to fint it.
2714 */
2715 assert(0);
2716 }
2717 else
2718 {
2719 table= create_table->table;
2720 }
2721 }
2722 }
2723 if (!table) // open failed
2724 DBUG_RETURN(NULL);
2725 }
2726 DBUG_RETURN(table);
2727 }
2728
2729
2730 /**
2731 Create the new table from the selected items.
2732
2733 @param values List of items to be used as new columns
2734 @param u Select
2735
2736 @return Operation status.
2737 @retval 0 Success
2738 @retval !=0 Failure
2739 */
2740
prepare(List<Item> & values,SELECT_LEX_UNIT * u)2741 int Query_result_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2742 {
2743 DBUG_ENTER("Query_result_create::prepare");
2744
2745 unit= u;
2746 assert(create_table->table == NULL);
2747
2748 DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
2749
2750 if (!(table= create_table_from_items(thd, create_info, create_table,
2751 alter_info, &values)))
2752 /* abort() deletes table */
2753 DBUG_RETURN(-1);
2754
2755 if (table->s->fields < values.elements)
2756 {
2757 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
2758 DBUG_RETURN(-1);
2759 }
2760 /* First field to copy */
2761 field= table->field+table->s->fields - values.elements;
2762 for (Field **f= field ; *f ; f++)
2763 {
2764 if ((*f)->gcol_info)
2765 {
2766 /*
2767 Generated columns are not allowed to be given a value for CREATE TABLE ..
2768 SELECT statment.
2769 */
2770 my_error(ER_NON_DEFAULT_VALUE_FOR_GENERATED_COLUMN, MYF(0),
2771 (*f)->field_name, (*f)->table->s->table_name.str);
2772 DBUG_RETURN(true);
2773 }
2774 }
2775
2776 // Turn off function defaults for columns filled from SELECT list:
2777 const bool retval= info.ignore_last_columns(table, values.elements);
2778 DBUG_RETURN(retval);
2779 }
2780
2781
2782 /**
2783 Lock the newly created table and prepare it for insertion.
2784
2785 @return Operation status.
2786 @retval 0 Success
2787 @retval !=0 Failure
2788 */
2789
prepare2()2790 int Query_result_create::prepare2()
2791 {
2792 DBUG_ENTER("Query_result_create::prepare2");
2793 DEBUG_SYNC(thd,"create_table_select_before_lock");
2794
2795 MYSQL_LOCK *extra_lock= NULL;
2796 /*
2797 For row-based replication, the CREATE-SELECT statement is written
2798 in two pieces: the first one contain the CREATE TABLE statement
2799 necessary to create the table and the second part contain the rows
2800 that should go into the table.
2801
2802 For non-temporary tables, the start of the CREATE-SELECT
2803 implicitly commits the previous transaction, and all events
2804 forming the statement will be stored the transaction cache. At end
2805 of the statement, the entire statement is committed as a
2806 transaction, and all events are written to the binary log.
2807
2808 On the master, the table is locked for the duration of the
2809 statement, but since the CREATE part is replicated as a simple
2810 statement, there is no way to lock the table for accesses on the
2811 slave. Hence, we have to hold on to the CREATE part of the
2812 statement until the statement has finished.
2813 */
2814 class MY_HOOKS : public TABLEOP_HOOKS {
2815 public:
2816 MY_HOOKS(Query_result_create *x, TABLE_LIST *create_table_arg,
2817 TABLE_LIST *select_tables_arg)
2818 : ptr(x),
2819 create_table(create_table_arg),
2820 select_tables(select_tables_arg)
2821 {
2822 }
2823
2824 private:
2825 virtual int do_postlock(TABLE **tables, uint count)
2826 {
2827 int error;
2828 THD *thd= const_cast<THD*>(ptr->get_thd());
2829 TABLE_LIST *save_next_global= create_table->next_global;
2830
2831 create_table->next_global= select_tables;
2832
2833 error= thd->decide_logging_format(create_table);
2834
2835 create_table->next_global= save_next_global;
2836
2837 if (error)
2838 return error;
2839
2840 TABLE const *const table = *tables;
2841 create_table->table->set_binlog_drop_if_temp(
2842 !thd->is_current_stmt_binlog_disabled()
2843 && !thd->is_current_stmt_binlog_format_row());
2844
2845 if (thd->is_current_stmt_binlog_format_row() &&
2846 !table->s->tmp_table)
2847 {
2848 if (int error= ptr->binlog_show_create_table(tables, count))
2849 return error;
2850 }
2851 return 0;
2852 }
2853 Query_result_create *ptr;
2854 TABLE_LIST *create_table;
2855 TABLE_LIST *select_tables;
2856 };
2857
2858 MY_HOOKS hooks(this, create_table, select_tables);
2859
2860 table->reginfo.lock_type=TL_WRITE;
2861 hooks.prelock(&table, 1); // Call prelock hooks
2862 /*
2863 mysql_lock_tables() below should never fail with request to reopen table
2864 since it won't wait for the table lock (we have exclusive metadata lock on
2865 the table) and thus can't get aborted.
2866 */
2867 if (! (extra_lock= mysql_lock_tables(thd, &table, 1, 0)) ||
2868 hooks.postlock(&table, 1))
2869 {
2870 if (extra_lock)
2871 {
2872 mysql_unlock_tables(thd, extra_lock);
2873 extra_lock= 0;
2874 }
2875 drop_open_table(thd, table, create_table->db, create_table->table_name);
2876 table= 0;
2877 DBUG_RETURN(1);
2878 }
2879 if (extra_lock)
2880 {
2881 assert(m_plock == NULL);
2882
2883 if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
2884 m_plock= &m_lock;
2885 else
2886 m_plock= &thd->extra_lock;
2887
2888 *m_plock= extra_lock;
2889 }
2890 /* Mark all fields that are given values */
2891 for (Field **f= field ; *f ; f++)
2892 {
2893 bitmap_set_bit(table->write_set, (*f)->field_index);
2894 bitmap_set_bit(table->fields_set_during_insert, (*f)->field_index);
2895 }
2896
2897 // Set up an empty bitmap of function defaults
2898 if (info.add_function_default_columns(table, table->write_set))
2899 DBUG_RETURN(1);
2900
2901 if (info.add_function_default_columns(table,
2902 table->fields_set_during_insert))
2903 DBUG_RETURN(1);
2904
2905 table->next_number_field=table->found_next_number_field;
2906
2907 restore_record(table,s->default_values); // Get empty record
2908 thd->cuted_fields=0;
2909
2910 const enum_duplicates duplicate_handling= info.get_duplicate_handling();
2911
2912 if (thd->lex->is_ignore() || duplicate_handling != DUP_ERROR)
2913 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2914 if (duplicate_handling == DUP_REPLACE &&
2915 (!table->triggers || !table->triggers->has_delete_triggers()))
2916 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2917 if (duplicate_handling == DUP_UPDATE)
2918 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2919 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
2920 {
2921 table->file->ha_start_bulk_insert((ha_rows) 0);
2922 bulk_insert_started= true;
2923 }
2924
2925 enum_check_fields save_count_cuted_fields= thd->count_cuted_fields;
2926 thd->count_cuted_fields= CHECK_FIELD_WARN;
2927
2928 if (check_that_all_fields_are_given_values(thd, table, table_list))
2929 DBUG_RETURN(1);
2930
2931 thd->count_cuted_fields= save_count_cuted_fields;
2932
2933 table->mark_columns_needed_for_insert();
2934 table->file->extra(HA_EXTRA_WRITE_CACHE);
2935 DBUG_RETURN(0);
2936 }
2937
2938
binlog_show_create_table(TABLE ** tables,uint count)2939 int Query_result_create::binlog_show_create_table(TABLE **tables, uint count)
2940 {
2941 DBUG_ENTER("select_create::binlog_show_create_table");
2942 /*
2943 Note 1: In RBR mode, we generate a CREATE TABLE statement for the
2944 created table by calling store_create_info() (behaves as SHOW
2945 CREATE TABLE). The 'CREATE TABLE' event will be put in the
2946 binlog statement cache with an Anonymous_gtid_log_event, and
2947 any subsequent events (e.g., table-map events and rows event)
2948 will be put in the binlog transaction cache with an
2949 Anonymous_gtid_log_event. So that the 'CREATE...SELECT'
2950 statement is logged as:
2951 Anonymous_gtid_log_event
2952 CREATE TABLE event
2953 Anonymous_gtid_log_event
2954 BEGIN
2955 rows event
2956 COMMIT
2957
2958 We write the CREATE TABLE statement here and not in prepare()
2959 since there potentially are sub-selects or accesses to information
2960 schema that will do a close_thread_tables(), destroying the
2961 statement transaction cache.
2962 */
2963 assert(thd->is_current_stmt_binlog_format_row());
2964 assert(tables && *tables && count > 0);
2965
2966 char buf[2048];
2967 String query(buf, sizeof(buf), system_charset_info);
2968 int result;
2969 TABLE_LIST tmp_table_list;
2970
2971 tmp_table_list.table = *tables;
2972 query.length(0); // Have to zero it since constructor doesn't
2973
2974 result= store_create_info(thd, &tmp_table_list, &query, create_info,
2975 /* show_database */ TRUE);
2976 assert(result == 0); /* store_create_info() always return 0 */
2977
2978 if (mysql_bin_log.is_open())
2979 {
2980 DEBUG_SYNC(thd, "create_select_before_write_create_event");
2981 int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
2982 result= thd->binlog_query(THD::STMT_QUERY_TYPE,
2983 query.ptr(), query.length(),
2984 /* is_trans */ false,
2985 /* direct */ true,
2986 /* suppress_use */ FALSE,
2987 errcode);
2988 DEBUG_SYNC(thd, "create_select_after_write_create_event");
2989 }
2990 DBUG_RETURN(result);
2991 }
2992
2993
store_values(List<Item> & values)2994 void Query_result_create::store_values(List<Item> &values)
2995 {
2996 fill_record_n_invoke_before_triggers(thd, field, values,
2997 table, TRG_EVENT_INSERT,
2998 table->s->fields);
2999 }
3000
3001
send_error(uint errcode,const char * err)3002 void Query_result_create::send_error(uint errcode,const char *err)
3003 {
3004 DBUG_ENTER("Query_result_create::send_error");
3005
3006 DBUG_PRINT("info",
3007 ("Current statement %s row-based",
3008 thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
3009 DBUG_PRINT("info",
3010 ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
3011 (ulong) table,
3012 table && !table->s->tmp_table ? "is NOT" : "is"));
3013 /*
3014 This will execute any rollbacks that are necessary before writing
3015 the transcation cache.
3016
3017 We disable the binary log since nothing should be written to the
3018 binary log. This disabling is important, since we potentially do
3019 a "roll back" of non-transactional tables by removing the table,
3020 and the actual rollback might generate events that should not be
3021 written to the binary log.
3022
3023 */
3024 tmp_disable_binlog(thd);
3025 Query_result_insert::send_error(errcode, err);
3026 reenable_binlog(thd);
3027
3028 DBUG_VOID_RETURN;
3029 }
3030
3031
send_eof()3032 bool Query_result_create::send_eof()
3033 {
3034 /*
3035 The routine that writes the statement in the binary log
3036 is in Query_result_insert::send_eof(). For that reason, we
3037 mark the flag at this point.
3038 */
3039 if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
3040 thd->get_transaction()->mark_created_temp_table(Transaction_ctx::STMT);
3041
3042 bool tmp= Query_result_insert::send_eof();
3043 if (tmp)
3044 abort_result_set();
3045 else
3046 {
3047 /*
3048 Do an implicit commit at end of statement for non-temporary
3049 tables. This can fail, but we should unlock the table
3050 nevertheless.
3051 */
3052 if (!table->s->tmp_table)
3053 {
3054 trans_commit_stmt(thd);
3055 trans_commit_implicit(thd);
3056 }
3057
3058 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3059 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3060 if (m_plock)
3061 {
3062 mysql_unlock_tables(thd, *m_plock);
3063 *m_plock= NULL;
3064 m_plock= NULL;
3065 }
3066 }
3067 return tmp;
3068 }
3069
3070
abort_result_set()3071 void Query_result_create::abort_result_set()
3072 {
3073 DBUG_ENTER("Query_result_create::abort_result_set");
3074
3075 /*
3076 In Query_result_insert::abort_result_set() we roll back the statement, including
3077 truncating the transaction cache of the binary log. To do this, we
3078 pretend that the statement is transactional, even though it might
3079 be the case that it was not.
3080
3081 We roll back the statement prior to deleting the table and prior
3082 to releasing the lock on the table, since there might be potential
3083 for failure if the rollback is executed after the drop or after
3084 unlocking the table.
3085
3086 We also roll back the statement regardless of whether the creation
3087 of the table succeeded or not, since we need to reset the binary
3088 log state.
3089 */
3090 tmp_disable_binlog(thd);
3091 Query_result_insert::abort_result_set();
3092 thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::STMT);
3093 reenable_binlog(thd);
3094 /* possible error of writing binary log is ignored deliberately */
3095 (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
3096
3097 if (m_plock)
3098 {
3099 mysql_unlock_tables(thd, *m_plock);
3100 *m_plock= NULL;
3101 m_plock= NULL;
3102 }
3103
3104 if (table)
3105 {
3106 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3107 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3108 table->auto_increment_field_not_null= FALSE;
3109 drop_open_table(thd, table, create_table->db, create_table->table_name);
3110 table=0; // Safety
3111 }
3112 DBUG_VOID_RETURN;
3113 }
3114
3115
execute(THD * thd)3116 bool Sql_cmd_insert::execute(THD *thd)
3117 {
3118 assert(thd->lex->sql_command == SQLCOM_REPLACE ||
3119 thd->lex->sql_command == SQLCOM_INSERT);
3120
3121 bool res= false;
3122 LEX *const lex= thd->lex;
3123 SELECT_LEX *const select_lex= lex->select_lex;
3124 TABLE_LIST *const first_table= select_lex->get_table_list();
3125 TABLE_LIST *const all_tables= first_table;
3126
3127 if (open_temporary_tables(thd, all_tables))
3128 return true;
3129
3130 if (insert_precheck(thd, all_tables))
3131 return true;
3132
3133 /* Push ignore / strict error handler */
3134 Ignore_error_handler ignore_handler;
3135 Strict_error_handler strict_handler;
3136 if (thd->lex->is_ignore())
3137 thd->push_internal_handler(&ignore_handler);
3138 else if (thd->is_strict_mode())
3139 thd->push_internal_handler(&strict_handler);
3140
3141 MYSQL_INSERT_START(const_cast<char*>(thd->query().str));
3142 res= mysql_insert(thd, all_tables);
3143 MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func());
3144
3145 /* Pop ignore / strict error handler */
3146 if (thd->lex->is_ignore() || thd->is_strict_mode())
3147 thd->pop_internal_handler();
3148
3149 /*
3150 If we have inserted into a VIEW, and the base table has
3151 AUTO_INCREMENT column, but this column is not accessible through
3152 a view, then we should restore LAST_INSERT_ID to the value it
3153 had before the statement.
3154 */
3155 if (first_table->is_view() && !first_table->contain_auto_increment)
3156 thd->first_successful_insert_id_in_cur_stmt=
3157 thd->first_successful_insert_id_in_prev_stmt;
3158
3159 DBUG_EXECUTE_IF("after_mysql_insert",
3160 {
3161 const char act[]=
3162 "now "
3163 "wait_for signal.continue";
3164 assert(opt_debug_sync_timeout > 0);
3165 assert(!debug_sync_set_action(current_thd,
3166 STRING_WITH_LEN(act)));
3167 };);
3168
3169 thd->lex->clear_values_map();
3170 return res;
3171 }
3172
3173
execute(THD * thd)3174 bool Sql_cmd_insert_select::execute(THD *thd)
3175 {
3176 assert(thd->lex->sql_command == SQLCOM_REPLACE_SELECT ||
3177 thd->lex->sql_command == SQLCOM_INSERT_SELECT);
3178
3179 bool res= false;
3180 LEX *const lex= thd->lex;
3181 SELECT_LEX *const select_lex= lex->select_lex;
3182 SELECT_LEX_UNIT *const unit= lex->unit;
3183 TABLE_LIST *const first_table= select_lex->get_table_list();
3184 TABLE_LIST *const all_tables= first_table;
3185
3186 Query_result_insert *sel_result;
3187 if (insert_precheck(thd, all_tables))
3188 return true;
3189 /*
3190 INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/
3191 INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY
3192 clause is used in SELECT statement. We therefore use row based
3193 logging if mixed or row based logging is available.
3194 TODO: Check if the order of the output of the select statement is
3195 deterministic. Waiting for BUG#42415
3196 */
3197 if (lex->sql_command == SQLCOM_INSERT_SELECT &&
3198 lex->duplicates == DUP_UPDATE)
3199 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_SELECT_UPDATE);
3200
3201 if (lex->sql_command == SQLCOM_INSERT_SELECT && lex->is_ignore())
3202 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_IGNORE_SELECT);
3203
3204 if (lex->sql_command == SQLCOM_REPLACE_SELECT)
3205 lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_REPLACE_SELECT);
3206
3207 unit->set_limit(select_lex);
3208
3209 if (!(res= open_tables_for_query(thd, all_tables, 0)))
3210 {
3211 if (run_before_dml_hook(thd))
3212 return true;
3213
3214 MYSQL_INSERT_SELECT_START(const_cast<char*>(thd->query().str));
3215
3216 /* Skip first table, which is the table we are inserting in */
3217 TABLE_LIST *second_table= first_table->next_local;
3218 select_lex->table_list.first= second_table;
3219 select_lex->context.table_list=
3220 select_lex->context.first_name_resolution_table= second_table;
3221
3222 res= mysql_insert_select_prepare(thd);
3223 if (!res && (sel_result= new Query_result_insert(first_table,
3224 first_table->table,
3225 &insert_field_list,
3226 &insert_field_list,
3227 &insert_update_list,
3228 &insert_value_list,
3229 lex->duplicates)))
3230 {
3231 Ignore_error_handler ignore_handler;
3232 Strict_error_handler strict_handler;
3233 if (thd->lex->is_ignore())
3234 thd->push_internal_handler(&ignore_handler);
3235 else if (thd->is_strict_mode())
3236 thd->push_internal_handler(&strict_handler);
3237
3238 res= handle_query(thd, lex, sel_result,
3239 // Don't unlock tables until command is written
3240 // to binary log
3241 OPTION_SETUP_TABLES_DONE | SELECT_NO_UNLOCK,
3242 0);
3243
3244 if (thd->lex->is_ignore() || thd->is_strict_mode())
3245 thd->pop_internal_handler();
3246
3247 delete sel_result;
3248 }
3249 /* revert changes for SP */
3250 MYSQL_INSERT_SELECT_DONE(res, (ulong) thd->get_row_count_func());
3251 select_lex->table_list.first= first_table;
3252 }
3253 /*
3254 If we have inserted into a VIEW, and the base table has
3255 AUTO_INCREMENT column, but this column is not accessible through
3256 a view, then we should restore LAST_INSERT_ID to the value it
3257 had before the statement.
3258 */
3259 if (first_table->is_view() && !first_table->contain_auto_increment)
3260 thd->first_successful_insert_id_in_cur_stmt=
3261 thd->first_successful_insert_id_in_prev_stmt;
3262
3263 thd->lex->clear_values_map();
3264 return res;
3265 }
3266
3267
prepared_statement_test(THD * thd)3268 bool Sql_cmd_insert::prepared_statement_test(THD *thd)
3269 {
3270 LEX *lex= thd->lex;
3271 return mysql_test_insert(thd, lex->query_tables);
3272 }
3273