1 /*
2    Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 /* Insert of records */
26 
27 #include "sql_insert.h"
28 
29 #include "auth_common.h"              // check_grant_all_columns
30 #include "debug_sync.h"               // DEBUG_SYNC
31 #include "item.h"                     // Item
32 #include "lock.h"                     // mysql_unlock_tables
33 #include "opt_explain.h"              // Modification_plan
34 #include "opt_explain_format.h"       // enum_mod_type
35 #include "rpl_rli.h"                  // Relay_log_info
36 #include "rpl_slave.h"                // rpl_master_has_bug
37 #include "sql_base.h"                 // setup_fields
38 #include "sql_resolver.h"             // Column_privilege_tracker
39 #include "sql_select.h"               // free_underlaid_joins
40 #include "sql_show.h"                 // store_create_info
41 #include "sql_table.h"                // quick_rm_table
42 #include "sql_tmp_table.h"            // create_tmp_field
43 #include "sql_update.h"               // records_are_comparable
44 #include "sql_view.h"                 // check_key_in_view
45 #include "table_trigger_dispatcher.h" // Table_trigger_dispatcher
46 #include "transaction.h"              // trans_commit_stmt
47 #include "sql_resolver.h"             // validate_gc_assignment
48 #include "partition_info.h"           // partition_info
49 #include "probes_mysql.h"             // MYSQL_INSERT_START
50 
51 static bool check_view_insertability(THD *thd, TABLE_LIST *view,
52                                      const TABLE_LIST *insert_table_ref);
53 
54 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables);
55 
56 /**
57   Check that insert fields are from a single table of a multi-table view.
58 
59   @param fields            The insert fields to be checked.
60   @param view              The view for insert.
61   @param insert_table_ref[out] Reference to table to insert into
62 
63   This function is called to check that the fields being inserted into
64   are from a single base table. This must be checked when the table to
65   be inserted into is a multi-table view.
66 
67   @return false if success, true if an error was raised.
68 */
69 
check_single_table_insert(List<Item> & fields,TABLE_LIST * view,TABLE_LIST ** insert_table_ref)70 static bool check_single_table_insert(List<Item> &fields, TABLE_LIST *view,
71                                       TABLE_LIST **insert_table_ref)
72 {
73   // It is join view => we need to find the table for insert
74   List_iterator_fast<Item> it(fields);
75   Item *item;
76   *insert_table_ref= NULL;          // reset for call to check_single_table()
77   table_map tables= 0;
78 
79   while ((item= it++))
80     tables|= item->used_tables();
81 
82   if (view->check_single_table(insert_table_ref, tables))
83   {
84     my_error(ER_VIEW_MULTIUPDATE, MYF(0),
85              view->view_db.str, view->view_name.str);
86     return true;
87   }
88   assert(*insert_table_ref && (*insert_table_ref)->is_insertable());
89 
90   return false;
91 }
92 
93 
94 /**
95   Check insert fields.
96 
97   @param thd          The current thread.
98   @param table_list   The table for insert.
99   @param fields       The insert fields.
100   @param value_count  Number of values supplied
101   @param value_count_known if false, delay field count check
102                       @todo: Eliminate this when preparation is properly phased
103   @param check_unique If duplicate values should be rejected.
104 
105   @return false if success, true if error
106 
107   Resolved reference to base table is returned in lex->insert_table_leaf.
108 
109   @todo check_insert_fields() should be refactored as follows:
110         - Remove the argument value_count_known and all predicates involving it.
111         - Rearrange the call to check_insert_fields() from
112           mysql_prepare_insert() so that the value_count is known also when
113           processing a prepared statement.
114 */
115 
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,uint value_count,bool value_count_known,bool check_unique)116 static bool check_insert_fields(THD *thd, TABLE_LIST *table_list,
117                                 List<Item> &fields, uint value_count,
118                                 bool value_count_known, bool check_unique)
119 {
120   LEX *const lex= thd->lex;
121 
122 #ifndef NDEBUG
123   TABLE_LIST *const saved_insert_table_leaf= lex->insert_table_leaf;
124 #endif
125 
126   TABLE *table= table_list->table;
127 
128   assert(table_list->is_insertable());
129 
130   if (fields.elements == 0 && value_count_known && value_count > 0)
131   {
132     /*
133       No field list supplied, but a value list has been supplied.
134       Use field list of table being updated.
135     */
136     assert(table);    // This branch is not reached with a view:
137 
138     lex->insert_table_leaf= table_list;
139 
140     // Values for all fields in table are needed
141     if (value_count != table->s->fields)
142     {
143       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
144       return true;
145     }
146 #ifndef NO_EMBEDDED_ACCESS_CHECKS
147     Field_iterator_table_ref field_it;
148     field_it.set(table_list);
149     if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
150       return true;
151 #endif
152     /*
153       No fields are provided so all fields must be provided in the values.
154       Thus we set all bits in the write set.
155     */
156     bitmap_set_all(table->write_set);
157   }
158   else
159   {
160     // INSERT with explicit field list.
161     SELECT_LEX *select_lex= thd->lex->select_lex;
162     Name_resolution_context *context= &select_lex->context;
163     Name_resolution_context_state ctx_state;
164     int res;
165 
166     if (value_count_known && fields.elements != value_count)
167     {
168       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
169       return true;
170     }
171 
172     thd->dup_field= 0;
173 
174     /* Save the state of the current name resolution context. */
175     ctx_state.save_state(context, table_list);
176 
177     /*
178       Perform name resolution only in the first table - 'table_list',
179       which is the table that is inserted into.
180     */
181     table_list->next_local= NULL;
182     context->resolve_in_table_list_only(table_list);
183     res= setup_fields(thd, Ref_ptr_array(), fields, INSERT_ACL, NULL,
184                       false, true);
185 
186     /* Restore the current context. */
187     ctx_state.restore_state(context, table_list);
188 
189     if (res)
190       return true;
191 
192     if (table_list->is_merged())
193     {
194       if (check_single_table_insert(fields, table_list,
195                                     &lex->insert_table_leaf))
196         return true;
197       table= lex->insert_table_leaf->table;
198     }
199     else
200     {
201       lex->insert_table_leaf= table_list;
202     }
203 
204     if (check_unique && thd->dup_field)
205     {
206       my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), thd->dup_field->field_name);
207       return true;
208     }
209   }
210   /* Mark all generated columns for write*/
211   if (table->vfield)
212     table->mark_generated_columns(false);
213 
214   if (check_key_in_view(thd, table_list, lex->insert_table_leaf) ||
215       (table_list->is_view() &&
216        check_view_insertability(thd, table_list, lex->insert_table_leaf)))
217   {
218     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
219     return true;
220   }
221 
222   assert(saved_insert_table_leaf == NULL ||
223          lex->insert_table_leaf == saved_insert_table_leaf);
224 
225   return false;
226 }
227 
228 
229 /**
230   Check that table references are restricted to the supplied table map.
231   The check can be ignored if the supplied table is a base table.
232 
233   @param view   Table being specified
234   @param values Values whose used tables are to be matched against table map
235   @param map    Table map to match against
236 
237   @return false if success, true if error
238 */
239 
check_valid_table_refs(const TABLE_LIST * view,List<Item> & values,table_map map)240 static bool check_valid_table_refs(const TABLE_LIST *view, List<Item> &values,
241                                    table_map map)
242 {
243   List_iterator_fast<Item> it(values);
244   Item *item;
245 
246   // A base table will always match the supplied map.
247   assert(view->is_view() || (view->table && map));
248 
249   if (!view->is_view())       // Ignore check if called with base table.
250     return false;
251 
252   map|= PSEUDO_TABLE_BITS;
253 
254   while ((item= it++))
255   {
256     if (item->used_tables() & ~map)
257     {
258       my_error(ER_VIEW_MULTIUPDATE, MYF(0),
259                view->view_db.str, view->view_name.str);
260       return true;
261     }
262   }
263   return false;
264 }
265 
266 
267 /**
268   Validates default value of fields which are not specified in
269   the column list of INSERT statement.
270 
271   @Note table->record[0] should be be populated with default values
272         before calling this function.
273 
274   @param thd              thread context
275   @param table            table to which values are inserted.
276 
277   @return
278     @retval false Success.
279     @retval true  Failure.
280 */
281 
validate_default_values_of_unset_fields(THD * thd,TABLE * table)282 bool validate_default_values_of_unset_fields(THD *thd, TABLE *table)
283 {
284   MY_BITMAP *write_set= table->write_set;
285   DBUG_ENTER("validate_default_values_of_unset_fields");
286 
287   for (Field **field= table->field; *field; field++)
288   {
289     if (!bitmap_is_set(write_set, (*field)->field_index) &&
290         !((*field)->flags & NO_DEFAULT_VALUE_FLAG))
291     {
292       if ((*field)->validate_stored_val(thd) && thd->is_error())
293         DBUG_RETURN(true);
294     }
295   }
296 
297   DBUG_RETURN(false);
298 }
299 
300 
301 /*
302   Prepare triggers  for INSERT-like statement.
303 
304   SYNOPSIS
305     prepare_triggers_for_insert_stmt()
306       table   Table to which insert will happen
307 
308   NOTE
309     Prepare triggers for INSERT-like statement by marking fields
310     used by triggers and inform handlers that batching of UPDATE/DELETE
311     cannot be done if there are BEFORE UPDATE/DELETE triggers.
312 */
313 
prepare_triggers_for_insert_stmt(TABLE * table)314 void prepare_triggers_for_insert_stmt(TABLE *table)
315 {
316   if (table->triggers)
317   {
318     if (table->triggers->has_triggers(TRG_EVENT_DELETE,
319                                       TRG_ACTION_AFTER))
320     {
321       /*
322         The table has AFTER DELETE triggers that might access to
323         subject table and therefore might need delete to be done
324         immediately. So we turn-off the batching.
325       */
326       (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
327     }
328     if (table->triggers->has_triggers(TRG_EVENT_UPDATE,
329                                       TRG_ACTION_AFTER))
330     {
331       /*
332         The table has AFTER UPDATE triggers that might access to subject
333         table and therefore might need update to be done immediately.
334         So we turn-off the batching.
335       */
336       (void) table->file->extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
337     }
338   }
339   table->mark_columns_needed_for_insert();
340 }
341 
342 /**
343   Setup data for field BLOB/GEOMETRY field types for execution of
344   "INSERT...UPDATE" statement. For a expression in 'UPDATE' clause
345   like "a= VALUES(a)", let as call Field* referring 'a' as LHS_FIELD
346   and Field* referring field 'a' in "VALUES(a)" as RHS_FIELD
347 
348   This function creates a separate copy of the blob value for RHS_FIELD,
349   if the field is updated as well as accessed through VALUES()
350   function in 'UPDATE' clause of "INSERT...UPDATE" statement.
351 
352   @param [in] thd
353     Pointer to THD object.
354 
355   @param [in] fields
356     List of fields representing LHS_FIELD of all expressions
357     in 'UPDATE' clause.
358 
359   @return - Can fail only when we are out of memory.
360     @retval false   Success
361     @retval true    Failure
362 */
363 
mysql_prepare_blob_values(THD * thd,List<Item> & fields,MEM_ROOT * mem_root)364 bool mysql_prepare_blob_values(THD *thd, List<Item> &fields, MEM_ROOT *mem_root)
365 {
366   DBUG_ENTER("mysql_prepare_blob_values");
367 
368   if (fields.elements <= 1)
369     DBUG_RETURN(false);
370 
371   // Collect LHS_FIELD's which are updated in a 'set'.
372   // This 'set' helps decide if we need to make copy of BLOB value
373   // or not.
374 
375   Prealloced_array<Field_blob *, 16, true>
376     blob_update_field_set(PSI_NOT_INSTRUMENTED);
377   if (blob_update_field_set.reserve(fields.elements))
378     DBUG_RETURN(true);
379 
380   List_iterator_fast<Item> f(fields);
381   Item *fld;
382   while ((fld= f++))
383   {
384     Item_field *field= fld->field_for_view_update();
385     Field *lhs_field= field->field;
386 
387     if (lhs_field->type() == MYSQL_TYPE_BLOB ||
388         lhs_field->type() == MYSQL_TYPE_GEOMETRY)
389       blob_update_field_set.insert_unique(down_cast<Field_blob *>(lhs_field));
390   }
391 
392   // Traverse through thd->lex->insert_update_values_map
393   // and make copy of BLOB values in RHS_FIELD, if the same field is
394   // modified (present in above 'set' prepared).
395   if (thd->lex->has_values_map())
396   {
397     std::map<Field *, Field *>::iterator iter;
398     for(iter= thd->lex->begin_values_map();
399         iter != thd->lex->end_values_map();
400         ++iter)
401     {
402       // Retrieve the Field_blob pointers from the map.
403       // and initialize newly declared variables immediately.
404       Field_blob *lhs_field= down_cast<Field_blob *>(iter->first);
405       Field_blob *rhs_field= down_cast<Field_blob *>(iter->second);
406 
407       // Check if the Field_blob object is updated before making a copy.
408       if (blob_update_field_set.count_unique(lhs_field) == 0)
409         continue;
410 
411       // Copy blob value
412       if(rhs_field->copy_blob_value(mem_root))
413         DBUG_RETURN(true);
414     }
415   }
416 
417   DBUG_RETURN(false);
418 }
419 
420 /**
421   INSERT statement implementation
422 
  @note Like implementations of other DDL/DML in MySQL, this function
  relies on the caller to close the thread tables. This is done at the
  end of dispatch_command().
426 */
427 
mysql_insert(THD * thd,TABLE_LIST * table_list)428 bool Sql_cmd_insert::mysql_insert(THD *thd,TABLE_LIST *table_list)
429 {
430   DBUG_ENTER("mysql_insert");
431 
432   LEX *const lex= thd->lex;
433   int error, res;
434   bool err= true;
435   bool transactional_table, joins_freed= FALSE;
436   bool changed;
437   bool is_locked= false;
438   ulong counter= 0;
439   ulonglong id;
440   /*
441     We have three alternative syntax rules for the INSERT statement:
442     1) "INSERT (columns) VALUES ...", so non-listed columns need a default
443     2) "INSERT VALUES (), ..." so all columns need a default;
444     note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
445     emptiness of the first row is enough
446     3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
447     expr_i is "DEFAULT" (in which case the column is set by
448     Item_default_value::save_in_field_inner()).
449   */
450   const bool manage_defaults=
451     insert_field_list.elements != 0 ||          // 1)
452     insert_many_values.head()->elements == 0;   // 2)
453   COPY_INFO info(COPY_INFO::INSERT_OPERATION,
454                  &insert_field_list,
455                  manage_defaults,
456                  duplicates);
457   COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &insert_update_list,
458                    &insert_value_list);
459   Name_resolution_context *context;
460   Name_resolution_context_state ctx_state;
461   uint num_partitions= 0;
462   enum partition_info::enum_can_prune can_prune_partitions=
463                                                   partition_info::PRUNE_NO;
464   MY_BITMAP used_partitions;
465   bool prune_needs_default_values;
466 
467   SELECT_LEX *const select_lex= lex->select_lex;
468 
469   select_lex->make_active_options(0, 0);
470 
471   if (open_tables_for_query(thd, table_list, 0))
472     DBUG_RETURN(true);
473 
474   if (run_before_dml_hook(thd))
475     DBUG_RETURN(true);
476 
477   THD_STAGE_INFO(thd, stage_init);
478   lex->used_tables=0;
479 
480   List_iterator_fast<List_item> its(insert_many_values);
481   List_item *values= its++;
482   const uint value_count= values->elements;
483   TABLE      *insert_table= NULL;
484   if (mysql_prepare_insert(thd, table_list, values, false))
485     goto exit_without_my_ok;
486 
487   insert_table= lex->insert_table_leaf->table;
488 
489   if (duplicates == DUP_UPDATE || duplicates == DUP_REPLACE)
490     prepare_for_positional_update(insert_table, table_list);
491 
492   /* Must be done before can_prune_insert, due to internal initialization. */
493   if (info.add_function_default_columns(insert_table, insert_table->write_set))
494     goto exit_without_my_ok; /* purecov: inspected */
495   if (duplicates == DUP_UPDATE &&
496       update.add_function_default_columns(insert_table,
497                                           insert_table->write_set))
498     goto exit_without_my_ok; /* purecov: inspected */
499 
500   context= &select_lex->context;
501   /*
502     These three asserts test the hypothesis that the resetting of the name
503     resolution context below is not necessary at all since the list of local
504     tables for INSERT always consists of one table.
505   */
506   assert(!table_list->next_local);
507   assert(!context->table_list->next_local);
508   assert(!context->first_name_resolution_table->next_name_resolution_table);
509 
510   /* Save the state of the current name resolution context. */
511   ctx_state.save_state(context, table_list);
512 
513   /*
514     Perform name resolution only in the first table - 'table_list',
515     which is the table that is inserted into.
516   */
517   assert(table_list->next_local == 0);
518   context->resolve_in_table_list_only(table_list);
519 
520   if (!is_locked && insert_table->part_info)
521   {
522     if (insert_table->part_info->can_prune_insert(thd,
523                                            duplicates,
524                                            update,
525                                            insert_update_list,
526                                            insert_field_list,
527                                            !MY_TEST(values->elements),
528                                            &can_prune_partitions,
529                                            &prune_needs_default_values,
530                                            &used_partitions))
531       goto exit_without_my_ok; /* purecov: inspected */
532 
533     if (can_prune_partitions != partition_info::PRUNE_NO)
534     {
535       num_partitions= insert_table->part_info->lock_partitions.n_bits;
536       /*
537         Pruning probably possible, all partitions is unmarked for read/lock,
538         and we must now add them on row by row basis.
539 
540         Check the first INSERT value.
541         Do not fail here, since that would break MyISAM behavior of inserting
542         all rows before the failing row.
543 
544         PRUNE_DEFAULTS means the partitioning fields are only set to DEFAULT
545         values, so we only need to check the first INSERT value, since all the
546         rest will be in the same partition.
547       */
548       if (insert_table->part_info->set_used_partition(insert_field_list,
549                                                *values,
550                                                info,
551                                                prune_needs_default_values,
552                                                &used_partitions))
553         can_prune_partitions= partition_info::PRUNE_NO;
554     }
555   }
556 
557   its.rewind();
558   while ((values= its++))
559   {
560     counter++;
561     /*
562       To make it possible to increase concurrency on table level locking
563       engines such as MyISAM, we check pruning for each row until we will use
564       all partitions, Even if the number of rows is much higher than the
565       number of partitions.
566       TODO: Cache the calculated part_id and reuse in
567       ha_partition::write_row() if possible.
568     */
569     if (can_prune_partitions == partition_info::PRUNE_YES)
570     {
571       if (insert_table->part_info->set_used_partition(insert_field_list,
572                                                *values,
573                                                info,
574                                                prune_needs_default_values,
575                                                &used_partitions))
576         can_prune_partitions= partition_info::PRUNE_NO;
577       if (!(counter % num_partitions))
578       {
579         /*
580           Check if we using all partitions in table after adding partition
581           for current row to the set of used partitions. Do it only from
582           time to time to avoid overhead from bitmap_is_set_all() call.
583         */
584         if (bitmap_is_set_all(&used_partitions))
585           can_prune_partitions= partition_info::PRUNE_NO;
586       }
587     }
588   }
589   insert_table->auto_increment_field_not_null= false;
590   its.rewind ();
591 
592   /* Restore the current context. */
593   ctx_state.restore_state(context, table_list);
594 
595   { // Statement plan is available within these braces
596   Modification_plan plan(thd,
597                          (lex->sql_command == SQLCOM_INSERT) ?
598                          MT_INSERT : MT_REPLACE, insert_table,
599                          NULL, false, 0);
600   DEBUG_SYNC(thd, "planned_single_insert");
601 
602   if (can_prune_partitions != partition_info::PRUNE_NO)
603   {
604     /*
605       Only lock the partitions we will insert into.
606       And also only read from those partitions (duplicates etc.).
607       If explicit partition selection 'INSERT INTO t PARTITION (p1)' is used,
608       the new set of read/lock partitions is the intersection of read/lock
609       partitions and used partitions, i.e only the partitions that exists in
610       both sets will be marked for read/lock.
611       It is also safe for REPLACE, since all potentially conflicting records
612       always belong to the same partition as the one which we try to
613       insert a row. This is because ALL unique/primary keys must
614       include ALL partitioning columns.
615     */
616     bitmap_intersect(&insert_table->part_info->read_partitions,
617                      &used_partitions);
618     bitmap_intersect(&insert_table->part_info->lock_partitions,
619                      &used_partitions);
620   }
621 
622   // Lock the tables now if not locked already.
623   if (!is_locked &&
624       lock_tables(thd, table_list, lex->table_count, 0))
625     DBUG_RETURN(true);
626 
627   if (lex->describe)
628   {
629     err= explain_single_table_modification(thd, &plan, select_lex);
630     goto exit_without_my_ok;
631   }
632 
633   /*
634     Count warnings for all inserts.
635     For single line insert, generate an error if try to set a NOT NULL field
636     to NULL.
637   */
638   thd->count_cuted_fields= ((insert_many_values.elements == 1 &&
639                              !lex->is_ignore()) ?
640                             CHECK_FIELD_ERROR_FOR_NULL :
641                             CHECK_FIELD_WARN);
642   thd->cuted_fields = 0L;
643   insert_table->next_number_field= insert_table->found_next_number_field;
644 
645 #ifdef HAVE_REPLICATION
646     if (thd->slave_thread)
647     {
648       /* Get SQL thread's rli, even for a slave worker thread */
649       Relay_log_info* c_rli= thd->rli_slave->get_c_rli();
650       assert(c_rli != NULL);
651       if(info.get_duplicate_handling() == DUP_UPDATE &&
652          insert_table->next_number_field != NULL &&
653          rpl_master_has_bug(c_rli, 24432, TRUE, NULL, NULL))
654         goto exit_without_my_ok;
655     }
656 #endif
657 
658   error=0;
659   THD_STAGE_INFO(thd, stage_update);
660   if (duplicates == DUP_REPLACE &&
661       (!insert_table->triggers ||
662        !insert_table->triggers->has_delete_triggers()))
663     insert_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
664   if (duplicates == DUP_UPDATE)
665     insert_table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
666   /*
667     let's *try* to start bulk inserts. It won't necessary
668     start them as insert_many_values.elements should be greater than
669     some - handler dependent - threshold.
670     We should not start bulk inserts if this statement uses
671     functions or invokes triggers since they may access
672     to the same table and therefore should not see its
673     inconsistent state created by this optimization.
674     So we call start_bulk_insert to perform nesessary checks on
675     insert_many_values.elements, and - if nothing else - to initialize
676     the code to make the call of end_bulk_insert() below safe.
677   */
678   if (duplicates != DUP_ERROR || lex->is_ignore())
679     insert_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
680   /**
681      This is a simple check for the case when the table has a trigger
682      that reads from it, or when the statement invokes a stored function
683      that reads from the table being inserted to.
684      Engines can't handle a bulk insert in parallel with a read form the
685      same table in the same connection.
686   */
687   if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
688     insert_table->file->ha_start_bulk_insert(insert_many_values.elements);
689 
690   prepare_triggers_for_insert_stmt(insert_table);
691 
692   for (Field** next_field= insert_table->field; *next_field; ++next_field)
693   {
694     (*next_field)->reset_warnings();
695   }
696 
697   while ((values= its++))
698   {
699     if (insert_field_list.elements || !value_count)
700     {
701       restore_record(insert_table, s->default_values);  // Get empty record
702 
703       /*
704         Check whether default values of the insert_field_list not specified in
705         column list are correct or not.
706       */
707       if (validate_default_values_of_unset_fields(thd, insert_table))
708       {
709         error= 1;
710         break;
711       }
712       if (fill_record_n_invoke_before_triggers(thd, &info, insert_field_list,
713                                                *values, insert_table,
714                                                TRG_EVENT_INSERT,
715                                                insert_table->s->fields))
716       {
717         assert(thd->is_error());
718         /*
719           TODO: Convert warnings to errors if values_list.elements == 1
720           and check that all items return warning in case of problem with
721           storing field.
722         */
723         error= 1;
724         break;
725       }
726 
727       res= check_that_all_fields_are_given_values(thd, insert_table,
728                                                   table_list);
729       if (res)
730       {
731         assert(thd->is_error());
732         error= 1;
733         break;
734       }
735     }
736     else
737     {
738       if (lex->used_tables)               // Column used in values()
739         restore_record(insert_table, s->default_values); // Get empty record
740       else
741       {
742         TABLE_SHARE *share= insert_table->s;
743 
744         /*
745           Fix delete marker. No need to restore rest of record since it will
746           be overwritten by fill_record() anyway (and fill_record() does not
747           use default values in this case).
748         */
749         insert_table->record[0][0]= share->default_values[0];
750 
751         /* Fix undefined null_bits. */
752         if (share->null_bytes > 1 && share->last_null_bit_pos)
753         {
754           insert_table->record[0][share->null_bytes - 1]=
755             share->default_values[share->null_bytes - 1];
756         }
757       }
758       if (fill_record_n_invoke_before_triggers(thd, insert_table->field,
759                                                *values, insert_table,
760                                                TRG_EVENT_INSERT,
761                                                insert_table->s->fields))
762       {
763         assert(thd->is_error());
764         error= 1;
765         break;
766       }
767     }
768 
769     if ((res= table_list->view_check_option(thd)) == VIEW_CHECK_SKIP)
770       continue;
771     else if (res == VIEW_CHECK_ERROR)
772     {
773       error= 1;
774       break;
775     }
776     error= write_record(thd, insert_table, &info, &update);
777     if (error)
778       break;
779     thd->get_stmt_da()->inc_current_row_for_condition();
780   }
781   } // Statement plan is available within these braces
782 
783   error= thd->get_stmt_da()->is_error();
784   free_underlaid_joins(thd, select_lex);
785   joins_freed= true;
786 
787   /*
788     Now all rows are inserted.  Time to update logs and sends response to
789     user
790   */
791   {
792     /* TODO: Only call this if insert_table->found_next_number_field.*/
793     insert_table->file->ha_release_auto_increment();
794     /*
795       Make sure 'end_bulk_insert()' is called regardless of current error
796     */
797     int loc_error= 0;
798     if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
799       loc_error= insert_table->file->ha_end_bulk_insert();
800     /*
801       Report error if 'end_bulk_insert()' failed, and set 'error' to 1
802     */
803     if (loc_error && !error)
804     {
805       /* purecov: begin inspected */
806       myf error_flags= MYF(0);
807       if (insert_table->file->is_fatal_error(loc_error))
808         error_flags|= ME_FATALERROR;
809 
810       insert_table->file->print_error(loc_error, error_flags);
811       error= 1;
812       /* purecov: end */
813     }
814     if (duplicates != DUP_ERROR || lex->is_ignore())
815       insert_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
816 
817     transactional_table= insert_table->file->has_transactions();
818 
819     if ((changed= (info.stats.copied || info.stats.deleted || info.stats.updated)))
820     {
821       /*
822         Invalidate the table in the query cache if something changed.
823         For the transactional algorithm to work the invalidation must be
824         before binlog writing and ha_autocommit_or_rollback
825       */
826       query_cache.invalidate_single(thd, lex->insert_table_leaf, true);
827       DEBUG_SYNC(thd, "wait_after_query_cache_invalidate");
828     }
829 
830     if (error <= 0 || thd->get_transaction()->cannot_safely_rollback(
831         Transaction_ctx::STMT))
832     {
833       if (mysql_bin_log.is_open())
834       {
835         int errcode= 0;
836 	if (error <= 0)
837         {
838 	  /*
839 	    [Guilhem wrote] Temporary errors may have filled
840 	    thd->net.last_error/errno.  For example if there has
841 	    been a disk full error when writing the row, and it was
842 	    MyISAM, then thd->net.last_error/errno will be set to
843             "disk full"... and the mysql_file_pwrite() will wait until free
844 	    space appears, and so when it finishes then the
845 	    write_row() was entirely successful
846 	  */
847 	  /* todo: consider removing */
848 	  thd->clear_error();
849 	}
850         else
851           errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
852 
853 	/* bug#22725:
854 
855 	A query which per-row-loop can not be interrupted with
856 	KILLED, like INSERT, and that does not invoke stored
857 	routines can be binlogged with neglecting the KILLED error.
858 
859 	If there was no error (error == zero) until after the end of
860 	inserting loop the KILLED flag that appeared later can be
861 	disregarded since previously possible invocation of stored
862 	routines did not result in any error due to the KILLED.  In
863 	such case the flag is ignored for constructing binlog event.
864 	*/
865 	assert(thd->killed != THD::KILL_BAD_DATA || error > 0);
866         if (thd->binlog_query(THD::ROW_QUERY_TYPE,
867                               thd->query().str, thd->query().length,
868 			           transactional_table, FALSE, FALSE,
869                                    errcode))
870 	  error= 1;
871       }
872     }
873     assert(transactional_table || !changed ||
874            thd->get_transaction()->cannot_safely_rollback(
875                                                           Transaction_ctx::STMT));
876   }
877   THD_STAGE_INFO(thd, stage_end);
878   /*
879     We'll report to the client this id:
880     - if the table contains an autoincrement column and we successfully
881     inserted an autogenerated value, the autogenerated value.
882     - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
883     called, X.
884     - if the table contains an autoincrement column, and some rows were
885     inserted, the id of the last "inserted" row (if IGNORE, that value may not
886     have been really inserted but ignored).
887   */
888   id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
889     thd->first_successful_insert_id_in_cur_stmt :
890     (thd->arg_of_last_insert_id_function ?
891      thd->first_successful_insert_id_in_prev_stmt :
892      ((insert_table->next_number_field && info.stats.copied) ?
893      insert_table->next_number_field->val_int() : 0));
894   insert_table->next_number_field= 0;
895   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
896   insert_table->auto_increment_field_not_null= FALSE;
897   if (duplicates == DUP_REPLACE &&
898       (!insert_table->triggers ||
899        !insert_table->triggers->has_delete_triggers()))
900     insert_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
901 
902   if (thd->is_error())
903     goto exit_without_my_ok;
904 
905   if (insert_many_values.elements == 1 &&
906       (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
907   {
908     my_ok(thd, info.stats.copied + info.stats.deleted +
909           (thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
910            info.stats.touched : info.stats.updated),
911           id);
912   }
913   else
914   {
915     char buff[160];
916     ha_rows updated=
917       thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
918         info.stats.touched : info.stats.updated;
919     if (lex->is_ignore())
920       my_snprintf(buff, sizeof(buff),
921                   ER(ER_INSERT_INFO), (long) info.stats.records,
922                   (long) (info.stats.records - info.stats.copied),
923                   (long) thd->get_stmt_da()->current_statement_cond_count());
924     else
925       my_snprintf(buff, sizeof(buff),
926                   ER(ER_INSERT_INFO), (long) info.stats.records,
927                   (long) (info.stats.deleted + updated),
928                   (long) thd->get_stmt_da()->current_statement_cond_count());
929     my_ok(thd, info.stats.copied + info.stats.deleted + updated, id, buff);
930   }
931   DBUG_RETURN(FALSE);
932 
933 exit_without_my_ok:
934   thd->lex->clear_values_map();
935   if (!joins_freed)
936     free_underlaid_joins(thd, select_lex);
937   DBUG_RETURN(err);
938 }
939 
940 
941 /**
942   Additional check for insertability for VIEW
943 
944   A view is insertable if the following conditions are true:
945   - All columns being inserted into are from a single table.
946   - All not used columns in table have default values.
947   - All columns in view are distinct (not referring to the same column).
948 
949   @param thd              thread handler
950   @param[in,out] view     reference to view being inserted into.
951                           view->contain_auto_increment is true if and only if
952                           the view contains an auto_increment field.
953   @param insert_table_ref reference to underlying table being inserted into
954 
955   @return false if success, true if error
956 */
957 
check_view_insertability(THD * thd,TABLE_LIST * view,const TABLE_LIST * insert_table_ref)958 static bool check_view_insertability(THD *thd, TABLE_LIST *view,
959                                      const TABLE_LIST *insert_table_ref)
960  {
961   DBUG_ENTER("check_view_insertability");
962 
963   const uint num= view->view_query()->select_lex->item_list.elements;
964   TABLE *const table= insert_table_ref->table;
965   MY_BITMAP used_fields;
966   enum_mark_columns save_mark_used_columns= thd->mark_used_columns;
967 
968   const uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
969   uint32 *const used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
970   if (!used_fields_buff)
971     DBUG_RETURN(true);                      /* purecov: inspected */
972 
973   assert(view->table == NULL &&
974          table != NULL &&
975          view->field_translation != 0);
976 
977   (void) bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
978   bitmap_clear_all(&used_fields);
979 
980   view->contain_auto_increment= false;
981 
982   thd->mark_used_columns= MARK_COLUMNS_NONE;
983 
984   // No privilege checking is done for these columns
985   Column_privilege_tracker column_privilege(thd, 0);
986 
987   /* check simplicity and prepare unique test of view */
988   Field_translator *const trans_start= view->field_translation;
989   Field_translator *const trans_end= trans_start + num;
990 
991   for (Field_translator *trans= trans_start; trans != trans_end; trans++)
992   {
993     if (trans->item == NULL)
994       continue;
995     /*
996       @todo
997       This fix_fields() call is necessary for execution of prepared statements.
998       When repeated preparation is eliminated the call can be deleted.
999     */
1000     if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
1001       DBUG_RETURN(true);  /* purecov: inspected */
1002 
1003     Item_field *field;
1004     /* simple SELECT list entry (field without expression) */
1005     if (!(field= trans->item->field_for_view_update()))
1006       DBUG_RETURN(true);
1007 
1008     if (field->field->unireg_check == Field::NEXT_NUMBER)
1009       view->contain_auto_increment= true;
1010     /* prepare unique test */
1011     /*
1012       remove collation (or other transparent for update function) if we have
1013       it
1014     */
1015     trans->item= field;
1016   }
1017   thd->mark_used_columns= save_mark_used_columns;
1018 
1019   /* unique test */
1020   for (Field_translator *trans= trans_start; trans != trans_end; trans++)
1021   {
1022     if (trans->item == NULL)
1023       continue;
1024     /* Thanks to test above, we know that all columns are of type Item_field */
1025     Item_field *field= (Item_field *)trans->item;
1026     /* check fields belong to table in which we are inserting */
1027     if (field->field->table == table &&
1028         bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1029       DBUG_RETURN(true);
1030   }
1031 
1032   DBUG_RETURN(false);
1033 }
1034 
1035 
1036 /**
1037   Recursive helper function for resolving join conditions for
1038   insertion into view for prepared statements.
1039 
1040   @param thd      Thread handler
1041   @param tr       Table structure which is traversed recursively
1042 
1043   @return false if success, true if error
1044 */
fix_join_cond_for_insert(THD * thd,TABLE_LIST * tr)1045 static bool fix_join_cond_for_insert(THD *thd, TABLE_LIST *tr)
1046 {
1047   if (tr->join_cond() && !tr->join_cond()->fixed)
1048   {
1049     Column_privilege_tracker column_privilege(thd, SELECT_ACL);
1050 
1051     if (tr->join_cond()->fix_fields(thd, NULL))
1052       return true;             /* purecov: inspected */
1053   }
1054 
1055   if (tr->nested_join == NULL)
1056     return false;
1057 
1058   List_iterator<TABLE_LIST> li(tr->nested_join->join_list);
1059   TABLE_LIST *ti;
1060 
1061   while ((ti= li++))
1062   {
1063     if (fix_join_cond_for_insert(thd, ti))
1064       return true;             /* purecov: inspected */
1065   }
1066   return false;
1067 }
1068 
1069 /**
1070   Check if table can be updated
1071 
1072   @param thd           Thread handle
1073   @param table_list    Table reference
1074   @param fields        List of fields to be inserted
1075   @param select_insert True if processing INSERT ... SELECT statement
1076 
1077   @return false if success, true if error
1078 */
1079 
1080 bool
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1081 Sql_cmd_insert_base::mysql_prepare_insert_check_table(THD *thd,
1082                                                       TABLE_LIST *table_list,
1083                                                       List<Item> &fields,
1084                                                       bool select_insert)
1085 {
1086   DBUG_ENTER("mysql_prepare_insert_check_table");
1087 
1088   SELECT_LEX *const select= thd->lex->select_lex;
1089   const bool insert_into_view= table_list->is_view();
1090 
1091   if (select->setup_tables(thd, table_list, select_insert))
1092     DBUG_RETURN(true);             /* purecov: inspected */
1093 
1094   if (insert_into_view)
1095   {
1096     // Allowing semi-join would transform this table into a "join view"
1097     if (table_list->resolve_derived(thd, false))
1098       DBUG_RETURN(true);
1099 
1100     if (select->merge_derived(thd, table_list))
1101       DBUG_RETURN(true);           /* purecov: inspected */
1102 
1103     /*
1104       On second preparation, we may need to resolve view condition generated
1105       when merging the view.
1106     */
1107     if (!select->first_execution && table_list->is_merged() &&
1108         fix_join_cond_for_insert(thd, table_list))
1109       DBUG_RETURN(true);           /* purecov: inspected */
1110   }
1111 
1112   if (!table_list->is_insertable())
1113   {
1114     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
1115     DBUG_RETURN(true);
1116   }
1117 
1118   // Allow semi-join for selected tables containing subqueries
1119   if (select->derived_table_count && select->resolve_derived(thd, true))
1120     DBUG_RETURN(true);
1121 
1122   /*
1123     First table in list is the one being inserted into, requires INSERT_ACL.
1124     All other tables require SELECT_ACL only.
1125   */
1126   if (select->derived_table_count &&
1127       select->check_view_privileges(thd, INSERT_ACL, SELECT_ACL))
1128     DBUG_RETURN(true);
1129 
1130   // Precompute and store the row types of NATURAL/USING joins.
1131   if (setup_natural_join_row_types(thd, select->join_list, &select->context))
1132     DBUG_RETURN(true);
1133 
1134   if (insert_into_view && !fields.elements)
1135   {
1136     empty_field_list_on_rset= true;
1137     if (table_list->is_multiple_tables())
1138     {
1139       my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1140                table_list->view_db.str, table_list->view_name.str);
1141       DBUG_RETURN(true);
1142     }
1143     if (insert_view_fields(thd, &fields, table_list))
1144       DBUG_RETURN(true);
1145     /*
1146        Item_fields inserted above from field_translation list have been
1147        already fixed in resolved_derived(), thus setup_fields() in
1148        check_insert_fields() will not process them, not mark them in write_set;
1149        we have to do it:
1150     */
1151     bitmap_set_all(table_list->updatable_base_table()->table->write_set);
1152   }
1153 
1154   DBUG_RETURN(false);
1155 }
1156 
1157 
1158 /**
1159   Get extra info for tables we insert into
1160 
1161   @param table     table(TABLE object) we insert into,
1162                    might be NULL in case of view
1163   @param           table(TABLE_LIST object) or view we insert into
1164 */
1165 
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1166 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1167 {
1168   if (table)
1169   {
1170     table->prepare_for_position();
1171     return;
1172   }
1173 
1174   assert(tables->is_view());
1175   List_iterator<TABLE_LIST> it(*tables->view_tables);
1176   TABLE_LIST *tbl;
1177   while ((tbl= it++))
1178     prepare_for_positional_update(tbl->table, tbl);
1179 
1180   return;
1181 }
1182 
1183 
1184 /**
1185   Prepare items in INSERT statement
1186 
1187   @param thd                   Thread handler
1188   @param table_list            Global/local table list
1189   @param values                List of values to be inserted
1190   @param duplic                What to do on duplicate key error
1191   @param where                 Where clause (for insert ... select)
1192   @param select_insert         TRUE if INSERT ... SELECT statement
1193 
1194   @todo (in far future)
1195     In cases of:
1196     INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1197     ON DUPLICATE KEY ...
1198     we should be able to refer to sum1 in the ON DUPLICATE KEY part
1199 
1200   WARNING
1201     You MUST set table->insert_values to 0 after calling this function
1202     before releasing the table object.
1203 
1204   @return false if success, true if error
1205 */
1206 
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,List_item * values,bool select_insert)1207 bool Sql_cmd_insert_base::mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1208                                                List_item *values,
1209                                                bool select_insert)
1210 {
1211   DBUG_ENTER("mysql_prepare_insert");
1212 
1213   // INSERT should have a SELECT or VALUES clause
1214   assert (!select_insert || !values);
1215 
1216   // Number of update fields must match number of update values
1217   assert(insert_update_list.elements == insert_value_list.elements);
1218 
1219   LEX * const lex= thd->lex;
1220   SELECT_LEX *const select_lex= lex->select_lex;
1221   Name_resolution_context *const context= &select_lex->context;
1222   Name_resolution_context_state ctx_state;
1223   const bool insert_into_view= table_list->is_view();
1224   bool res= false;
1225 
1226   DBUG_PRINT("enter", ("table_list 0x%lx, view %d",
1227                        (ulong)table_list,
1228                        (int)insert_into_view));
1229   /*
1230     For subqueries in VALUES() we should not see the table in which we are
1231     inserting (for INSERT ... SELECT this is done by changing table_list,
1232     because INSERT ... SELECT share SELECT_LEX it with SELECT.
1233   */
1234   if (!select_insert)
1235   {
1236     for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1237          un;
1238          un= un->next_unit())
1239     {
1240       for (SELECT_LEX *sl= un->first_select();
1241            sl;
1242            sl= sl->next_select())
1243       {
1244         sl->context.outer_context= 0;
1245       }
1246     }
1247   }
1248 
1249   if (mysql_prepare_insert_check_table(thd, table_list, insert_field_list,
1250                                        select_insert))
1251     DBUG_RETURN(true);
1252 
1253   // REPLACE for a JOIN view is not permitted.
1254   if (table_list->is_multiple_tables() && duplicates == DUP_REPLACE)
1255   {
1256     my_error(ER_VIEW_DELETE_MERGE_VIEW, MYF(0),
1257              table_list->view_db.str, table_list->view_name.str);
1258     DBUG_RETURN(true);
1259   }
1260 
1261   if (duplicates == DUP_UPDATE)
1262   {
1263     /* it should be allocated before Item::fix_fields() */
1264     if (table_list->set_insert_values(thd->mem_root))
1265       DBUG_RETURN(true);                       /* purecov: inspected */
1266   }
1267 
1268   // Save the state of the current name resolution context.
1269   ctx_state.save_state(context, table_list);
1270 
1271   // Prepare the lists of columns and values in the statement.
1272   if (values)
1273   {
1274     // if we have INSERT ... VALUES () we cannot have a GROUP BY clause
1275     assert (!select_lex->group_list.elements);
1276 
1277     /*
1278       Perform name resolution only in the first table - 'table_list',
1279       which is the table that is inserted into.
1280      */
1281     assert(table_list->next_local == NULL);
1282     table_list->next_local= NULL;
1283     context->resolve_in_table_list_only(table_list);
1284 
1285     if (!res)
1286       res= check_insert_fields(thd, context->table_list, insert_field_list,
1287                                values->elements, true, !insert_into_view);
1288     table_map map= 0;
1289     if (!res)
1290       map= lex->insert_table_leaf->map();
1291 
1292     // values is reset here to cover all the rows in the VALUES-list.
1293     List_iterator_fast<List_item> its(insert_many_values);
1294 
1295     // Check whether all rows have the same number of fields.
1296     const uint value_count= values->elements;
1297     ulong counter= 0;
1298     while ((values= its++))
1299     {
1300       counter++;
1301       if (values->elements != value_count)
1302       {
1303         my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
1304         DBUG_RETURN(true);
1305       }
1306 
1307       if (!res)
1308         res= setup_fields(thd, Ref_ptr_array(), *values, SELECT_ACL, NULL,
1309                           false, false);
1310       if (!res)
1311         res= check_valid_table_refs(table_list, *values, map);
1312 
1313       if (!res && lex->insert_table_leaf->table->has_gcol())
1314         res= validate_gc_assignment(thd, &insert_field_list, values,
1315                                     lex->insert_table_leaf->table);
1316     }
1317     its.rewind();
1318     values= its++;
1319 
1320     if (!res && duplicates == DUP_UPDATE)
1321     {
1322 #ifndef NO_EMBEDDED_ACCESS_CHECKS
1323       table_list->set_want_privilege(UPDATE_ACL);
1324 #endif
1325       // Setup the columns to be updated
1326       res= setup_fields(thd, Ref_ptr_array(),
1327                         insert_update_list, UPDATE_ACL, NULL, false, true);
1328       if (!res)
1329         res= check_valid_table_refs(table_list, insert_update_list, map);
1330 
1331       // Setup the corresponding values
1332       thd->lex->in_update_value_clause= true;
1333       if (!res)
1334         res= setup_fields(thd, Ref_ptr_array(), insert_value_list, SELECT_ACL,
1335                           NULL, false, false);
1336       thd->lex->in_update_value_clause= false;
1337 
1338       if (!res)
1339         res= check_valid_table_refs(table_list, insert_value_list, map);
1340 
1341       if (!res && lex->insert_table_leaf->table->has_gcol())
1342         res= validate_gc_assignment(thd, &insert_update_list,
1343                                     &insert_value_list,
1344                                     lex->insert_table_leaf->table);
1345     }
1346   }
1347   else if (thd->stmt_arena->is_stmt_prepare())
1348   {
1349     /*
1350       This section of code is more or less a duplicate of the code  in
1351       Query_result_insert::prepare, and the 'if' branch above.
1352       @todo Consolidate these three sections into one.
1353     */
1354     /*
1355       Perform name resolution only in the first table - 'table_list',
1356       which is the table that is inserted into.
1357      */
1358     table_list->next_local= NULL;
1359     thd->dup_field= NULL;
1360     context->resolve_in_table_list_only(table_list);
1361 
1362     /*
1363       When processing a prepared INSERT ... SELECT statement,
1364       mysql_prepare_insert() is called from
1365       mysql_insert_select_prepare_tester(), when the values list (aka the
1366       SELECT list from the SELECT) is not resolved yet, so pass "false"
1367       for value_count_known.
1368     */
1369     res= check_insert_fields(thd, context->table_list, insert_field_list, 0,
1370                              false, !insert_into_view);
1371     table_map map= 0;
1372     if (!res)
1373       map= lex->insert_table_leaf->map();
1374 
1375     if (!res && lex->insert_table_leaf->table->vfield)
1376       res= validate_gc_assignment(thd, &insert_field_list, values,
1377                                   lex->insert_table_leaf->table);
1378 
1379     if (!res && duplicates == DUP_UPDATE)
1380     {
1381 #ifndef NO_EMBEDDED_ACCESS_CHECKS
1382       table_list->set_want_privilege(UPDATE_ACL);
1383 #endif
1384       // Setup the columns to be modified
1385       res= setup_fields(thd, Ref_ptr_array(),
1386                         insert_update_list, UPDATE_ACL, NULL, false, true);
1387       if (!res)
1388         res= check_valid_table_refs(table_list, insert_update_list, map);
1389 
1390       if (!res && lex->insert_table_leaf->table->vfield)
1391         res= validate_gc_assignment(thd, &insert_update_list,
1392                                     &insert_value_list,
1393                                     lex->insert_table_leaf->table);
1394       assert(!table_list->next_name_resolution_table);
1395       if (select_lex->group_list.elements == 0 && !select_lex->with_sum_func)
1396       {
1397         /*
1398           There are two separata name resolution contexts:
1399           the INSERT table and the tables in the SELECT expression
1400           Make a single context out of them by concatenating the lists:
1401         */
1402         table_list->next_name_resolution_table=
1403           ctx_state.get_first_name_resolution_table();
1404       }
1405       thd->lex->in_update_value_clause= true;
1406       if (!res)
1407         res= setup_fields(thd, Ref_ptr_array(), insert_value_list,
1408                           SELECT_ACL, NULL, false, false);
1409       thd->lex->in_update_value_clause= false;
1410 
1411       /*
1412         Notice that there is no need to apply the Item::update_value_transformer
1413         here, as this will be done during EXECUTE in
1414         Query_result_insert::prepare().
1415       */
1416     }
1417   }
1418 
1419   // Restore the current name resolution context
1420   ctx_state.restore_state(context, table_list);
1421 
1422   if (res)
1423     DBUG_RETURN(res);
1424 
1425   if (!select_insert)
1426   {
1427     TABLE_LIST *const duplicate=
1428       unique_table(thd, lex->insert_table_leaf, table_list->next_global, true);
1429     if (duplicate)
1430     {
1431       update_non_unique_table_error(table_list, "INSERT", duplicate);
1432       DBUG_RETURN(true);
1433     }
1434   }
1435 
1436   if (table_list->is_merged())
1437   {
1438     Column_privilege_tracker column_privilege(thd, SELECT_ACL);
1439 
1440     if (table_list->prepare_check_option(thd))
1441       DBUG_RETURN(true);
1442 
1443     if (duplicates == DUP_REPLACE &&
1444         table_list->prepare_replace_filter(thd))
1445       DBUG_RETURN(true);
1446   }
1447 
1448   if (!select_insert && select_lex->apply_local_transforms(thd, false))
1449     DBUG_RETURN(true);
1450 
1451   DBUG_RETURN(false);
1452 }
1453 
1454 
1455 	/* Check if there is more uniq keys after field */
1456 
last_uniq_key(TABLE * table,uint keynr)1457 static int last_uniq_key(TABLE *table,uint keynr)
1458 {
1459   /*
1460     When an underlying storage engine informs that the unique key
1461     conflicts are not reported in the ascending order by setting
1462     the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1463     information to determine the last key conflict.
1464 
1465     The information about the last key conflict will be used to
1466     do a replace of the new row on the conflicting row, rather
1467     than doing a delete (of old row) + insert (of new row).
1468 
1469     Hence check for this flag and disable replacing the last row
1470     by returning 0 always. Returning 0 will result in doing
1471     a delete + insert always.
1472   */
1473   if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1474     return 0;
1475 
1476   while (++keynr < table->s->keys)
1477     if (table->key_info[keynr].flags & HA_NOSAME)
1478       return 0;
1479   return 1;
1480 }
1481 
1482 
1483 /**
1484   Write a record to table with optional deletion of conflicting records,
1485   invoke proper triggers if needed.
1486 
1487   SYNOPSIS
1488      write_record()
1489       thd   - thread context
1490       table - table to which record should be written
1491       info  - COPY_INFO structure describing handling of duplicates
1492               and which is used for counting number of records inserted
1493               and deleted.
1494       update - COPY_INFO structure describing the UPDATE part (only used for
1495                INSERT ON DUPLICATE KEY UPDATE)
1496 
1497   @note
1498 
  Once this record is written to the table buffer, any AFTER INSERT trigger
  will be invoked. If instead of inserting a new record we end up updating an
  old one, both ON UPDATE triggers will fire instead. Similarly, both ON
  DELETE triggers will be invoked if we are to delete conflicting records.
1503 
1504   Call thd->transaction.stmt.mark_modified_non_trans_table() if table is a
1505   non-transactional table.
1506 
1507   RETURN VALUE
1508     0     - success
1509     non-0 - error
1510 */
1511 
write_record(THD * thd,TABLE * table,COPY_INFO * info,COPY_INFO * update)1512 int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)
1513 {
1514   int error, trg_error= 0;
1515   char *key=0;
1516   MY_BITMAP *save_read_set, *save_write_set;
1517   ulonglong prev_insert_id= table->file->next_insert_id;
1518   ulonglong insert_id_for_cur_row= 0;
1519   MEM_ROOT mem_root;
1520   DBUG_ENTER("write_record");
1521 
1522   /* Here we are using separate MEM_ROOT as this memory should be freed once we
1523      exit write_record() function. This is marked as not instumented as it is
1524      allocated for very short time in a very specific case.
1525   */
1526   init_sql_alloc(PSI_NOT_INSTRUMENTED, &mem_root, 256, 0);
1527   info->stats.records++;
1528   save_read_set=  table->read_set;
1529   save_write_set= table->write_set;
1530 
1531   info->set_function_defaults(table);
1532 
1533   const enum_duplicates duplicate_handling= info->get_duplicate_handling();
1534 
1535   if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)
1536   {
1537     assert(duplicate_handling != DUP_UPDATE || update != NULL);
1538     while ((error=table->file->ha_write_row(table->record[0])))
1539     {
1540       uint key_nr;
1541       /*
1542         If we do more than one iteration of this loop, from the second one the
1543         row will have an explicit value in the autoinc field, which was set at
1544         the first call of handler::update_auto_increment(). So we must save
1545         the autogenerated value to avoid thd->insert_id_for_cur_row to become
1546         0.
1547       */
1548       if (table->file->insert_id_for_cur_row > 0)
1549         insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1550       else
1551         table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1552       bool is_duplicate_key_error;
1553       if (!table->file->is_ignorable_error(error))
1554 	goto err;
1555       is_duplicate_key_error= (error == HA_ERR_FOUND_DUPP_KEY ||
1556                                error == HA_ERR_FOUND_DUPP_UNIQUE);
1557       if (!is_duplicate_key_error)
1558       {
1559         /*
1560           We come here when we had an ignorable error which is not a duplicate
1561           key error. In this we ignore error if ignore flag is set, otherwise
1562           report error as usual. We will not do any duplicate key processing.
1563         */
1564          info->last_errno= error;
1565          table->file->print_error(error, MYF(0));
1566          /*
1567            If IGNORE option is used, handler errors will be downgraded
1568            to warnings and don't have to stop the iteration.
1569          */
1570          if (thd->is_error())
1571            goto before_trg_err;
1572          goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1573       }
1574       if ((int) (key_nr = table->file->get_dup_key(error)) < 0)
1575       {
1576 	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
1577 	goto err;
1578       }
1579       /*
1580         key index value is either valid in the range [0-MAX_KEY) or
1581         has value MAX_KEY as a marker for the case when no information
1582         about key can be found. In the last case we have to require
1583         that storage engine has the flag HA_DUPLICATE_POS turned on.
1584         If this invariant is false then assert will crash
1585         the server built in debug mode. For the server that was built
1586         without DEBUG we have additional check for the value of key_nr
1587         in the code below in order to report about error in any case.
1588       */
1589       assert(key_nr != MAX_KEY ||
1590              (key_nr == MAX_KEY &&
1591               (table->file->ha_table_flags() & HA_DUPLICATE_POS)));
1592 
1593       DEBUG_SYNC(thd, "write_row_replace");
1594 
1595       /* Read all columns for the row we are going to replace */
1596       table->use_all_columns();
1597       /*
1598 	Don't allow REPLACE to replace a row when a auto_increment column
1599 	was used.  This ensures that we don't get a problem when the
1600 	whole range of the key has been used.
1601       */
1602       if (duplicate_handling == DUP_REPLACE &&
1603           table->next_number_field &&
1604           key_nr == table->s->next_number_index &&
1605 	  (insert_id_for_cur_row > 0))
1606 	goto err;
1607       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1608       {
1609         if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1610           goto err;
1611       }
1612       /*
1613         If the key index is equal to MAX_KEY it's treated as unknown key case
1614         and we shouldn't try to locate key info.
1615       */
1616       else if (key_nr < MAX_KEY)
1617       {
1618 	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1619 	{
1620 	  error=my_errno();
1621 	  goto err;
1622 	}
1623 
1624 	if (!key)
1625 	{
1626 	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length,
1627 					   MAX_KEY_LENGTH)))
1628 	  {
1629 	    error=ENOMEM;
1630 	    goto err;
1631 	  }
1632 	}
1633 	/*
1634           If we convert INSERT operation internally to an UPDATE.
1635           An INSERT operation may update table->vfield for BLOB fields,
1636           So here we recalculate data for generated columns.
1637 	*/
1638         if (table->vfield) {
1639           update_generated_write_fields(table->write_set, table);
1640         }
1641 	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1642 	if ((error=(table->file->ha_index_read_idx_map(table->record[1],key_nr,
1643                                                        (uchar*) key, HA_WHOLE_KEY,
1644                                                        HA_READ_KEY_EXACT))))
1645 	  goto err;
1646       }
1647       else
1648       {
1649         /*
1650           For the server built in non-debug mode returns error if
1651           handler::get_dup_key() returned MAX_KEY as the value of key index.
1652         */
1653         error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
1654         goto err;
1655       }
1656       if (duplicate_handling == DUP_UPDATE)
1657       {
1658         int res= 0;
1659         /*
1660           We don't check for other UNIQUE keys - the first row
1661           that matches, is updated. If update causes a conflict again,
1662           an error is returned
1663         */
1664 	assert(table->insert_values != NULL);
1665         /*
1666           The insert has failed, store the insert_id generated for
1667           this row to be re-used for the next insert.
1668         */
1669         if (insert_id_for_cur_row > 0) prev_insert_id = insert_id_for_cur_row;
1670 
1671         store_record(table,insert_values);
1672         /*
1673           Special check for BLOB/GEOMETRY field in statements with
1674           "ON DUPLICATE KEY UPDATE" clause.
1675           See mysql_prepare_blob_values() function for more details.
1676         */
1677         if (mysql_prepare_blob_values(thd,
1678                                       *update->get_changed_columns(),
1679                                       &mem_root))
1680            goto before_trg_err;
1681         restore_record(table,record[1]);
1682         assert(update->get_changed_columns()->elements ==
1683                update->update_values->elements);
1684         if (fill_record_n_invoke_before_triggers(thd, update,
1685                                                  *update->get_changed_columns(),
1686                                                  *update->update_values,
1687                                                  table, TRG_EVENT_UPDATE, 0))
1688           goto before_trg_err;
1689 
1690         bool insert_id_consumed= false;
1691         if (// UPDATE clause specifies a value for the auto increment field
1692             table->auto_increment_field_not_null &&
1693             // An auto increment value has been generated for this row
1694             (insert_id_for_cur_row > 0))
1695         {
1696           // After-update value:
1697           const ulonglong auto_incr_val= table->next_number_field->val_int();
1698           if (auto_incr_val == insert_id_for_cur_row)
1699           {
1700             // UPDATE wants to use the generated value
1701             insert_id_consumed= true;
1702           }
1703           else if (table->file->auto_inc_interval_for_cur_row.
1704                    in_range(auto_incr_val))
1705           {
1706             /*
1707               UPDATE wants to use one auto generated value which we have already
1708               reserved for another (previous or following) row. That may cause
1709               a duplicate key error if we later try to insert the reserved
1710               value. Such conflicts on auto generated values would be strange
1711               behavior, so we return a clear error now.
1712             */
1713             my_error(ER_AUTO_INCREMENT_CONFLICT, MYF(0));
1714 	    goto before_trg_err;
1715           }
1716         }
1717 
1718         if (!insert_id_consumed)
1719           table->file->restore_auto_increment(prev_insert_id);
1720 
1721         /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1722         {
1723           const TABLE_LIST *inserted_view=
1724             table->pos_in_table_list->belong_to_view;
1725           if (inserted_view != NULL)
1726           {
1727             res= inserted_view->view_check_option(thd);
1728             if (res == VIEW_CHECK_SKIP)
1729               goto ok_or_after_trg_err;
1730             if (res == VIEW_CHECK_ERROR)
1731               goto before_trg_err;
1732           }
1733         }
1734 
1735         info->stats.touched++;
1736         if (!records_are_comparable(table) || compare_records(table))
1737         {
1738           // Handle the INSERT ON DUPLICATE KEY UPDATE operation
1739           update->set_function_defaults(table);
1740 
1741           if ((error=table->file->ha_update_row(table->record[1],
1742                                                 table->record[0])) &&
1743               error != HA_ERR_RECORD_IS_THE_SAME)
1744           {
1745              info->last_errno= error;
1746              myf error_flags= MYF(0);
1747              if (table->file->is_fatal_error(error))
1748                error_flags|= ME_FATALERROR;
1749              table->file->print_error(error, error_flags);
1750              /*
1751                If IGNORE option is used, handler errors will be downgraded
1752                to warnings and don't  have to stop the iteration.
1753              */
1754              if (thd->is_error())
1755                goto before_trg_err;
1756              goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1757           }
1758 
1759           if (error != HA_ERR_RECORD_IS_THE_SAME)
1760             info->stats.updated++;
1761           else
1762             error= 0;
1763           /*
1764             If ON DUP KEY UPDATE updates a row instead of inserting one, it's
1765             like a regular UPDATE statement: it should not affect the value of a
1766             next SELECT LAST_INSERT_ID() or mysql_insert_id().
1767             Except if LAST_INSERT_ID(#) was in the INSERT query, which is
1768             handled separately by THD::arg_of_last_insert_id_function.
1769           */
1770           insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1771           info->stats.copied++;
1772         }
1773 
1774         // Execute the 'AFTER, ON UPDATE' trigger
1775         trg_error= (table->triggers &&
1776                     table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1777                                                       TRG_ACTION_AFTER, TRUE));
1778         goto ok_or_after_trg_err;
1779       }
1780       else /* DUP_REPLACE */
1781       {
1782         TABLE_LIST *view= table->pos_in_table_list->belong_to_view;
1783 
1784         if (view && view->replace_filter)
1785         {
1786           const size_t record_length= table->s->reclength;
1787 
1788           void *record0_saved= my_malloc(PSI_NOT_INSTRUMENTED, record_length,
1789                                          MYF(MY_WME));
1790 
1791           if (!record0_saved)
1792           {
1793             error= ENOMEM;
1794             goto err;
1795           }
1796 
1797           // Save the record used for comparison.
1798           memcpy(record0_saved, table->record[0], record_length);
1799 
1800           // Preparing the record for comparison.
1801           memcpy(table->record[0], table->record[1], record_length);
1802 
1803           // Checking if the row being conflicted is visible by the view.
1804           bool found_row_in_view= view->replace_filter->val_int();
1805 
1806           // Restoring the record back.
1807           memcpy(table->record[0], record0_saved, record_length);
1808 
1809           my_free(record0_saved);
1810 
1811           if (!found_row_in_view)
1812           {
1813             my_error(ER_REPLACE_INACCESSIBLE_ROWS, MYF(0));
1814             goto err;
1815           }
1816         }
1817 
1818 	/*
1819 	  The manual defines the REPLACE semantics that it is either
1820 	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1821 	  InnoDB do not function in the defined way if we allow MySQL
1822 	  to convert the latter operation internally to an UPDATE.
1823           We also should not perform this conversion if we have
1824           timestamp field with ON UPDATE which is different from DEFAULT.
1825           Another case when conversion should not be performed is when
1826           we have ON DELETE trigger on table so user may notice that
1827           we cheat here. Note that it is ok to do such conversion for
1828           tables which have ON UPDATE but have no ON DELETE triggers,
1829           we just should not expose this fact to users by invoking
1830           ON UPDATE triggers.
1831 	*/
1832 
1833 	if (last_uniq_key(table,key_nr) &&
1834 	    !table->file->referenced_by_foreign_key() &&
1835             (!table->triggers || !table->triggers->has_delete_triggers()))
1836         {
1837           if ((error=table->file->ha_update_row(table->record[1],
1838 					        table->record[0])) &&
1839               error != HA_ERR_RECORD_IS_THE_SAME)
1840             goto err;
1841           if (error != HA_ERR_RECORD_IS_THE_SAME)
1842             info->stats.deleted++;
1843           else
1844             error= 0;
1845           thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1846           /*
1847             Since we pretend that we have done insert we should call
1848             its after triggers.
1849           */
1850           goto after_trg_n_copied_inc;
1851         }
1852         else
1853         {
1854           if (table->triggers &&
1855               table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1856                                                 TRG_ACTION_BEFORE, TRUE))
1857             goto before_trg_err;
1858           if ((error=table->file->ha_delete_row(table->record[1])))
1859             goto err;
1860           info->stats.deleted++;
1861           if (!table->file->has_transactions())
1862             thd->get_transaction()->mark_modified_non_trans_table(
1863               Transaction_ctx::STMT);
1864           if (table->triggers &&
1865               table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1866                                                 TRG_ACTION_AFTER, TRUE))
1867           {
1868             trg_error= 1;
1869             goto ok_or_after_trg_err;
1870           }
1871           /* Let us attempt do write_row() once more */
1872         }
1873       }
1874     }
1875 
1876     /*
1877         If more than one iteration of the above while loop is done, from the second
1878         one the row being inserted will have an explicit value in the autoinc field,
1879         which was set at the first call of handler::update_auto_increment(). This
1880         value is saved to avoid thd->insert_id_for_cur_row becoming 0. Use this saved
1881         autoinc value.
1882      */
1883     if (table->file->insert_id_for_cur_row == 0)
1884       table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1885 
1886     thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1887     /*
1888       Restore column maps if they where replaced during an duplicate key
1889       problem.
1890     */
1891     if (table->read_set != save_read_set ||
1892         table->write_set != save_write_set)
1893       table->column_bitmaps_set(save_read_set, save_write_set);
1894   }
1895   else if ((error=table->file->ha_write_row(table->record[0])))
1896   {
1897     DEBUG_SYNC(thd, "write_row_noreplace");
1898     info->last_errno= error;
1899     myf error_flags= MYF(0);
1900     if (table->file->is_fatal_error(error))
1901       error_flags|= ME_FATALERROR;
1902     table->file->print_error(error, error_flags);
1903     /*
1904       If IGNORE option is used, handler errors will be downgraded
1905       to warnings and don't  have to stop the iteration.
1906     */
1907     if (thd->is_error())
1908       goto before_trg_err;
1909     table->file->restore_auto_increment(prev_insert_id);
1910     goto ok_or_after_trg_err;
1911   }
1912 
1913 after_trg_n_copied_inc:
1914   info->stats.copied++;
1915   thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
1916   trg_error= (table->triggers &&
1917               table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
1918                                                 TRG_ACTION_AFTER, TRUE));
1919 
1920 ok_or_after_trg_err:
1921   if (key)
1922     my_safe_afree(key,table->s->max_unique_length,MAX_KEY_LENGTH);
1923   if (!table->file->has_transactions())
1924     thd->get_transaction()->mark_modified_non_trans_table(
1925       Transaction_ctx::STMT);
1926   free_root(&mem_root, MYF(0));
1927   DBUG_RETURN(trg_error);
1928 
1929 err:
1930   {
1931     myf error_flags= MYF(0);                      /**< Flag for fatal errors */
1932     info->last_errno= error;
1933     assert(thd->lex->current_select() != NULL);
1934     if (table->file->is_fatal_error(error))
1935       error_flags|= ME_FATALERROR;
1936 
1937     table->file->print_error(error, error_flags);
1938   }
1939 
1940 before_trg_err:
1941   table->file->restore_auto_increment(prev_insert_id);
1942   if (key)
1943     my_safe_afree(key, table->s->max_unique_length, MAX_KEY_LENGTH);
1944   table->column_bitmaps_set(save_read_set, save_write_set);
1945   free_root(&mem_root, MYF(0));
1946   DBUG_RETURN(1);
1947 }
1948 
1949 
1950 /******************************************************************************
1951   Check that all fields with arn't null_fields are used
1952 ******************************************************************************/
1953 
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)1954 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry,
1955                                            TABLE_LIST *table_list)
1956 {
1957   int err= 0;
1958   MY_BITMAP *write_set= entry->fields_set_during_insert;
1959 
1960   for (Field **field=entry->field ; *field ; field++)
1961   {
1962     if (!bitmap_is_set(write_set, (*field)->field_index) &&
1963         ((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
1964         ((*field)->real_type() != MYSQL_TYPE_ENUM))
1965     {
1966       bool view= false;
1967       if (table_list)
1968       {
1969         table_list= table_list->top_table();
1970         view= table_list->is_view();
1971       }
1972       if (view)
1973         (*field)->set_warning(Sql_condition::SL_WARNING,
1974                               ER_NO_DEFAULT_FOR_VIEW_FIELD, 1,
1975                               table_list->view_db.str,
1976                               table_list->view_name.str);
1977       else
1978         (*field)->set_warning(Sql_condition::SL_WARNING,
1979                               ER_NO_DEFAULT_FOR_FIELD, 1);
1980       err= 1;
1981     }
1982   }
1983   bitmap_clear_all(write_set);
1984   return (!thd->lex->is_ignore() && thd->is_strict_mode()) ? err : 0;
1985 }
1986 
1987 
1988 /***************************************************************************
1989   Store records in INSERT ... SELECT *
1990 ***************************************************************************/
1991 
1992 
1993 /*
1994   make insert specific preparation and checks after opening tables
1995 
1996   SYNOPSIS
1997     mysql_insert_select_prepare()
1998     thd         thread handler
1999 
2000   RETURN
2001     FALSE OK
2002     TRUE  Error
2003 */
2004 
mysql_insert_select_prepare(THD * thd)2005 bool Sql_cmd_insert_select::mysql_insert_select_prepare(THD *thd)
2006 {
2007   LEX *lex= thd->lex;
2008   SELECT_LEX *select_lex= lex->select_lex;
2009   DBUG_ENTER("mysql_insert_select_prepare");
2010 
2011   /*
2012     SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
2013     clause if table is VIEW
2014   */
2015   if (mysql_prepare_insert(thd, lex->query_tables, NULL, true))
2016     DBUG_RETURN(true);
2017 
2018   /*
2019     exclude first table from leaf tables list, because it belong to
2020     INSERT
2021   */
2022   assert(select_lex->leaf_tables != NULL);
2023   assert(lex->insert_table == select_lex->leaf_tables->top_table());
2024 
2025   select_lex->leaf_tables= lex->insert_table->next_local;
2026   if (select_lex->leaf_tables != NULL)
2027     select_lex->leaf_tables= select_lex->leaf_tables->first_leaf_table();
2028   select_lex->leaf_table_count-=
2029     lex->insert_table->is_view() ? lex->insert_table->leaf_tables_count() : 1;
2030   DBUG_RETURN(false);
2031 }
2032 
2033 
prepare(List<Item> & values,SELECT_LEX_UNIT * u)2034 int Query_result_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2035 {
2036   DBUG_ENTER("Query_result_insert::prepare");
2037 
2038   LEX *const lex= thd->lex;
2039   bool res;
2040   SELECT_LEX *const lex_current_select_save= lex->current_select();
2041   const enum_duplicates duplicate_handling= info.get_duplicate_handling();
2042 
2043   unit= u;
2044 
2045   /*
2046     Since table in which we are going to insert is added to the first
2047     select, LEX::current_select() should point to the first select while
2048     we are fixing fields from insert list.
2049   */
2050   lex->set_current_select(lex->select_lex);
2051 
2052   res= check_insert_fields(thd, table_list, *fields, values.elements, true,
2053                            !insert_into_view);
2054   if (!res)
2055     res= setup_fields(thd, Ref_ptr_array(), values, SELECT_ACL, NULL,
2056                       false, false);
2057 
2058   if (!res && lex->insert_table_leaf->table->has_gcol())
2059     res= validate_gc_assignment(thd, fields, &values,
2060                                 lex->insert_table_leaf->table);
2061 
2062   if (duplicate_handling == DUP_UPDATE && !res)
2063   {
2064     Name_resolution_context *const context= &lex->select_lex->context;
2065     Name_resolution_context_state ctx_state;
2066 
2067     /* Save the state of the current name resolution context. */
2068     ctx_state.save_state(context, table_list);
2069 
2070     /* Perform name resolution only in the first table - 'table_list'. */
2071     table_list->next_local= NULL;
2072     context->resolve_in_table_list_only(table_list);
2073 
2074 #ifndef NO_EMBEDDED_ACCESS_CHECKS
2075     table_list->set_want_privilege(UPDATE_ACL);
2076 #endif
2077     if (!res)
2078       res= setup_fields(thd, Ref_ptr_array(), *update.get_changed_columns(),
2079                         UPDATE_ACL, NULL, false, true);
2080 
2081     if (!res && lex->insert_table_leaf->table->has_gcol())
2082       res= validate_gc_assignment(thd, update.get_changed_columns(),
2083                                   update.update_values,
2084                                   lex->insert_table_leaf->table);
2085     /*
2086       When we are not using GROUP BY and there are no ungrouped aggregate
2087       functions
2088       we can refer to other tables in the ON DUPLICATE KEY part.
2089       We use next_name_resolution_table destructively, so check it first
2090       (views?).
2091     */
2092     assert (!table_list->next_name_resolution_table);
2093     if (lex->select_lex->group_list.elements == 0 &&
2094         !lex->select_lex->with_sum_func)
2095     {
2096       /*
2097         We must make a single context out of the two separate name resolution
2098         contexts:
2099         the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
2100         To do that we must concatenate the two lists
2101       */
2102       table_list->next_name_resolution_table=
2103         ctx_state.get_first_name_resolution_table();
2104     }
2105     lex->in_update_value_clause= true;
2106     if (!res)
2107       res= setup_fields(thd, Ref_ptr_array(), *update.update_values,
2108                         SELECT_ACL, NULL, false, false);
2109     lex->in_update_value_clause= false;
2110     if (!res)
2111     {
2112       /*
2113         Traverse the update values list and substitute fields from the
2114         select for references (Item_ref objects) to them. This is done in
2115         order to get correct values from those fields when the select
2116         employs a temporary table.
2117       */
2118       List_iterator<Item> li(*update.update_values);
2119       Item *item;
2120 
2121       while ((item= li++))
2122       {
2123         item->transform(&Item::update_value_transformer,
2124                         (uchar*)lex->current_select());
2125       }
2126     }
2127 
2128     /* Restore the current context. */
2129     ctx_state.restore_state(context, table_list);
2130   }
2131 
2132   lex->set_current_select(lex_current_select_save);
2133   if (res)
2134     DBUG_RETURN(1);
2135   /*
2136     if it is INSERT into join view then check_insert_fields already found
2137     real table for insert
2138   */
2139   table= lex->insert_table_leaf->table;
2140 
2141   if (duplicate_handling == DUP_UPDATE || duplicate_handling == DUP_REPLACE)
2142     prepare_for_positional_update(table, table_list);
2143 
2144   if (info.add_function_default_columns(table, table->write_set))
2145     DBUG_RETURN(1);
2146   if ((duplicate_handling == DUP_UPDATE) &&
2147       update.add_function_default_columns(table, table->write_set))
2148     DBUG_RETURN(1);
2149 
2150   /*
2151     Is table which we are changing used somewhere in other parts of
2152     query
2153   */
2154   if (unique_table(thd, lex->insert_table_leaf, table_list->next_global, 0))
2155   {
2156     // Using same table for INSERT and SELECT
2157     /*
2158       @todo: Use add_base_options instead of add_active_options, and only
2159       if first_execution is true; but this can be implemented only when this
2160       function is called before first_execution is set to true.
2161       if (lex->current_select()->first_execution)
2162         lex->current_select()->add_base_options(OPTION_BUFFER_RESULT);
2163     */
2164     lex->current_select()->add_active_options(OPTION_BUFFER_RESULT);
2165   }
2166   restore_record(table,s->default_values);		// Get empty record
2167   table->next_number_field=table->found_next_number_field;
2168 
2169 #ifdef HAVE_REPLICATION
2170   if (thd->slave_thread)
2171   {
2172     /* Get SQL thread's rli, even for a slave worker thread */
2173     Relay_log_info *c_rli= thd->rli_slave->get_c_rli();
2174     assert(c_rli != NULL);
2175     if (duplicate_handling == DUP_UPDATE &&
2176         table->next_number_field != NULL &&
2177         rpl_master_has_bug(c_rli, 24432, TRUE, NULL, NULL))
2178       DBUG_RETURN(1);
2179   }
2180 #endif
2181 
2182   thd->cuted_fields=0;
2183   if (thd->lex->is_ignore() || duplicate_handling != DUP_ERROR)
2184     table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2185   if (duplicate_handling == DUP_REPLACE &&
2186       (!table->triggers || !table->triggers->has_delete_triggers()))
2187     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2188   if (duplicate_handling == DUP_UPDATE)
2189     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2190 
2191   /* Decide the logging format prior to preparing table/record metadata */
2192   res= res || thd->decide_logging_format(table_list);
2193   if (!res)
2194   {
2195      prepare_triggers_for_insert_stmt(table);
2196   }
2197 
2198   for (Field** next_field= table->field; *next_field; ++next_field)
2199   {
2200     (*next_field)->reset_warnings();
2201     (*next_field)->reset_tmp_null();
2202   }
2203 
2204   DBUG_RETURN(res ? -1 : 0);
2205 }
2206 
2207 
2208 /*
2209   Finish the preparation of the result table.
2210 
2211   SYNOPSIS
2212     Query_result_insert::prepare2()
2213     void
2214 
2215   DESCRIPTION
2216     If the result table is the same as one of the source tables (INSERT SELECT),
2217     the result table is not finally prepared at the join prepair phase.
2218     Do the final preparation now.
2219 
2220   RETURN
2221     0   OK
2222 */
2223 
prepare2()2224 int Query_result_insert::prepare2()
2225 {
2226   DBUG_ENTER("Query_result_insert::prepare2");
2227   if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
2228       !thd->lex->describe)
2229   {
2230     assert(!bulk_insert_started);
2231     // TODO: Is there no better estimation than 0 == Unknown number of rows?
2232     table->file->ha_start_bulk_insert((ha_rows) 0);
2233     bulk_insert_started= true;
2234   }
2235   DBUG_RETURN(0);
2236 }
2237 
2238 
cleanup()2239 void Query_result_insert::cleanup()
2240 {
2241   /*
2242     Query_result_insert/Query_result_create are never re-used
2243     in prepared statement
2244   */
2245   assert(0);
2246 }
2247 
2248 
~Query_result_insert()2249 Query_result_insert::~Query_result_insert()
2250 {
2251   DBUG_ENTER("~Query_result_insert");
2252   if (table)
2253   {
2254     table->next_number_field=0;
2255     table->auto_increment_field_not_null= FALSE;
2256     table->file->ha_reset();
2257   }
2258   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
2259   DBUG_VOID_RETURN;
2260 }
2261 
2262 
send_data(List<Item> & values)2263 bool Query_result_insert::send_data(List<Item> &values)
2264 {
2265   DBUG_ENTER("Query_result_insert::send_data");
2266   bool error=0;
2267 
2268   if (unit->offset_limit_cnt)
2269   {						// using limit offset,count
2270     unit->offset_limit_cnt--;
2271     DBUG_RETURN(0);
2272   }
2273 
2274   thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
2275   store_values(values);
2276   thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
2277   if (thd->is_error())
2278   {
2279     table->auto_increment_field_not_null= FALSE;
2280     DBUG_RETURN(1);
2281   }
2282   if (table_list)                               // Not CREATE ... SELECT
2283   {
2284     switch (table_list->view_check_option(thd)) {
2285     case VIEW_CHECK_SKIP:
2286       DBUG_RETURN(0);
2287     case VIEW_CHECK_ERROR:
2288       DBUG_RETURN(1);
2289     }
2290   }
2291 
2292   // Release latches in case bulk insert takes a long time
2293   ha_release_temporary_latches(thd);
2294 
2295   error= write_record(thd, table, &info, &update);
2296   table->auto_increment_field_not_null= FALSE;
2297 
2298   DEBUG_SYNC(thd, "create_select_after_write_rows_event");
2299 
2300   if (!error &&
2301       (table->triggers || info.get_duplicate_handling() == DUP_UPDATE))
2302   {
2303     /*
2304       Restore fields of the record since it is possible that they were
2305       changed by ON DUPLICATE KEY UPDATE clause.
2306       If triggers exist then whey can modify some fields which were not
2307       originally touched by INSERT ... SELECT, so we have to restore
2308       their original values for the next row.
2309     */
2310     restore_record(table, s->default_values);
2311   }
2312   if (!error && table->next_number_field)
2313   {
2314     /*
2315       If no value has been autogenerated so far, we need to remember the
2316       value we just saw, we may need to send it to client in the end.
2317     */
2318     if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
2319       autoinc_value_of_last_inserted_row= table->next_number_field->val_int();
2320     /*
2321       Clear auto-increment field for the next record, if triggers are used
2322       we will clear it twice, but this should be cheap.
2323     */
2324     table->next_number_field->reset();
2325   }
2326 
2327   DBUG_RETURN(error);
2328 }
2329 
2330 
store_values(List<Item> & values)2331 void Query_result_insert::store_values(List<Item> &values)
2332 {
2333   if (fields->elements)
2334   {
2335     restore_record(table, s->default_values);
2336     if (!validate_default_values_of_unset_fields(thd, table))
2337       fill_record_n_invoke_before_triggers(thd, &info, *fields, values,
2338                                            table, TRG_EVENT_INSERT,
2339                                            table->s->fields);
2340   }
2341   else
2342     fill_record_n_invoke_before_triggers(thd, table->field, values,
2343                                          table, TRG_EVENT_INSERT,
2344                                          table->s->fields);
2345 
2346   check_that_all_fields_are_given_values(thd, table, table_list);
2347 }
2348 
send_error(uint errcode,const char * err)2349 void Query_result_insert::send_error(uint errcode,const char *err)
2350 {
2351   DBUG_ENTER("Query_result_insert::send_error");
2352 
2353   my_message(errcode, err, MYF(0));
2354 
2355   DBUG_VOID_RETURN;
2356 }
2357 
2358 
send_eof()2359 bool Query_result_insert::send_eof()
2360 {
2361   int error;
2362   bool const trans_table= table->file->has_transactions();
2363   ulonglong id, row_count;
2364   bool changed;
2365   THD::killed_state killed_status= thd->killed;
2366   DBUG_ENTER("Query_result_insert::send_eof");
2367   DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
2368                        trans_table, table->file->table_type()));
2369 
2370   error= (bulk_insert_started ?
2371           table->file->ha_end_bulk_insert() : 0);
2372   if (!error && thd->is_error())
2373     error= thd->get_stmt_da()->mysql_errno();
2374 
2375   table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
2376   table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
2377 
2378   changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
2379   if (changed)
2380   {
2381     /*
2382       We must invalidate the table in the query cache before binlog writing
2383       and ha_autocommit_or_rollback.
2384     */
2385     query_cache.invalidate(thd, table, TRUE);
2386   }
2387 
2388   assert(trans_table || !changed ||
2389          thd->get_transaction()->cannot_safely_rollback(
2390                                                         Transaction_ctx::STMT));
2391 
2392   /*
2393     Write to binlog before commiting transaction.  No statement will
2394     be written by the binlog_query() below in RBR mode.  All the
2395     events are in the transaction cache and will be written when
2396     ha_autocommit_or_rollback() is issued below.
2397   */
2398   if (mysql_bin_log.is_open() &&
2399       (!error || thd->get_transaction()->cannot_safely_rollback(
2400         Transaction_ctx::STMT)))
2401   {
2402     int errcode= 0;
2403     if (!error)
2404       thd->clear_error();
2405     else
2406       errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);
2407     if (thd->binlog_query(THD::ROW_QUERY_TYPE,
2408                           thd->query().str, thd->query().length,
2409                           trans_table, false, false, errcode))
2410     {
2411       table->file->ha_release_auto_increment();
2412       DBUG_RETURN(1);
2413     }
2414   }
2415   table->file->ha_release_auto_increment();
2416 
2417   if (error)
2418   {
2419     myf error_flags= MYF(0);
2420     if (table->file->is_fatal_error(my_errno()))
2421       error_flags|= ME_FATALERROR;
2422 
2423     table->file->print_error(my_errno(), error_flags);
2424     DBUG_RETURN(1);
2425   }
2426 
2427   /*
2428     For the strict_mode call of push_warning above results to set
2429     error in Diagnostic_area. Therefore it is necessary to check whether
2430     the error was set and leave method if it is true. If we didn't do
2431     so we would failed later when my_ok is called.
2432   */
2433   if (thd->get_stmt_da()->is_error())
2434     DBUG_RETURN(true);
2435 
2436   char buff[160];
2437   if (thd->lex->is_ignore())
2438     my_snprintf(buff, sizeof(buff),
2439                 ER(ER_INSERT_INFO), (long) info.stats.records,
2440                 (long) (info.stats.records - info.stats.copied),
2441                 (long) thd->get_stmt_da()->current_statement_cond_count());
2442   else
2443     my_snprintf(buff, sizeof(buff),
2444                 ER(ER_INSERT_INFO), (long) info.stats.records,
2445                 (long) (info.stats.deleted+info.stats.updated),
2446                 (long) thd->get_stmt_da()->current_statement_cond_count());
2447   row_count= info.stats.copied + info.stats.deleted +
2448              (thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS) ?
2449               info.stats.touched : info.stats.updated);
2450   id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
2451     thd->first_successful_insert_id_in_cur_stmt :
2452     (thd->arg_of_last_insert_id_function ?
2453      thd->first_successful_insert_id_in_prev_stmt :
2454      (info.stats.copied ? autoinc_value_of_last_inserted_row : 0));
2455   my_ok(thd, row_count, id, buff);
2456   DBUG_RETURN(0);
2457 }
2458 
2459 
abort_result_set()2460 void Query_result_insert::abort_result_set()
2461 {
2462   DBUG_ENTER("Query_result_insert::abort_result_set");
2463   /*
2464     If the creation of the table failed (due to a syntax error, for
2465     example), no table will have been opened and therefore 'table'
2466     will be NULL. In that case, we still need to execute the rollback
2467     and the end of the function.
2468    */
2469   if (table)
2470   {
2471     bool changed, transactional_table;
2472     /*
2473       Try to end the bulk insert which might have been started before.
2474       We don't need to do this if we are in prelocked mode (since we
2475       don't use bulk insert in this case). Also we should not do this
2476       if tables are not locked yet (bulk insert is not started yet
2477       in this case).
2478     */
2479     if (bulk_insert_started)
2480       table->file->ha_end_bulk_insert();
2481 
2482     /*
2483       If at least one row has been inserted/modified and will stay in
2484       the table (the table doesn't have transactions) we must write to
2485       the binlog (and the error code will make the slave stop).
2486 
2487       For many errors (example: we got a duplicate key error while
2488       inserting into a MyISAM table), no row will be added to the table,
2489       so passing the error to the slave will not help since there will
2490       be an error code mismatch (the inserts will succeed on the slave
2491       with no error).
2492 
2493       If table creation failed, the number of rows modified will also be
2494       zero, so no check for that is made.
2495     */
2496     changed= (info.stats.copied || info.stats.deleted || info.stats.updated);
2497     transactional_table= table->file->has_transactions();
2498     if (thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::STMT))
2499     {
2500         if (mysql_bin_log.is_open())
2501         {
2502           int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
2503           /* error of writing binary log is ignored */
2504           (void) thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query().str,
2505                                    thd->query().length,
2506                                    transactional_table, FALSE, FALSE, errcode);
2507         }
2508 	if (changed)
2509 	  query_cache.invalidate(thd, table, TRUE);
2510     }
2511     assert(transactional_table || !changed ||
2512            thd->get_transaction()->cannot_safely_rollback(
2513                                                           Transaction_ctx::STMT));
2514     table->file->ha_release_auto_increment();
2515   }
2516 
2517   DBUG_VOID_RETURN;
2518 }
2519 
2520 
2521 /***************************************************************************
2522   CREATE TABLE (SELECT) ...
2523 ***************************************************************************/
2524 
2525 /**
2526   Create table from lists of fields and items (or just return TABLE
2527   object for pre-opened existing table). Used by CREATE SELECT.
2528 
2529   Let "source table" be the table in the SELECT part.
2530 
2531   Let "source table columns" be the set of columns in the SELECT list.
2532 
  An interesting peculiarity in the syntax CREATE TABLE (<columns>) SELECT is
  that function defaults are stripped from the source table columns, but
2535   not from the additional columns defined in the CREATE TABLE part. The first
2536   @c TIMESTAMP column there is also subject to promotion to @c TIMESTAMP @c
2537   DEFAULT @c CURRENT_TIMESTAMP @c ON @c UPDATE @c CURRENT_TIMESTAMP, as usual.
2538 
2539 
2540   @param thd           [in]     Thread object
2541   @param create_info   [in]     Create information (like MAX_ROWS, ENGINE or
2542                                 temporary table flag)
2543   @param create_table  [in]     Pointer to TABLE_LIST object providing database
2544                                 and name for table to be created or to be open
2545   @param alter_info    [in/out] Initial list of columns and indexes for the
2546                                 table to be created
2547   @param items         [in]     The source table columns. Corresponding column
2548                                 definitions (Create_field's) will be added to
2549                                 the end of alter_info->create_list.
  @note
    The table is only created and opened here; it is not locked. Locking
    the table and running the pre-/post-lock hooks is done afterwards by
    Query_result_create::prepare2().
2557 
2558   @note
2559     This function assumes that either table exists and was pre-opened and
2560     locked at open_and_lock_tables() stage (and in this case we just emit
2561     error or warning and return pre-opened TABLE object) or an exclusive
2562     metadata lock was acquired on table so we can safely create, open and
2563     lock table in it (we don't acquire metadata lock if this create is
2564     for temporary table).
2565 
2566   @note
2567     Since this function contains some logic specific to CREATE TABLE ...
2568     SELECT it should be changed before it can be used in other contexts.
2569 
2570   @retval non-zero  Pointer to TABLE object for table created or opened
2571   @retval 0         Error
2572 */
2573 
create_table_from_items(THD * thd,HA_CREATE_INFO * create_info,TABLE_LIST * create_table,Alter_info * alter_info,List<Item> * items)2574 static TABLE *create_table_from_items(THD *thd, HA_CREATE_INFO *create_info,
2575                                       TABLE_LIST *create_table,
2576                                       Alter_info *alter_info,
2577                                       List<Item> *items)
2578 {
2579   TABLE tmp_table;		// Used during 'Create_field()'
2580   TABLE_SHARE share;
2581   TABLE *table= 0;
2582   uint select_field_count= items->elements;
2583   /* Add selected items to field list */
2584   List_iterator_fast<Item> it(*items);
2585   Item *item;
2586 
2587   DBUG_ENTER("create_table_from_items");
2588 
2589   tmp_table.alias= 0;
2590   tmp_table.s= &share;
2591   init_tmp_table_share(thd, &share, "", 0, "", "");
2592 
2593   tmp_table.s->db_create_options=0;
2594   tmp_table.s->db_low_byte_first=
2595         MY_TEST(create_info->db_type == myisam_hton ||
2596                 create_info->db_type == heap_hton);
2597   tmp_table.reset_null_row();
2598 
2599   if (!thd->variables.explicit_defaults_for_timestamp)
2600     promote_first_timestamp_column(&alter_info->create_list);
2601 
2602   while ((item=it++))
2603   {
2604     Field *tmp_table_field;
2605     if (item->type() == Item::FUNC_ITEM)
2606     {
2607       if (item->result_type() != STRING_RESULT)
2608         tmp_table_field= item->tmp_table_field(&tmp_table);
2609       else
2610         tmp_table_field= item->tmp_table_field_from_field_type(&tmp_table,
2611                                                                false);
2612     }
2613     else
2614     {
2615       Field *from_field, *default_field;
2616       tmp_table_field= create_tmp_field(thd, &tmp_table, item, item->type(),
2617                                         NULL,
2618                                         &from_field, &default_field,
2619                                         false, false, false, false);
2620     }
2621 
2622     if (!tmp_table_field)
2623       DBUG_RETURN(NULL);
2624 
2625     Field *table_field;
2626 
2627     switch (item->type())
2628     {
2629     /*
2630       We have to take into account both the real table's fields and
2631       pseudo-fields used in trigger's body. These fields are used
2632       to copy defaults values later inside constructor of
2633       the class Create_field.
2634     */
2635     case Item::FIELD_ITEM:
2636     case Item::TRIGGER_FIELD_ITEM:
2637       table_field= ((Item_field *) item)->field;
2638       break;
2639     default:
2640       {
2641         /*
2642          If the expression is of temporal type having date and non-nullable,
2643          a zero date is generated. If in strict mode, then zero date is
2644          invalid. For such cases no default is generated.
2645        */
2646         table_field= NULL;
2647         if (tmp_table_field->is_temporal_with_date() &&
2648             thd->is_strict_mode() && !item->maybe_null)
2649           tmp_table_field->flags|= NO_DEFAULT_VALUE_FLAG;
2650       }
2651     }
2652 
2653     assert(tmp_table_field->gcol_info== NULL && tmp_table_field->stored_in_db);
2654     Create_field *cr_field= new Create_field(tmp_table_field, table_field);
2655 
2656     if (!cr_field)
2657       DBUG_RETURN(NULL);
2658 
2659     if (item->maybe_null)
2660       cr_field->flags &= ~NOT_NULL_FLAG;
2661     alter_info->create_list.push_back(cr_field);
2662   }
2663 
2664   DEBUG_SYNC(thd,"create_table_select_before_create");
2665 
2666   /*
2667     Create and lock table.
2668 
2669     Note that we either creating (or opening existing) temporary table or
2670     creating base table on which name we have exclusive lock. So code below
2671     should not cause deadlocks or races.
2672 
2673     We don't log the statement, it will be logged later.
2674 
2675     If this is a HEAP table, the automatic DELETE FROM which is written to the
2676     binlog when a HEAP table is opened for the first time since startup, must
2677     not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
2678     don't want to delete from it) 2) it would be written before the CREATE
2679     TABLE, which is a wrong order. So we keep binary logging disabled when we
2680     open_table().
2681   */
2682   {
2683     if (!mysql_create_table_no_lock(thd, create_table->db,
2684                                     create_table->table_name,
2685                                     create_info, alter_info,
2686                                     select_field_count, NULL))
2687     {
2688       DEBUG_SYNC(thd,"create_table_select_before_open");
2689 
2690       if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
2691       {
2692         Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
2693         /*
2694           Here we open the destination table, on which we already have
2695           an exclusive metadata lock.
2696         */
2697         if (open_table(thd, create_table, &ot_ctx))
2698         {
2699           quick_rm_table(thd, create_info->db_type, create_table->db,
2700                          table_case_name(create_info, create_table->table_name),
2701                          0);
2702         }
2703         else
2704           table= create_table->table;
2705       }
2706       else
2707       {
2708         if (open_temporary_table(thd, create_table))
2709         {
2710           /*
2711             This shouldn't happen as creation of temporary table should make
2712             it preparable for open. Anyway we can't drop temporary table if
2713             we are unable to fint it.
2714           */
2715           assert(0);
2716         }
2717         else
2718         {
2719           table= create_table->table;
2720         }
2721       }
2722     }
2723     if (!table)                                   // open failed
2724       DBUG_RETURN(NULL);
2725   }
2726   DBUG_RETURN(table);
2727 }
2728 
2729 
2730 /**
2731   Create the new table from the selected items.
2732 
2733   @param values  List of items to be used as new columns
2734   @param u       Select
2735 
2736   @return Operation status.
2737     @retval 0   Success
2738     @retval !=0 Failure
2739 */
2740 
prepare(List<Item> & values,SELECT_LEX_UNIT * u)2741 int Query_result_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
2742 {
2743   DBUG_ENTER("Query_result_create::prepare");
2744 
2745   unit= u;
2746   assert(create_table->table == NULL);
2747 
2748   DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
2749 
2750   if (!(table= create_table_from_items(thd, create_info, create_table,
2751                                        alter_info, &values)))
2752     /* abort() deletes table */
2753     DBUG_RETURN(-1);
2754 
2755   if (table->s->fields < values.elements)
2756   {
2757     my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
2758     DBUG_RETURN(-1);
2759   }
2760   /* First field to copy */
2761   field= table->field+table->s->fields - values.elements;
2762   for (Field **f= field ; *f ; f++)
2763   {
2764     if ((*f)->gcol_info)
2765     {
2766       /*
2767         Generated columns are not allowed to be given a value for CREATE TABLE ..
2768         SELECT statment.
2769       */
2770       my_error(ER_NON_DEFAULT_VALUE_FOR_GENERATED_COLUMN, MYF(0),
2771                (*f)->field_name, (*f)->table->s->table_name.str);
2772       DBUG_RETURN(true);
2773     }
2774   }
2775 
2776   // Turn off function defaults for columns filled from SELECT list:
2777   const bool retval= info.ignore_last_columns(table, values.elements);
2778   DBUG_RETURN(retval);
2779 }
2780 
2781 
2782 /**
2783   Lock the newly created table and prepare it for insertion.
2784 
2785   @return Operation status.
2786     @retval 0   Success
2787     @retval !=0 Failure
2788 */
2789 
prepare2()2790 int Query_result_create::prepare2()
2791 {
2792   DBUG_ENTER("Query_result_create::prepare2");
2793   DEBUG_SYNC(thd,"create_table_select_before_lock");
2794 
2795   MYSQL_LOCK *extra_lock= NULL;
2796   /*
2797     For row-based replication, the CREATE-SELECT statement is written
2798     in two pieces: the first one contain the CREATE TABLE statement
2799     necessary to create the table and the second part contain the rows
2800     that should go into the table.
2801 
2802     For non-temporary tables, the start of the CREATE-SELECT
2803     implicitly commits the previous transaction, and all events
2804     forming the statement will be stored the transaction cache. At end
2805     of the statement, the entire statement is committed as a
2806     transaction, and all events are written to the binary log.
2807 
2808     On the master, the table is locked for the duration of the
2809     statement, but since the CREATE part is replicated as a simple
2810     statement, there is no way to lock the table for accesses on the
2811     slave.  Hence, we have to hold on to the CREATE part of the
2812     statement until the statement has finished.
2813    */
2814   class MY_HOOKS : public TABLEOP_HOOKS {
2815   public:
2816     MY_HOOKS(Query_result_create *x, TABLE_LIST *create_table_arg,
2817              TABLE_LIST *select_tables_arg)
2818       : ptr(x),
2819         create_table(create_table_arg),
2820         select_tables(select_tables_arg)
2821       {
2822       }
2823 
2824   private:
2825     virtual int do_postlock(TABLE **tables, uint count)
2826     {
2827       int error;
2828       THD *thd= const_cast<THD*>(ptr->get_thd());
2829       TABLE_LIST *save_next_global= create_table->next_global;
2830 
2831       create_table->next_global= select_tables;
2832 
2833       error= thd->decide_logging_format(create_table);
2834 
2835       create_table->next_global= save_next_global;
2836 
2837       if (error)
2838         return error;
2839 
2840       TABLE const *const table = *tables;
2841       create_table->table->set_binlog_drop_if_temp(
2842         !thd->is_current_stmt_binlog_disabled()
2843         && !thd->is_current_stmt_binlog_format_row());
2844 
2845       if (thd->is_current_stmt_binlog_format_row()  &&
2846           !table->s->tmp_table)
2847       {
2848         if (int error= ptr->binlog_show_create_table(tables, count))
2849           return error;
2850       }
2851       return 0;
2852     }
2853     Query_result_create *ptr;
2854     TABLE_LIST *create_table;
2855     TABLE_LIST *select_tables;
2856   };
2857 
2858   MY_HOOKS hooks(this, create_table, select_tables);
2859 
2860   table->reginfo.lock_type=TL_WRITE;
2861   hooks.prelock(&table, 1);                    // Call prelock hooks
2862   /*
2863     mysql_lock_tables() below should never fail with request to reopen table
2864     since it won't wait for the table lock (we have exclusive metadata lock on
2865     the table) and thus can't get aborted.
2866   */
2867   if (! (extra_lock= mysql_lock_tables(thd, &table, 1, 0)) ||
2868         hooks.postlock(&table, 1))
2869   {
2870     if (extra_lock)
2871     {
2872       mysql_unlock_tables(thd, extra_lock);
2873       extra_lock= 0;
2874     }
2875     drop_open_table(thd, table, create_table->db, create_table->table_name);
2876     table= 0;
2877     DBUG_RETURN(1);
2878   }
2879   if (extra_lock)
2880   {
2881     assert(m_plock == NULL);
2882 
2883     if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
2884       m_plock= &m_lock;
2885     else
2886       m_plock= &thd->extra_lock;
2887 
2888     *m_plock= extra_lock;
2889   }
2890   /* Mark all fields that are given values */
2891   for (Field **f= field ; *f ; f++)
2892   {
2893     bitmap_set_bit(table->write_set, (*f)->field_index);
2894     bitmap_set_bit(table->fields_set_during_insert, (*f)->field_index);
2895   }
2896 
2897   // Set up an empty bitmap of function defaults
2898   if (info.add_function_default_columns(table, table->write_set))
2899     DBUG_RETURN(1);
2900 
2901   if (info.add_function_default_columns(table,
2902                                         table->fields_set_during_insert))
2903     DBUG_RETURN(1);
2904 
2905   table->next_number_field=table->found_next_number_field;
2906 
2907   restore_record(table,s->default_values);      // Get empty record
2908   thd->cuted_fields=0;
2909 
2910   const enum_duplicates duplicate_handling= info.get_duplicate_handling();
2911 
2912   if (thd->lex->is_ignore() || duplicate_handling != DUP_ERROR)
2913     table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
2914   if (duplicate_handling == DUP_REPLACE &&
2915       (!table->triggers || !table->triggers->has_delete_triggers()))
2916     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
2917   if (duplicate_handling == DUP_UPDATE)
2918     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
2919   if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
2920   {
2921     table->file->ha_start_bulk_insert((ha_rows) 0);
2922     bulk_insert_started= true;
2923   }
2924 
2925   enum_check_fields save_count_cuted_fields= thd->count_cuted_fields;
2926   thd->count_cuted_fields= CHECK_FIELD_WARN;
2927 
2928   if (check_that_all_fields_are_given_values(thd, table, table_list))
2929     DBUG_RETURN(1);
2930 
2931   thd->count_cuted_fields= save_count_cuted_fields;
2932 
2933   table->mark_columns_needed_for_insert();
2934   table->file->extra(HA_EXTRA_WRITE_CACHE);
2935   DBUG_RETURN(0);
2936 }
2937 
2938 
binlog_show_create_table(TABLE ** tables,uint count)2939 int Query_result_create::binlog_show_create_table(TABLE **tables, uint count)
2940 {
2941   DBUG_ENTER("select_create::binlog_show_create_table");
2942   /*
2943     Note 1: In RBR mode, we generate a CREATE TABLE statement for the
2944     created table by calling store_create_info() (behaves as SHOW
2945     CREATE TABLE). The 'CREATE TABLE' event will be put in the
2946     binlog statement cache with an Anonymous_gtid_log_event, and
2947     any subsequent events (e.g., table-map events and rows event)
2948     will be put in the binlog transaction cache with an
2949     Anonymous_gtid_log_event. So that the 'CREATE...SELECT'
2950     statement is logged as:
2951       Anonymous_gtid_log_event
2952       CREATE TABLE event
2953       Anonymous_gtid_log_event
2954       BEGIN
2955       rows event
2956       COMMIT
2957 
2958     We write the CREATE TABLE statement here and not in prepare()
2959     since there potentially are sub-selects or accesses to information
2960     schema that will do a close_thread_tables(), destroying the
2961     statement transaction cache.
2962   */
2963   assert(thd->is_current_stmt_binlog_format_row());
2964   assert(tables && *tables && count > 0);
2965 
2966   char buf[2048];
2967   String query(buf, sizeof(buf), system_charset_info);
2968   int result;
2969   TABLE_LIST tmp_table_list;
2970 
2971   tmp_table_list.table = *tables;
2972   query.length(0);      // Have to zero it since constructor doesn't
2973 
2974   result= store_create_info(thd, &tmp_table_list, &query, create_info,
2975                             /* show_database */ TRUE);
2976   assert(result == 0); /* store_create_info() always return 0 */
2977 
2978   if (mysql_bin_log.is_open())
2979   {
2980     DEBUG_SYNC(thd, "create_select_before_write_create_event");
2981     int errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
2982     result= thd->binlog_query(THD::STMT_QUERY_TYPE,
2983                               query.ptr(), query.length(),
2984                               /* is_trans */ false,
2985                               /* direct */ true,
2986                               /* suppress_use */ FALSE,
2987                               errcode);
2988     DEBUG_SYNC(thd, "create_select_after_write_create_event");
2989   }
2990   DBUG_RETURN(result);
2991 }
2992 
2993 
store_values(List<Item> & values)2994 void Query_result_create::store_values(List<Item> &values)
2995 {
2996   fill_record_n_invoke_before_triggers(thd, field, values,
2997                                        table, TRG_EVENT_INSERT,
2998                                        table->s->fields);
2999 }
3000 
3001 
send_error(uint errcode,const char * err)3002 void Query_result_create::send_error(uint errcode,const char *err)
3003 {
3004   DBUG_ENTER("Query_result_create::send_error");
3005 
3006   DBUG_PRINT("info",
3007              ("Current statement %s row-based",
3008               thd->is_current_stmt_binlog_format_row() ? "is" : "is NOT"));
3009   DBUG_PRINT("info",
3010              ("Current table (at 0x%lu) %s a temporary (or non-existant) table",
3011               (ulong) table,
3012               table && !table->s->tmp_table ? "is NOT" : "is"));
3013   /*
3014     This will execute any rollbacks that are necessary before writing
3015     the transcation cache.
3016 
3017     We disable the binary log since nothing should be written to the
3018     binary log.  This disabling is important, since we potentially do
3019     a "roll back" of non-transactional tables by removing the table,
3020     and the actual rollback might generate events that should not be
3021     written to the binary log.
3022 
3023   */
3024   tmp_disable_binlog(thd);
3025   Query_result_insert::send_error(errcode, err);
3026   reenable_binlog(thd);
3027 
3028   DBUG_VOID_RETURN;
3029 }
3030 
3031 
send_eof()3032 bool Query_result_create::send_eof()
3033 {
3034   /*
3035     The routine that writes the statement in the binary log
3036     is in Query_result_insert::send_eof(). For that reason, we
3037     mark the flag at this point.
3038   */
3039   if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
3040     thd->get_transaction()->mark_created_temp_table(Transaction_ctx::STMT);
3041 
3042   bool tmp= Query_result_insert::send_eof();
3043   if (tmp)
3044     abort_result_set();
3045   else
3046   {
3047     /*
3048       Do an implicit commit at end of statement for non-temporary
3049       tables.  This can fail, but we should unlock the table
3050       nevertheless.
3051     */
3052     if (!table->s->tmp_table)
3053     {
3054       trans_commit_stmt(thd);
3055       trans_commit_implicit(thd);
3056     }
3057 
3058     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3059     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3060     if (m_plock)
3061     {
3062       mysql_unlock_tables(thd, *m_plock);
3063       *m_plock= NULL;
3064       m_plock= NULL;
3065     }
3066   }
3067   return tmp;
3068 }
3069 
3070 
abort_result_set()3071 void Query_result_create::abort_result_set()
3072 {
3073   DBUG_ENTER("Query_result_create::abort_result_set");
3074 
3075   /*
3076     In Query_result_insert::abort_result_set() we roll back the statement, including
3077     truncating the transaction cache of the binary log. To do this, we
3078     pretend that the statement is transactional, even though it might
3079     be the case that it was not.
3080 
3081     We roll back the statement prior to deleting the table and prior
3082     to releasing the lock on the table, since there might be potential
3083     for failure if the rollback is executed after the drop or after
3084     unlocking the table.
3085 
3086     We also roll back the statement regardless of whether the creation
3087     of the table succeeded or not, since we need to reset the binary
3088     log state.
3089   */
3090   tmp_disable_binlog(thd);
3091   Query_result_insert::abort_result_set();
3092   thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::STMT);
3093   reenable_binlog(thd);
3094   /* possible error of writing binary log is ignored deliberately */
3095   (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
3096 
3097   if (m_plock)
3098   {
3099     mysql_unlock_tables(thd, *m_plock);
3100     *m_plock= NULL;
3101     m_plock= NULL;
3102   }
3103 
3104   if (table)
3105   {
3106     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3107     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3108     table->auto_increment_field_not_null= FALSE;
3109     drop_open_table(thd, table, create_table->db, create_table->table_name);
3110     table=0;                                    // Safety
3111   }
3112   DBUG_VOID_RETURN;
3113 }
3114 
3115 
execute(THD * thd)3116 bool Sql_cmd_insert::execute(THD *thd)
3117 {
3118   assert(thd->lex->sql_command == SQLCOM_REPLACE ||
3119          thd->lex->sql_command == SQLCOM_INSERT);
3120 
3121   bool res= false;
3122   LEX *const lex= thd->lex;
3123   SELECT_LEX *const select_lex= lex->select_lex;
3124   TABLE_LIST *const first_table= select_lex->get_table_list();
3125   TABLE_LIST *const all_tables= first_table;
3126 
3127   if (open_temporary_tables(thd, all_tables))
3128     return true;
3129 
3130   if (insert_precheck(thd, all_tables))
3131     return true;
3132 
3133   /* Push ignore / strict error handler */
3134   Ignore_error_handler ignore_handler;
3135   Strict_error_handler strict_handler;
3136   if (thd->lex->is_ignore())
3137     thd->push_internal_handler(&ignore_handler);
3138   else if (thd->is_strict_mode())
3139     thd->push_internal_handler(&strict_handler);
3140 
3141   MYSQL_INSERT_START(const_cast<char*>(thd->query().str));
3142   res= mysql_insert(thd, all_tables);
3143   MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func());
3144 
3145   /* Pop ignore / strict error handler */
3146   if (thd->lex->is_ignore() || thd->is_strict_mode())
3147     thd->pop_internal_handler();
3148 
3149   /*
3150     If we have inserted into a VIEW, and the base table has
3151     AUTO_INCREMENT column, but this column is not accessible through
3152     a view, then we should restore LAST_INSERT_ID to the value it
3153     had before the statement.
3154   */
3155   if (first_table->is_view() && !first_table->contain_auto_increment)
3156     thd->first_successful_insert_id_in_cur_stmt=
3157       thd->first_successful_insert_id_in_prev_stmt;
3158 
3159   DBUG_EXECUTE_IF("after_mysql_insert",
3160                   {
3161                     const char act[]=
3162                       "now "
3163                       "wait_for signal.continue";
3164                     assert(opt_debug_sync_timeout > 0);
3165                     assert(!debug_sync_set_action(current_thd,
3166                                                   STRING_WITH_LEN(act)));
3167                   };);
3168 
3169   thd->lex->clear_values_map();
3170   return res;
3171 }
3172 
3173 
execute(THD * thd)3174 bool Sql_cmd_insert_select::execute(THD *thd)
3175 {
3176   assert(thd->lex->sql_command == SQLCOM_REPLACE_SELECT ||
3177          thd->lex->sql_command == SQLCOM_INSERT_SELECT);
3178 
3179   bool res= false;
3180   LEX *const lex= thd->lex;
3181   SELECT_LEX *const select_lex= lex->select_lex;
3182   SELECT_LEX_UNIT *const unit= lex->unit;
3183   TABLE_LIST *const first_table= select_lex->get_table_list();
3184   TABLE_LIST *const all_tables= first_table;
3185 
3186   Query_result_insert *sel_result;
3187   if (insert_precheck(thd, all_tables))
3188     return true;
3189   /*
3190     INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/
3191     INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY
3192     clause is used in SELECT statement. We therefore use row based
3193     logging if mixed or row based logging is available.
3194     TODO: Check if the order of the output of the select statement is
3195     deterministic. Waiting for BUG#42415
3196   */
3197   if (lex->sql_command == SQLCOM_INSERT_SELECT &&
3198       lex->duplicates == DUP_UPDATE)
3199     lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_SELECT_UPDATE);
3200 
3201   if (lex->sql_command == SQLCOM_INSERT_SELECT && lex->is_ignore())
3202     lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_INSERT_IGNORE_SELECT);
3203 
3204   if (lex->sql_command == SQLCOM_REPLACE_SELECT)
3205     lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_REPLACE_SELECT);
3206 
3207   unit->set_limit(select_lex);
3208 
3209   if (!(res= open_tables_for_query(thd, all_tables, 0)))
3210   {
3211     if (run_before_dml_hook(thd))
3212       return true;
3213 
3214     MYSQL_INSERT_SELECT_START(const_cast<char*>(thd->query().str));
3215 
3216     /* Skip first table, which is the table we are inserting in */
3217     TABLE_LIST *second_table= first_table->next_local;
3218     select_lex->table_list.first= second_table;
3219     select_lex->context.table_list=
3220       select_lex->context.first_name_resolution_table= second_table;
3221 
3222     res= mysql_insert_select_prepare(thd);
3223     if (!res && (sel_result= new Query_result_insert(first_table,
3224                                                      first_table->table,
3225                                                      &insert_field_list,
3226                                                      &insert_field_list,
3227                                                      &insert_update_list,
3228                                                      &insert_value_list,
3229                                                      lex->duplicates)))
3230     {
3231       Ignore_error_handler ignore_handler;
3232       Strict_error_handler strict_handler;
3233       if (thd->lex->is_ignore())
3234         thd->push_internal_handler(&ignore_handler);
3235       else if (thd->is_strict_mode())
3236         thd->push_internal_handler(&strict_handler);
3237 
3238       res= handle_query(thd, lex, sel_result,
3239                         // Don't unlock tables until command is written
3240                         // to binary log
3241                         OPTION_SETUP_TABLES_DONE | SELECT_NO_UNLOCK,
3242                         0);
3243 
3244       if (thd->lex->is_ignore() || thd->is_strict_mode())
3245         thd->pop_internal_handler();
3246 
3247       delete sel_result;
3248     }
3249     /* revert changes for SP */
3250     MYSQL_INSERT_SELECT_DONE(res, (ulong) thd->get_row_count_func());
3251     select_lex->table_list.first= first_table;
3252   }
3253   /*
3254     If we have inserted into a VIEW, and the base table has
3255     AUTO_INCREMENT column, but this column is not accessible through
3256     a view, then we should restore LAST_INSERT_ID to the value it
3257     had before the statement.
3258   */
3259   if (first_table->is_view() && !first_table->contain_auto_increment)
3260     thd->first_successful_insert_id_in_cur_stmt=
3261       thd->first_successful_insert_id_in_prev_stmt;
3262 
3263   thd->lex->clear_values_map();
3264   return res;
3265 }
3266 
3267 
prepared_statement_test(THD * thd)3268 bool Sql_cmd_insert::prepared_statement_test(THD *thd)
3269 {
3270   LEX *lex= thd->lex;
3271   return mysql_test_insert(thd, lex->query_tables);
3272 }
3273