1 /*
2    Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3    Copyright (c) 2010, 2021, MariaDB Corporation
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA
17 */
18 
19 /* Insert of records */
20 
21 /*
22   INSERT DELAYED
23 
24   Insert delayed is distinguished from a normal insert by lock_type ==
25   TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
26   "delayed" table (delayed_get_table()), but falls back to
27   open_and_lock_tables() on error and proceeds as normal insert then.
28 
29   Opening a "delayed" table means to find a delayed insert thread that
30   has the table open already. If this fails, a new thread is created and
31   waited for to open and lock the table.
32 
33   If accessing the thread succeeded, in
34   Delayed_insert::get_local_table() the table of the thread is copied
35   for local use. A copy is required because the normal insert logic
36   works on a target table, but the other threads table object must not
37   be used. The insert logic uses the record buffer to create a record.
38   And the delayed insert thread uses the record buffer to pass the
39   record to the table handler. So there must be different objects. Also
40   the copied table is not included in the lock, so that the statement
41   can proceed even if the real table cannot be accessed at this moment.
42 
43   Copying a table object is not a trivial operation. Besides the TABLE
44   object there are the field pointer array, the field objects and the
45   record buffer. After copying the field objects, their pointers into
46   the record must be "moved" to point to the new record buffer.
47 
48   After this setup the normal insert logic is used. Only that for
49   delayed inserts write_delayed() is called instead of write_record().
50   It inserts the rows into a queue and signals the delayed insert thread
51   instead of writing directly to the table.
52 
53   The delayed insert thread awakes from the signal. It locks the table,
54   inserts the rows from the queue, unlocks the table, and waits for the
55   next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
56 
57 */
58 
59 #include "mariadb.h"                 /* NO_EMBEDDED_ACCESS_CHECKS */
60 #include "sql_priv.h"
61 #include "sql_insert.h"
62 #include "sql_update.h"                         // compare_record
63 #include "sql_base.h"                           // close_thread_tables
64 #include "sql_cache.h"                          // query_cache_*
65 #include "key.h"                                // key_copy
66 #include "lock.h"                               // mysql_unlock_tables
67 #include "sp_head.h"
68 #include "sql_view.h"         // check_key_in_view, insert_view_fields
69 #include "sql_table.h"        // mysql_create_table_no_lock
70 #include "sql_trigger.h"
71 #include "sql_select.h"
72 #include "sql_show.h"
73 #include "slave.h"
74 #include "sql_parse.h"                          // end_active_trans
75 #include "rpl_mi.h"
76 #include "transaction.h"
77 #include "sql_audit.h"
78 #include "sql_derived.h"                        // mysql_handle_derived
79 #include "sql_prepare.h"
80 #include <my_bit.h>
81 
82 #include "debug_sync.h"
83 
84 #ifdef WITH_WSREP
85 #include "wsrep_trans_observer.h" /* wsrep_start_transction() */
86 #endif /* WITH_WSREP */
87 
88 #ifndef EMBEDDED_LIBRARY
89 static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
90                               TABLE_LIST *table_list);
91 static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
92                          LEX_STRING query, bool ignore, bool log_on);
93 static void end_delayed_insert(THD *thd);
94 pthread_handler_t handle_delayed_insert(void *arg);
95 static void unlink_blobs(TABLE *table);
96 #endif
97 static bool check_view_insertability(THD *thd, TABLE_LIST *view);
98 static int binlog_show_create_table(THD *thd, TABLE *table,
99                                     Table_specification_st *create_info);
100 
101 /*
102   Check that insert/update fields are from the same single table of a view.
103 
104   @param fields            The insert/update fields to be checked.
105   @param values            The insert/update values to be checked, NULL if
106   checking is not wanted.
107   @param view              The view for insert.
108   @param map     [in/out]  The insert table map.
109 
110   This function is called in 2 cases:
111     1. to check insert fields. In this case *map will be set to 0.
112        Insert fields are checked to be all from the same single underlying
113        table of the given view. Otherwise the error is thrown. Found table
114        map is returned in the map parameter.
115     2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
116        In this case *map contains table_map found on the previous call of
117        the function to check insert fields. Update fields are checked to be
118        from the same table as the insert fields.
119 
120   @returns false if success.
121 */
122 
check_view_single_update(List<Item> & fields,List<Item> * values,TABLE_LIST * view,table_map * map,bool insert)123 static bool check_view_single_update(List<Item> &fields, List<Item> *values,
124                                      TABLE_LIST *view, table_map *map,
125                                      bool insert)
126 {
127   /* it is join view => we need to find the table for update */
128   List_iterator_fast<Item> it(fields);
129   Item *item;
130   TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
131   table_map tables= 0;
132 
133   while ((item= it++))
134     tables|= item->used_tables();
135 
136   /*
137     Check that table is only one
138     (we can not rely on check_single_table because it skips some
139     types of tables)
140   */
141   if (my_count_bits(tables) > 1)
142     goto error;
143 
144   if (values)
145   {
146     it.init(*values);
147     while ((item= it++))
148       tables|= item->view_used_tables(view);
149   }
150 
151   /* Convert to real table bits */
152   tables&= ~PSEUDO_TABLE_BITS;
153 
154   /* Check found map against provided map */
155   if (*map)
156   {
157     if (tables != *map)
158       goto error;
159     return FALSE;
160   }
161 
162   if (view->check_single_table(&tbl, tables, view) || tbl == 0)
163     goto error;
164 
165   /* view->table should have been set in mysql_derived_merge_for_insert */
166   DBUG_ASSERT(view->table);
167 
168   /*
169     Use buffer for the insert values that was allocated for the merged view.
170   */
171   tbl->table->insert_values= view->table->insert_values;
172   view->table= tbl->table;
173   if (!tbl->single_table_updatable())
174   {
175     if (insert)
176       my_error(ER_NON_INSERTABLE_TABLE, MYF(0), view->alias.str, "INSERT");
177     else
178       my_error(ER_NON_UPDATABLE_TABLE, MYF(0), view->alias.str, "UPDATE");
179     return TRUE;
180   }
181   *map= tables;
182 
183   return FALSE;
184 
185 error:
186   my_error(ER_VIEW_MULTIUPDATE, MYF(0),
187            view->view_db.str, view->view_name.str);
188   return TRUE;
189 }
190 
191 
192 /*
193   Check if insert fields are correct.
194 
195   @param thd            The current thread.
196   @param table_list     The table we are inserting into (may be view)
197   @param fields         The insert fields.
198   @param values         The insert values.
199   @param check_unique   If duplicate values should be rejected.
200   @param fields_and_values_from_different_maps If 'values' are allowed to
201   refer to other tables than those of 'fields'
202   @param map            See check_view_single_update
203 
204   @returns 0 if success, -1 if error
205 */
206 
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<Item> & values,bool check_unique,bool fields_and_values_from_different_maps,table_map * map)207 static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
208                                List<Item> &fields, List<Item> &values,
209                                bool check_unique,
210                                bool fields_and_values_from_different_maps,
211                                table_map *map)
212 {
213   TABLE *table= table_list->table;
214   DBUG_ENTER("check_insert_fields");
215 
216   if (!table_list->single_table_updatable())
217   {
218     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
219     DBUG_RETURN(-1);
220   }
221 
222   if (fields.elements == 0 && values.elements != 0)
223   {
224     if (!table)
225     {
226       my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
227                table_list->view_db.str, table_list->view_name.str);
228       DBUG_RETURN(-1);
229     }
230     if (values.elements != table->s->visible_fields)
231     {
232       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
233       DBUG_RETURN(-1);
234     }
235 #ifndef NO_EMBEDDED_ACCESS_CHECKS
236     Field_iterator_table_ref field_it;
237     field_it.set(table_list);
238     if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
239       DBUG_RETURN(-1);
240 #endif
241     /*
242       No fields are provided so all fields must be provided in the values.
243       Thus we set all bits in the write set.
244     */
245     bitmap_set_all(table->write_set);
246   }
247   else
248   {						// Part field list
249     SELECT_LEX *select_lex= thd->lex->first_select_lex();
250     Name_resolution_context *context= &select_lex->context;
251     Name_resolution_context_state ctx_state;
252     int res;
253 
254     if (fields.elements != values.elements)
255     {
256       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
257       DBUG_RETURN(-1);
258     }
259 
260     thd->dup_field= 0;
261     select_lex->no_wrap_view_item= TRUE;
262 
263     /* Save the state of the current name resolution context. */
264     ctx_state.save_state(context, table_list);
265 
266     /*
267       Perform name resolution only in the first table - 'table_list',
268       which is the table that is inserted into.
269     */
270     table_list->next_local= 0;
271     context->resolve_in_table_list_only(table_list);
272     /* 'Unfix' fields to allow correct marking by the setup_fields function. */
273     if (table_list->is_view())
274       unfix_fields(fields);
275 
276     res= setup_fields(thd, Ref_ptr_array(),
277                       fields, MARK_COLUMNS_WRITE, 0, NULL, 0);
278 
279     /* Restore the current context. */
280     ctx_state.restore_state(context, table_list);
281     thd->lex->first_select_lex()->no_wrap_view_item= FALSE;
282 
283     if (res)
284       DBUG_RETURN(-1);
285 
286     if (table_list->is_view() && table_list->is_merged_derived())
287     {
288       if (check_view_single_update(fields,
289                                    fields_and_values_from_different_maps ?
290                                    (List<Item>*) 0 : &values,
291                                    table_list, map, true))
292         DBUG_RETURN(-1);
293       table= table_list->table;
294     }
295 
296     if (check_unique && thd->dup_field)
297     {
298       my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0),
299                thd->dup_field->field_name.str);
300       DBUG_RETURN(-1);
301     }
302   }
303   // For the values we need select_priv
304 #ifndef NO_EMBEDDED_ACCESS_CHECKS
305   table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
306 #endif
307 
308   if (check_key_in_view(thd, table_list) ||
309       (table_list->view &&
310        check_view_insertability(thd, table_list)))
311   {
312     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
313     DBUG_RETURN(-1);
314   }
315 
316   DBUG_RETURN(0);
317 }
318 
has_no_default_value(THD * thd,Field * field,TABLE_LIST * table_list)319 static bool has_no_default_value(THD *thd, Field *field, TABLE_LIST *table_list)
320 {
321   if ((field->flags & NO_DEFAULT_VALUE_FLAG) && field->real_type() != MYSQL_TYPE_ENUM)
322   {
323     bool view= false;
324     if (table_list)
325     {
326       table_list= table_list->top_table();
327       view= table_list->view != NULL;
328     }
329     if (view)
330     {
331       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_VIEW_FIELD,
332                           ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
333                           table_list->view_db.str, table_list->view_name.str);
334     }
335     else
336     {
337       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_FIELD,
338                           ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
339                           field->field_name.str);
340     }
341     return thd->really_abort_on_warning();
342   }
343   return false;
344 }
345 
346 
347 /**
348   Check if update fields are correct.
349 
350   @param thd                  The current thread.
351   @param insert_table_list    The table we are inserting into (may be view)
352   @param update_fields        The update fields.
353   @param update_values        The update values.
354   @param fields_and_values_from_different_maps If 'update_values' are allowed to
355   refer to other tables than those of 'update_fields'
356   @param map                  See check_view_single_update
357 
358   @note
359   If the update fields include an autoinc field, set the
360   table->next_number_field_updated flag.
361 
362   @returns 0 if success, -1 if error
363 */
364 
check_update_fields(THD * thd,TABLE_LIST * insert_table_list,List<Item> & update_fields,List<Item> & update_values,bool fields_and_values_from_different_maps,table_map * map)365 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
366                                List<Item> &update_fields,
367                                List<Item> &update_values,
368                                bool fields_and_values_from_different_maps,
369                                table_map *map)
370 {
371   TABLE *table= insert_table_list->table;
372   my_bool UNINIT_VAR(autoinc_mark);
373 
374   table->next_number_field_updated= FALSE;
375 
376   if (table->found_next_number_field)
377   {
378     /*
379       Unmark the auto_increment field so that we can check if this is modified
380       by update_fields
381     */
382     autoinc_mark= bitmap_test_and_clear(table->write_set,
383                                         table->found_next_number_field->
384                                         field_index);
385   }
386 
387   /* Check the fields we are going to modify */
388   if (setup_fields(thd, Ref_ptr_array(),
389                    update_fields, MARK_COLUMNS_WRITE, 0, NULL, 0))
390     return -1;
391 
392   if (insert_table_list->is_view() &&
393       insert_table_list->is_merged_derived() &&
394       check_view_single_update(update_fields,
395                                fields_and_values_from_different_maps ?
396                                (List<Item>*) 0 : &update_values,
397                                insert_table_list, map, false))
398     return -1;
399 
400   if (table->default_field)
401     table->mark_default_fields_for_write(FALSE);
402 
403   if (table->found_next_number_field)
404   {
405     if (bitmap_is_set(table->write_set,
406                       table->found_next_number_field->field_index))
407       table->next_number_field_updated= TRUE;
408 
409     if (autoinc_mark)
410       bitmap_set_bit(table->write_set,
411                      table->found_next_number_field->field_index);
412   }
413 
414   return 0;
415 }
416 
417 /**
418   Upgrade table-level lock of INSERT statement to TL_WRITE if
419   a more concurrent lock is infeasible for some reason. This is
420   necessary for engines without internal locking support (MyISAM).
421   An engine with internal locking implementation might later
422   downgrade the lock in handler::store_lock() method.
423 */
424 
425 static
upgrade_lock_type(THD * thd,thr_lock_type * lock_type,enum_duplicates duplic)426 void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
427                        enum_duplicates duplic)
428 {
429   if (duplic == DUP_UPDATE ||
430       (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
431   {
432     *lock_type= TL_WRITE_DEFAULT;
433     return;
434   }
435 
436   if (*lock_type == TL_WRITE_DELAYED)
437   {
438     /*
439       We do not use delayed threads if:
440       - we're running in the safe mode or skip-new mode -- the
441         feature is disabled in these modes
442       - we're executing this statement on a replication slave --
443         we need to ensure serial execution of queries on the
444         slave
445       - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
446         insert cannot be concurrent
447       - this statement is directly or indirectly invoked from
448         a stored function or trigger (under pre-locking) - to
449         avoid deadlocks, since INSERT DELAYED involves a lock
450         upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
451         attempt while keeping other table level locks.
452       - this statement itself may require pre-locking.
453         We should upgrade the lock even though in most cases
454         delayed functionality may work. Unfortunately, we can't
455         easily identify whether the subject table is not used in
456         the statement indirectly via a stored function or trigger:
457         if it is used, that will lead to a deadlock between the
458         client connection and the delayed thread.
459     */
460     if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
461         thd->variables.max_insert_delayed_threads == 0 ||
462         thd->locked_tables_mode > LTM_LOCK_TABLES ||
463         thd->lex->uses_stored_routines() /*||
464         thd->lex->describe*/)
465     {
466       *lock_type= TL_WRITE;
467       return;
468     }
469     if (thd->slave_thread)
470     {
471       /* Try concurrent insert */
472       *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
473                   TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
474       return;
475     }
476 
477     bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
478     if (WSREP_BINLOG_FORMAT(global_system_variables.binlog_format) == BINLOG_FORMAT_STMT &&
479         log_on && mysql_bin_log.is_open())
480     {
481       /*
482         Statement-based binary logging does not work in this case, because:
483         a) two concurrent statements may have their rows intermixed in the
484         queue, leading to autoincrement replication problems on slave (because
485         the values generated used for one statement don't depend only on the
486         value generated for the first row of this statement, so are not
487         replicable)
488         b) if first row of the statement has an error the full statement is
489         not binlogged, while next rows of the statement may be inserted.
490         c) if first row succeeds, statement is binlogged immediately with a
491         zero error code (i.e. "no error"), if then second row fails, query
492         will fail on slave too and slave will stop (wrongly believing that the
493         master got no error).
494         So we fallback to non-delayed INSERT.
495         Note that to be fully correct, we should test the "binlog format which
496         the delayed thread is going to use for this row". But in the common case
497         where the global binlog format is not changed and the session binlog
498         format may be changed, that is equal to the global binlog format.
499         We test it without mutex for speed reasons (condition rarely true), and
500         in the common case (global not changed) it is as good as without mutex;
501         if global value is changed, anyway there is uncertainty as the delayed
502         thread may be old and use the before-the-change value.
503       */
504       *lock_type= TL_WRITE;
505     }
506   }
507 }
508 
509 
510 /**
511   Find or create a delayed insert thread for the first table in
512   the table list, then open and lock the remaining tables.
513   If a table can not be used with insert delayed, upgrade the lock
514   and open and lock all tables using the standard mechanism.
515 
516   @param thd         thread context
517   @param table_list  list of "descriptors" for tables referenced
518                      directly in statement SQL text.
519                      The first element in the list corresponds to
520                      the destination table for inserts, remaining
521                      tables, if any, are usually tables referenced
522                      by sub-queries in the right part of the
523                      INSERT.
524 
525   @return Status of the operation. In case of success 'table'
526   member of every table_list element points to an instance of
527   class TABLE.
528 
529   @sa open_and_lock_tables for more information about MySQL table
530   level locking
531 */
532 
533 static
open_and_lock_for_insert_delayed(THD * thd,TABLE_LIST * table_list)534 bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
535 {
536   MDL_request protection_request;
537   DBUG_ENTER("open_and_lock_for_insert_delayed");
538 
539 #ifndef EMBEDDED_LIBRARY
540   /* INSERT DELAYED is not allowed in a read only transaction. */
541   if (thd->tx_read_only)
542   {
543     my_error(ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION, MYF(0));
544     DBUG_RETURN(true);
545   }
546 
547   /*
548     In order for the deadlock detector to be able to find any deadlocks
549     caused by the handler thread waiting for GRL or this table, we acquire
550     protection against GRL (global IX metadata lock) and metadata lock on
551     table to being inserted into inside the connection thread.
552     If this goes ok, the tickets are cloned and added to the list of granted
553     locks held by the handler thread.
554   */
555   if (thd->has_read_only_protection())
556     DBUG_RETURN(TRUE);
557 
558   MDL_REQUEST_INIT(&protection_request, MDL_key::BACKUP, "", "",
559                    MDL_BACKUP_DML, MDL_STATEMENT);
560 
561   if (thd->mdl_context.acquire_lock(&protection_request,
562                                     thd->variables.lock_wait_timeout))
563     DBUG_RETURN(TRUE);
564 
565   if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
566                                     thd->variables.lock_wait_timeout))
567     /*
568       If a lock can't be acquired, it makes no sense to try normal insert.
569       Therefore we just abort the statement.
570     */
571     DBUG_RETURN(TRUE);
572 
573   bool error= FALSE;
574   if (delayed_get_table(thd, &protection_request, table_list))
575     error= TRUE;
576   else if (table_list->table)
577   {
578     /*
579       Open tables used for sub-selects or in stored functions, will also
580       cache these functions.
581     */
582     if (open_and_lock_tables(thd, table_list->next_global, TRUE, 0))
583     {
584       end_delayed_insert(thd);
585       error= TRUE;
586     }
587     else
588     {
589       /*
590         First table was not processed by open_and_lock_tables(),
591         we need to set updatability flag "by hand".
592       */
593       if (!table_list->derived && !table_list->view)
594         table_list->updatable= 1;  // usual table
595     }
596   }
597 
598   /*
599     We can't release protection against GRL and metadata lock on the table
600     being inserted into here. These locks might be required, for example,
601     because this INSERT DELAYED calls functions which may try to update
602     this or another tables (updating the same table is of course illegal,
603     but such an attempt can be discovered only later during statement
604     execution).
605   */
606 
607   /*
608     Reset the ticket in case we end up having to use normal insert and
609     therefore will reopen the table and reacquire the metadata lock.
610   */
611   table_list->mdl_request.ticket= NULL;
612 
613   if (error || table_list->table)
614     DBUG_RETURN(error);
615 #endif
616   /*
617     * This is embedded library and we don't have auxiliary
618     threads OR
619     * a lock upgrade was requested inside delayed_get_table
620       because
621       - there are too many delayed insert threads OR
622       - the table has triggers.
623     Use a normal insert.
624   */
625   table_list->lock_type= TL_WRITE;
626   DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
627 }
628 
629 
630 /**
631   Create a new query string for removing DELAYED keyword for
632   multi INSERT DEALAYED statement.
633 
634   @param[in] thd                 Thread handler
635   @param[in] buf                 Query string
636 
637   @return
638              0           ok
639              1           error
640 */
641 static int
create_insert_stmt_from_insert_delayed(THD * thd,String * buf)642 create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
643 {
644   /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
645   if (buf->append(thd->query()) ||
646       buf->replace(thd->lex->keyword_delayed_begin_offset,
647                    thd->lex->keyword_delayed_end_offset -
648                    thd->lex->keyword_delayed_begin_offset, NULL, 0))
649     return 1;
650   return 0;
651 }
652 
653 
save_insert_query_plan(THD * thd,TABLE_LIST * table_list)654 static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
655 {
656   Explain_insert* explain= new (thd->mem_root) Explain_insert(thd->mem_root);
657   explain->table_name.append(table_list->table->alias);
658 
659   thd->lex->explain->add_insert_plan(explain);
660 
661   /* Save subquery children */
662   for (SELECT_LEX_UNIT *unit= thd->lex->first_select_lex()->first_inner_unit();
663        unit;
664        unit= unit->next_unit())
665   {
666     if (unit->explainable())
667       explain->add_child(unit->first_select()->select_number);
668   }
669 }
670 
671 
field_to_fill()672 Field **TABLE::field_to_fill()
673 {
674   return triggers && triggers->nullable_fields() ? triggers->nullable_fields() : field;
675 }
676 
677 
678 /**
679   INSERT statement implementation
680 
681   SYNOPSIS
682   mysql_insert()
683   result    NULL if the insert is not outputing results
684             via 'RETURNING' clause.
685 
686   @note Like implementations of other DDL/DML in MySQL, this function
687   relies on the caller to close the thread tables. This is done in the
688   end of dispatch_command().
689 */
mysql_insert(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<List_item> & values_list,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,bool ignore,select_result * result)690 bool mysql_insert(THD *thd, TABLE_LIST *table_list,
691                   List<Item> &fields, List<List_item> &values_list,
692                   List<Item> &update_fields, List<Item> &update_values,
693                   enum_duplicates duplic, bool ignore, select_result *result)
694 {
695   bool retval= true;
696   int error, res;
697   bool transactional_table, joins_freed= FALSE;
698   bool changed;
699   const bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
700   bool using_bulk_insert= 0;
701   uint value_count;
702   ulong counter = 1;
703   /* counter of iteration in bulk PS operation*/
704   ulonglong iteration= 0;
705   ulonglong id;
706   COPY_INFO info;
707   TABLE *table= 0;
708   List_iterator_fast<List_item> its(values_list);
709   List_item *values;
710   Name_resolution_context *context;
711   Name_resolution_context_state ctx_state;
712   SELECT_LEX   *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
713   unsigned char *readbuff= NULL;
714 
715 #ifndef EMBEDDED_LIBRARY
716   char *query= thd->query();
717   /*
718     log_on is about delayed inserts only.
719     By default, both logs are enabled (this won't cause problems if the server
720     runs without --log-bin).
721   */
722   bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
723 #endif
724   thr_lock_type lock_type;
725   Item *unused_conds= 0;
726   DBUG_ENTER("mysql_insert");
727 
728   bzero((char*) &info,sizeof(info));
729   create_explain_query(thd->lex, thd->mem_root);
730   /*
731     Upgrade lock type if the requested lock is incompatible with
732     the current connection mode or table operation.
733   */
734   upgrade_lock_type(thd, &table_list->lock_type, duplic);
735 
736   /*
737     We can't write-delayed into a table locked with LOCK TABLES:
738     this will lead to a deadlock, since the delayed thread will
739     never be able to get a lock on the table.
740   */
741   if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables_mode &&
742       find_locked_table(thd->open_tables, table_list->db.str,
743                         table_list->table_name.str))
744   {
745     my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
746              table_list->table_name.str);
747     DBUG_RETURN(TRUE);
748   }
749 
750   if (table_list->lock_type == TL_WRITE_DELAYED)
751   {
752     if (open_and_lock_for_insert_delayed(thd, table_list))
753       DBUG_RETURN(TRUE);
754   }
755   else
756   {
757     if (open_and_lock_tables(thd, table_list, TRUE, 0))
758       DBUG_RETURN(TRUE);
759   }
760 
761   THD_STAGE_INFO(thd, stage_init_update);
762   lock_type= table_list->lock_type;
763   thd->lex->used_tables=0;
764   values= its++;
765   if (bulk_parameters_set(thd))
766     DBUG_RETURN(TRUE);
767   value_count= values->elements;
768 
769   if ((res= mysql_prepare_insert(thd, table_list, fields, values,
770                                  update_fields, update_values, duplic,
771                                  &unused_conds, FALSE)))
772   {
773     retval= thd->is_error();
774     if (res < 0)
775     {
776       /*
777         Insert should be ignored but we have to log the query in statement
778         format in the binary log
779       */
780       if (thd->binlog_current_query_unfiltered())
781         retval= 1;
782     }
783     goto abort;
784   }
785   /* mysql_prepare_insert sets table_list->table if it was not set */
786   table= table_list->table;
787 
788   /* Prepares LEX::returing_list if it is not empty */
789   if (returning)
790   {
791     result->prepare(returning->item_list, NULL);
792     if (thd->is_bulk_op())
793     {
794       /*
795         It is RETURNING which needs network buffer to write result set and
796         it is array binfing which need network buffer to read parameters.
797         So we allocate yet another network buffer.
798         The old buffer will be freed at the end of operation.
799       */
800       DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
801       readbuff= thd->net.buff; // old buffer
802       if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
803       {
804         readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
805         goto abort;
806       }
807     }
808   }
809 
810   context= &thd->lex->first_select_lex()->context;
811   /*
812     These three asserts test the hypothesis that the resetting of the name
813     resolution context below is not necessary at all since the list of local
814     tables for INSERT always consists of one table.
815   */
816   DBUG_ASSERT(!table_list->next_local);
817   DBUG_ASSERT(!context->table_list->next_local);
818   DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
819 
820   /* Save the state of the current name resolution context. */
821   ctx_state.save_state(context, table_list);
822 
823   /*
824     Perform name resolution only in the first table - 'table_list',
825     which is the table that is inserted into.
826   */
827   table_list->next_local= 0;
828   context->resolve_in_table_list_only(table_list);
829   switch_to_nullable_trigger_fields(*values, table);
830 
831   while ((values= its++))
832   {
833     counter++;
834     if (values->elements != value_count)
835     {
836       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
837       goto abort;
838     }
839     if (setup_fields(thd, Ref_ptr_array(),
840                      *values, MARK_COLUMNS_READ, 0, NULL, 0))
841       goto abort;
842     switch_to_nullable_trigger_fields(*values, table);
843   }
844   its.rewind ();
845 
846   /* Restore the current context. */
847   ctx_state.restore_state(context, table_list);
848 
849   if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false))
850   {
851     goto abort;
852   }
853   save_insert_query_plan(thd, table_list);
854   if (thd->lex->describe)
855   {
856     retval= thd->lex->explain->send_explain(thd);
857     goto abort;
858   }
859 
860   /*
861     Fill in the given fields and dump it to the table file
862   */
863   info.ignore= ignore;
864   info.handle_duplicates=duplic;
865   info.update_fields= &update_fields;
866   info.update_values= &update_values;
867   info.view= (table_list->view ? table_list : 0);
868   info.table_list= table_list;
869 
870   /*
871     Count warnings for all inserts.
872     For single line insert, generate an error if try to set a NOT NULL field
873     to NULL.
874   */
875   thd->count_cuted_fields= ((values_list.elements == 1 &&
876                              !ignore) ?
877 			    CHECK_FIELD_ERROR_FOR_NULL :
878 			    CHECK_FIELD_WARN);
879   thd->cuted_fields = 0L;
880   table->next_number_field=table->found_next_number_field;
881 
882 #ifdef HAVE_REPLICATION
883   if (thd->rgi_slave &&
884       (info.handle_duplicates == DUP_UPDATE) &&
885       (table->next_number_field != NULL) &&
886       rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
887     goto abort;
888 #endif
889 
890   error=0;
891   if (duplic == DUP_REPLACE &&
892       (!table->triggers || !table->triggers->has_delete_triggers()))
893     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
894   if (duplic == DUP_UPDATE)
895     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
896   /*
897     let's *try* to start bulk inserts. It won't necessary
898     start them as values_list.elements should be greater than
899     some - handler dependent - threshold.
900     We should not start bulk inserts if this statement uses
901     functions or invokes triggers since they may access
902     to the same table and therefore should not see its
903     inconsistent state created by this optimization.
904     So we call start_bulk_insert to perform nesessary checks on
905     values_list.elements, and - if nothing else - to initialize
906     the code to make the call of end_bulk_insert() below safe.
907   */
908 #ifndef EMBEDDED_LIBRARY
909   if (lock_type != TL_WRITE_DELAYED)
910 #endif /* EMBEDDED_LIBRARY */
911   {
912     bool create_lookup_handler= duplic != DUP_ERROR;
913     if (duplic != DUP_ERROR || ignore)
914     {
915       create_lookup_handler= true;
916       table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
917       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
918       {
919         if (table->file->ha_rnd_init_with_error(0))
920           goto abort;
921       }
922     }
923     table->file->prepare_for_insert(create_lookup_handler);
924     /**
925       This is a simple check for the case when the table has a trigger
926       that reads from it, or when the statement invokes a stored function
927       that reads from the table being inserted to.
928       Engines can't handle a bulk insert in parallel with a read form the
929       same table in the same connection.
930     */
931     if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
932        values_list.elements > 1)
933     {
934       using_bulk_insert= 1;
935       table->file->ha_start_bulk_insert(values_list.elements);
936     }
937     else
938       table->file->ha_reset_copy_info();
939   }
940 
941   thd->abort_on_warning= !ignore && thd->is_strict_mode();
942 
943   table->reset_default_fields();
944   table->prepare_triggers_for_insert_stmt_or_event();
945   table->mark_columns_needed_for_insert();
946 
947   if (fields.elements || !value_count || table_list->view != 0)
948   {
949     if (table->triggers &&
950         table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
951     {
952       /* BEFORE INSERT triggers exist, the check will be done later, per row */
953     }
954     else if (check_that_all_fields_are_given_values(thd, table, table_list))
955     {
956       error= 1;
957       goto values_loop_end;
958     }
959   }
960 
961   if (table_list->prepare_where(thd, 0, TRUE) ||
962       table_list->prepare_check_option(thd))
963     error= 1;
964 
965   switch_to_nullable_trigger_fields(fields, table);
966   switch_to_nullable_trigger_fields(update_fields, table);
967   switch_to_nullable_trigger_fields(update_values, table);
968 
969   if (fields.elements || !value_count)
970   {
971     /*
972       There are possibly some default values:
973       INSERT INTO t1 (fields) VALUES ...
974       INSERT INTO t1 VALUES ()
975     */
976     if (table->validate_default_values_of_unset_fields(thd))
977     {
978       error= 1;
979       goto values_loop_end;
980     }
981   }
982   /*
983     If statement returns result set, we need to send the result set metadata
984     to the client so that it knows that it has to expect an EOF or ERROR.
985     At this point we have all the required information to send the result set
986     metadata.
987   */
988   if (returning &&
989       result->send_result_set_metadata(returning->item_list,
990                                 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
991     goto values_loop_end;
992 
993   THD_STAGE_INFO(thd, stage_update);
994   thd->decide_logging_format_low(table);
995   do
996   {
997     DBUG_PRINT("info", ("iteration %llu", iteration));
998     if (iteration && bulk_parameters_set(thd))
999     {
1000       error= 1;
1001       goto values_loop_end;
1002     }
1003 
1004     while ((values= its++))
1005     {
1006       if (fields.elements || !value_count)
1007       {
1008         /*
1009           There are possibly some default values:
1010           INSERT INTO t1 (fields) VALUES ...
1011           INSERT INTO t1 VALUES ()
1012         */
1013         restore_record(table,s->default_values);	// Get empty record
1014         table->reset_default_fields();
1015         if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields,
1016                                                           *values, 0,
1017                                                           TRG_EVENT_INSERT)))
1018         {
1019           if (values_list.elements != 1 && ! thd->is_error())
1020           {
1021             info.records++;
1022             continue;
1023           }
1024           /*
1025             TODO: set thd->abort_on_warning if values_list.elements == 1
1026 	    and check that all items return warning in case of problem with
1027 	    storing field.
1028           */
1029 	  error=1;
1030 	  break;
1031         }
1032       }
1033       else
1034       {
1035         /*
1036           No field list, all fields are set explicitly:
1037           INSERT INTO t1 VALUES (values)
1038         */
1039         if (thd->lex->used_tables || // Column used in values()
1040             table->s->visible_fields != table->s->fields)
1041 	  restore_record(table,s->default_values);	// Get empty record
1042         else
1043         {
1044           TABLE_SHARE *share= table->s;
1045 
1046           /*
1047             Fix delete marker. No need to restore rest of record since it will
1048             be overwritten by fill_record() anyway (and fill_record() does not
1049             use default values in this case).
1050           */
1051           table->record[0][0]= share->default_values[0];
1052 
1053           /* Fix undefined null_bits. */
1054           if (share->null_bytes > 1 && share->last_null_bit_pos)
1055           {
1056             table->record[0][share->null_bytes - 1]=
1057               share->default_values[share->null_bytes - 1];
1058           }
1059         }
1060         table->reset_default_fields();
1061         if (unlikely(fill_record_n_invoke_before_triggers(thd, table,
1062                                                           table->
1063                                                           field_to_fill(),
1064                                                           *values, 0,
1065                                                           TRG_EVENT_INSERT)))
1066         {
1067           if (values_list.elements != 1 && ! thd->is_error())
1068 	  {
1069 	    info.records++;
1070 	    continue;
1071 	  }
1072 	  error=1;
1073 	  break;
1074         }
1075       }
1076 
1077       /*
1078         with triggers a field can get a value *conditionally*, so we have to
1079         repeat has_no_default_value() check for every row
1080       */
1081       if (table->triggers &&
1082           table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
1083       {
1084         for (Field **f=table->field ; *f ; f++)
1085         {
1086           if (unlikely(!(*f)->has_explicit_value() &&
1087                        has_no_default_value(thd, *f, table_list)))
1088           {
1089             error= 1;
1090             goto values_loop_end;
1091           }
1092         }
1093       }
1094 
1095       if ((res= table_list->view_check_option(thd,
1096                                               (values_list.elements == 1 ?
1097                                                0 :
1098                                                ignore))) ==
1099           VIEW_CHECK_SKIP)
1100         continue;
1101       else if (res == VIEW_CHECK_ERROR)
1102       {
1103         error= 1;
1104         break;
1105       }
1106 
1107 #ifndef EMBEDDED_LIBRARY
1108       if (lock_type == TL_WRITE_DELAYED)
1109       {
1110         LEX_STRING const st_query = { query, thd->query_length() };
1111         DEBUG_SYNC(thd, "before_write_delayed");
1112         error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
1113         DEBUG_SYNC(thd, "after_write_delayed");
1114         query=0;
1115       }
1116       else
1117 #endif
1118       error= write_record(thd, table, &info, result);
1119       if (unlikely(error))
1120         break;
1121       thd->get_stmt_da()->inc_current_row_for_warning();
1122     }
1123     its.rewind();
1124     iteration++;
1125   } while (bulk_parameters_iterations(thd));
1126 
1127 values_loop_end:
1128   free_underlaid_joins(thd, thd->lex->first_select_lex());
1129   joins_freed= TRUE;
1130 
1131   /*
1132     Now all rows are inserted. Time to update logs and sends response to
1133     user
1134   */
1135 #ifndef EMBEDDED_LIBRARY
1136   if (unlikely(lock_type == TL_WRITE_DELAYED))
1137   {
1138     if (likely(!error))
1139     {
1140       info.copied=values_list.elements;
1141       end_delayed_insert(thd);
1142     }
1143   }
1144   else
1145 #endif
1146   {
1147     /*
1148       Do not do this release if this is a delayed insert, it would steal
1149       auto_inc values from the delayed_insert thread as they share TABLE.
1150     */
1151     table->file->ha_release_auto_increment();
1152     if (using_bulk_insert)
1153     {
1154       if (unlikely(table->file->ha_end_bulk_insert()) &&
1155           !error)
1156       {
1157         table->file->print_error(my_errno,MYF(0));
1158         error=1;
1159       }
1160     }
1161     /* Get better status from handler if handler supports it */
1162     if (table->file->copy_info.records)
1163     {
1164       DBUG_ASSERT(info.copied >= table->file->copy_info.copied);
1165       info.touched= table->file->copy_info.touched;
1166       info.copied=  table->file->copy_info.copied;
1167       info.deleted= table->file->copy_info.deleted;
1168       info.updated= table->file->copy_info.updated;
1169     }
1170     if (duplic != DUP_ERROR || ignore)
1171     {
1172       table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1173       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1174         table->file->ha_rnd_end();
1175     }
1176 
1177     transactional_table= table->file->has_transactions_and_rollback();
1178 
1179     if (likely(changed= (info.copied || info.deleted || info.updated)))
1180     {
1181       /*
1182         Invalidate the table in the query cache if something changed.
1183         For the transactional algorithm to work the invalidation must be
1184         before binlog writing and ha_autocommit_or_rollback
1185       */
1186       query_cache_invalidate3(thd, table_list, 1);
1187     }
1188 
1189     if (thd->transaction->stmt.modified_non_trans_table)
1190       thd->transaction->all.modified_non_trans_table= TRUE;
1191     thd->transaction->all.m_unsafe_rollback_flags|=
1192       (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
1193 
1194     if (error <= 0 ||
1195         thd->transaction->stmt.modified_non_trans_table ||
1196 	was_insert_delayed)
1197     {
1198       if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
1199       {
1200         int errcode= 0;
1201 	if (error <= 0)
1202         {
1203 	  /*
1204 	    [Guilhem wrote] Temporary errors may have filled
1205 	    thd->net.last_error/errno.  For example if there has
1206 	    been a disk full error when writing the row, and it was
1207 	    MyISAM, then thd->net.last_error/errno will be set to
1208             "disk full"... and the mysql_file_pwrite() will wait until free
1209 	    space appears, and so when it finishes then the
1210 	    write_row() was entirely successful
1211 	  */
1212 	  /* todo: consider removing */
1213 	  thd->clear_error();
1214 	}
1215         else
1216           errcode= query_error_code(thd, thd->killed == NOT_KILLED);
1217 
1218         ScopedStatementReplication scoped_stmt_rpl(
1219             table->versioned(VERS_TRX_ID) ? thd : NULL);
1220        /* bug#22725:
1221 
1222 	A query which per-row-loop can not be interrupted with
1223 	KILLED, like INSERT, and that does not invoke stored
1224 	routines can be binlogged with neglecting the KILLED error.
1225 
1226 	If there was no error (error == zero) until after the end of
1227 	inserting loop the KILLED flag that appeared later can be
1228 	disregarded since previously possible invocation of stored
1229 	routines did not result in any error due to the KILLED.  In
1230 	such case the flag is ignored for constructing binlog event.
1231 	*/
1232 	DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
1233         if (was_insert_delayed && table_list->lock_type == TL_WRITE)
1234         {
1235           /* Binlog INSERT DELAYED as INSERT without DELAYED. */
1236           String log_query;
1237           if (create_insert_stmt_from_insert_delayed(thd, &log_query))
1238           {
1239             sql_print_error("Event Error: An error occurred while creating query string"
1240                             "for INSERT DELAYED stmt, before writing it into binary log.");
1241 
1242             error= 1;
1243           }
1244           else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1245                                      log_query.c_ptr(), log_query.length(),
1246                                      transactional_table, FALSE, FALSE,
1247                                      errcode) > 0)
1248             error= 1;
1249         }
1250         else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1251 			           thd->query(), thd->query_length(),
1252 			           transactional_table, FALSE, FALSE,
1253                                    errcode) > 0)
1254 	  error= 1;
1255       }
1256     }
1257     DBUG_ASSERT(transactional_table || !changed ||
1258                 thd->transaction->stmt.modified_non_trans_table);
1259   }
1260   THD_STAGE_INFO(thd, stage_end);
1261   /*
1262     We'll report to the client this id:
1263     - if the table contains an autoincrement column and we successfully
1264     inserted an autogenerated value, the autogenerated value.
1265     - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
1266     called, X.
1267     - if the table contains an autoincrement column, and some rows were
1268     inserted, the id of the last "inserted" row (if IGNORE, that value may not
1269     have been really inserted but ignored).
1270   */
1271   id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1272     thd->first_successful_insert_id_in_cur_stmt :
1273     (thd->arg_of_last_insert_id_function ?
1274      thd->first_successful_insert_id_in_prev_stmt :
1275      ((table->next_number_field && info.copied) ?
1276      table->next_number_field->val_int() : 0));
1277   table->next_number_field=0;
1278   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
1279   table->auto_increment_field_not_null= FALSE;
1280   if (duplic == DUP_REPLACE &&
1281       (!table->triggers || !table->triggers->has_delete_triggers()))
1282     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1283 
1284   if (unlikely(error))
1285     goto abort;
1286   if (thd->lex->analyze_stmt)
1287   {
1288     retval= 0;
1289     goto abort;
1290   }
1291   DBUG_PRINT("info", ("touched: %llu  copied: %llu  updated: %llu  deleted: %llu",
1292                       (ulonglong) info.touched, (ulonglong) info.copied,
1293                       (ulonglong) info.updated, (ulonglong) info.deleted));
1294 
1295   if ((iteration * values_list.elements) == 1 &&
1296       (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
1297   {
1298     /*
1299       Client expects an EOF/OK packet if result set metadata was sent. If
1300       LEX::has_returning and the statement returns result set
1301       we send EOF which is the indicator of the end of the row stream.
1302       Oherwise we send an OK packet i.e when the statement returns only the
1303       status information
1304     */
1305    if (returning)
1306       result->send_eof();
1307    else
1308       my_ok(thd, info.copied + info.deleted +
1309                ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1310                 info.touched : info.updated), id);
1311   }
1312   else
1313   {
1314     char buff[160];
1315     ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1316                      info.touched : info.updated);
1317 
1318     if (ignore)
1319       sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1320               (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
1321               (ulong) (info.records - info.copied),
1322               (long) thd->get_stmt_da()->current_statement_warn_count());
1323     else
1324       sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1325               (ulong) (info.deleted + updated),
1326               (long) thd->get_stmt_da()->current_statement_warn_count());
1327     if (returning)
1328       result->send_eof();
1329     else
1330       ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
1331   }
1332   thd->abort_on_warning= 0;
1333   if (thd->lex->current_select->first_cond_optimization)
1334   {
1335     thd->lex->current_select->save_leaf_tables(thd);
1336     thd->lex->current_select->first_cond_optimization= 0;
1337   }
1338   if (readbuff)
1339     my_free(readbuff);
1340   DBUG_RETURN(FALSE);
1341 
1342 abort:
1343 #ifndef EMBEDDED_LIBRARY
1344   if (lock_type == TL_WRITE_DELAYED)
1345   {
1346     end_delayed_insert(thd);
1347     /*
1348       In case of an error (e.g. data truncation), the data type specific data
1349       in fields (e.g. Field_blob::value) was not taken over
1350       by the delayed writer thread. All fields in table_list->table
1351       will be freed by free_root() soon. We need to free the specific
1352       data before free_root() to avoid a memory leak.
1353     */
1354     for (Field **ptr= table_list->table->field ; *ptr ; ptr++)
1355       (*ptr)->free();
1356   }
1357 #endif
1358   if (table != NULL)
1359     table->file->ha_release_auto_increment();
1360 
1361   if (!joins_freed)
1362     free_underlaid_joins(thd, thd->lex->first_select_lex());
1363   thd->abort_on_warning= 0;
1364   if (readbuff)
1365     my_free(readbuff);
1366   DBUG_RETURN(retval);
1367 }
1368 
1369 
1370 /*
1371   Additional check for insertability for VIEW
1372 
1373   SYNOPSIS
1374     check_view_insertability()
1375     thd     - thread handler
1376     view    - reference on VIEW
1377 
1378   IMPLEMENTATION
1379     A view is insertable if the folloings are true:
1380     - All columns in the view are columns from a table
1381     - All not used columns in table have a default values
1382     - All field in view are unique (not referring to the same column)
1383 
1384   RETURN
1385     FALSE - OK
1386       view->contain_auto_increment is 1 if and only if the view contains an
1387       auto_increment field
1388 
1389     TRUE  - can't be used for insert
1390 */
1391 
check_view_insertability(THD * thd,TABLE_LIST * view)1392 static bool check_view_insertability(THD * thd, TABLE_LIST *view)
1393 {
1394   uint num= view->view->first_select_lex()->item_list.elements;
1395   TABLE *table= view->table;
1396   Field_translator *trans_start= view->field_translation,
1397 		   *trans_end= trans_start + num;
1398   Field_translator *trans;
1399   uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1400   uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1401   MY_BITMAP used_fields;
1402   enum_column_usage saved_column_usage= thd->column_usage;
1403   DBUG_ENTER("check_key_in_view");
1404 
1405   if (!used_fields_buff)
1406     DBUG_RETURN(TRUE);  // EOM
1407 
1408   DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
1409 
1410   (void) my_bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
1411   bitmap_clear_all(&used_fields);
1412 
1413   view->contain_auto_increment= 0;
1414   /*
1415     we must not set query_id for fields as they're not
1416     really used in this context
1417   */
1418   thd->column_usage= COLUMNS_WRITE;
1419   /* check simplicity and prepare unique test of view */
1420   for (trans= trans_start; trans != trans_end; trans++)
1421   {
1422     if (trans->item->fix_fields_if_needed(thd, &trans->item))
1423     {
1424       thd->column_usage= saved_column_usage;
1425       DBUG_RETURN(TRUE);
1426     }
1427     Item_field *field;
1428     /* simple SELECT list entry (field without expression) */
1429     if (!(field= trans->item->field_for_view_update()))
1430     {
1431       thd->column_usage= saved_column_usage;
1432       DBUG_RETURN(TRUE);
1433     }
1434     if (field->field->unireg_check == Field::NEXT_NUMBER)
1435       view->contain_auto_increment= 1;
1436     /* prepare unique test */
1437     /*
1438       remove collation (or other transparent for update function) if we have
1439       it
1440     */
1441     trans->item= field;
1442   }
1443   thd->column_usage= saved_column_usage;
1444   /* unique test */
1445   for (trans= trans_start; trans != trans_end; trans++)
1446   {
1447     /* Thanks to test above, we know that all columns are of type Item_field */
1448     Item_field *field= (Item_field *)trans->item;
1449     /* check fields belong to table in which we are inserting */
1450     if (field->field->table == table &&
1451         bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1452       DBUG_RETURN(TRUE);
1453   }
1454 
1455   DBUG_RETURN(FALSE);
1456 }
1457 
1458 
1459 /**
1460   TODO remove when MDEV-17395 will be closed
1461 
1462   Checks if REPLACE or ON DUPLICATE UPDATE was executed on table containing
1463   WITHOUT OVERLAPS key.
1464 
1465   @return
1466   0 if no error
1467   ER_NOT_SUPPORTED_YET if the above condidion was met
1468  */
check_duplic_insert_without_overlaps(THD * thd,TABLE * table,enum_duplicates duplic)1469 int check_duplic_insert_without_overlaps(THD *thd, TABLE *table,
1470                                          enum_duplicates duplic)
1471 {
1472   if (duplic == DUP_REPLACE || duplic == DUP_UPDATE)
1473   {
1474     for (uint k = 0; k < table->s->keys; k++)
1475     {
1476       if (table->key_info[k].without_overlaps)
1477       {
1478         my_error(ER_NOT_SUPPORTED_YET, MYF(0), "WITHOUT OVERLAPS");
1479         return ER_NOT_SUPPORTED_YET;
1480       }
1481     }
1482   }
1483   return 0;
1484 }
1485 
1486 /*
1487   Check if table can be updated
1488 
1489   SYNOPSIS
1490      mysql_prepare_insert_check_table()
1491      thd		Thread handle
1492      table_list		Table list
1493      fields		List of fields to be updated
1494      where		Pointer to where clause
1495      select_insert      Check is making for SELECT ... INSERT
1496 
1497    RETURN
1498      FALSE ok
1499      TRUE  ERROR
1500 */
1501 
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1502 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1503                                              List<Item> &fields,
1504                                              bool select_insert)
1505 {
1506   bool insert_into_view= (table_list->view != 0);
1507   DBUG_ENTER("mysql_prepare_insert_check_table");
1508 
1509   if (!table_list->single_table_updatable())
1510   {
1511     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
1512     DBUG_RETURN(TRUE);
1513   }
1514   /*
1515      first table in list is the one we'll INSERT into, requires INSERT_ACL.
1516      all others require SELECT_ACL only. the ACL requirement below is for
1517      new leaves only anyway (view-constituents), so check for SELECT rather
1518      than INSERT.
1519   */
1520 
1521   if (setup_tables_and_check_access(thd,
1522                                     &thd->lex->first_select_lex()->context,
1523                                     &thd->lex->first_select_lex()->
1524                                       top_join_list,
1525                                     table_list,
1526                                     thd->lex->first_select_lex()->leaf_tables,
1527                                     select_insert, INSERT_ACL, SELECT_ACL,
1528                                     TRUE))
1529     DBUG_RETURN(TRUE);
1530 
1531   if (insert_into_view && !fields.elements)
1532   {
1533     thd->lex->empty_field_list_on_rset= 1;
1534     if (!thd->lex->first_select_lex()->leaf_tables.head()->table ||
1535         table_list->is_multitable())
1536     {
1537       my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1538                table_list->view_db.str, table_list->view_name.str);
1539       DBUG_RETURN(TRUE);
1540     }
1541     DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
1542   }
1543 
1544   DBUG_RETURN(FALSE);
1545 }
1546 
1547 
1548 /*
1549   Get extra info for tables we insert into
1550 
1551   @param table     table(TABLE object) we insert into,
1552                    might be NULL in case of view
1553   @param           table(TABLE_LIST object) or view we insert into
1554 */
1555 
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1556 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1557 {
1558   if (table)
1559   {
1560     if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1561       table->prepare_for_position();
1562     return;
1563   }
1564 
1565   DBUG_ASSERT(tables->view);
1566   List_iterator<TABLE_LIST> it(*tables->view_tables);
1567   TABLE_LIST *tbl;
1568   while ((tbl= it++))
1569     prepare_for_positional_update(tbl->table, tbl);
1570 
1571   return;
1572 }
1573 
1574 
1575 /*
1576   Prepare items in INSERT statement
1577 
1578   SYNOPSIS
1579     mysql_prepare_insert()
1580     thd                 Thread handler
1581     table_list          Global/local table list
1582     where               Where clause (for insert ... select)
1583     select_insert       TRUE if INSERT ... SELECT statement
1584 
1585   TODO (in far future)
1586     In cases of:
1587     INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1588     ON DUPLICATE KEY ...
1589     we should be able to refer to sum1 in the ON DUPLICATE KEY part
1590 
1591   WARNING
1592     You MUST set table->insert_values to 0 after calling this function
1593     before releasing the table object.
1594 
1595   RETURN VALUE
1596     0  OK
1597     >0 error
1598     <0 insert should be ignored
1599 */
1600 
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List_item * values,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,COND ** where,bool select_insert)1601 int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1602                          List<Item> &fields, List_item *values,
1603                          List<Item> &update_fields, List<Item> &update_values,
1604                          enum_duplicates duplic, COND **where,
1605                          bool select_insert)
1606 {
1607   SELECT_LEX *select_lex= thd->lex->first_select_lex();
1608   Name_resolution_context *context= &select_lex->context;
1609   Name_resolution_context_state ctx_state;
1610   bool insert_into_view= (table_list->view != 0);
1611   bool res= 0;
1612   table_map map= 0;
1613   TABLE *table;
1614   DBUG_ENTER("mysql_prepare_insert");
1615   DBUG_PRINT("enter", ("table_list: %p  view: %d",
1616 		       table_list, (int) insert_into_view));
1617   /* INSERT should have a SELECT or VALUES clause */
1618   DBUG_ASSERT (!select_insert || !values);
1619 
1620   if (mysql_handle_derived(thd->lex, DT_INIT))
1621     DBUG_RETURN(1);
1622   if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT))
1623     DBUG_RETURN(1);
1624   if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE))
1625     DBUG_RETURN(1);
1626 
1627   if (duplic == DUP_UPDATE)
1628   {
1629     /* it should be allocated before Item::fix_fields() */
1630     if (table_list->set_insert_values(thd->mem_root))
1631       DBUG_RETURN(1);
1632   }
1633 
1634   table= table_list->table;
1635 
1636   if (table->file->check_if_updates_are_ignored("INSERT"))
1637     DBUG_RETURN(-1);
1638 
1639   if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1640     DBUG_RETURN(1);
1641 
1642   /* Prepare the fields in the statement. */
1643   if (values)
1644   {
1645     /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
1646     DBUG_ASSERT (!select_lex->group_list.elements);
1647 
1648     /* Save the state of the current name resolution context. */
1649     ctx_state.save_state(context, table_list);
1650 
1651     /*
1652       Perform name resolution only in the first table - 'table_list',
1653       which is the table that is inserted into.
1654      */
1655     table_list->next_local= 0;
1656     context->resolve_in_table_list_only(table_list);
1657 
1658     res= setup_returning_fields(thd, table_list) ||
1659          setup_fields(thd, Ref_ptr_array(),
1660                       *values, MARK_COLUMNS_READ, 0, NULL, 0) ||
1661           check_insert_fields(thd, context->table_list, fields, *values,
1662                               !insert_into_view, 0, &map);
1663 
1664     if (!res)
1665       res= setup_fields(thd, Ref_ptr_array(),
1666                         update_values, MARK_COLUMNS_READ, 0, NULL, 0);
1667 
1668     if (!res && duplic == DUP_UPDATE)
1669     {
1670       select_lex->no_wrap_view_item= TRUE;
1671       res= check_update_fields(thd, context->table_list, update_fields,
1672                                update_values, false, &map);
1673       select_lex->no_wrap_view_item= FALSE;
1674     }
1675 
1676     /* Restore the current context. */
1677     ctx_state.restore_state(context, table_list);
1678   }
1679 
1680   if (res)
1681     DBUG_RETURN(res);
1682 
1683   if (check_duplic_insert_without_overlaps(thd, table, duplic) != 0)
1684     DBUG_RETURN(true);
1685 
1686   if (table->versioned(VERS_TIMESTAMP) && duplic == DUP_REPLACE)
1687   {
1688     // Additional memory may be required to create historical items.
1689     if (table_list->set_insert_values(thd->mem_root))
1690       DBUG_RETURN(1);
1691   }
1692 
1693   if (!select_insert)
1694   {
1695     Item *fake_conds= 0;
1696     TABLE_LIST *duplicate;
1697     if ((duplicate= unique_table(thd, table_list, table_list->next_global,
1698                                  CHECK_DUP_ALLOW_DIFFERENT_ALIAS)))
1699     {
1700       update_non_unique_table_error(table_list, "INSERT", duplicate);
1701       DBUG_RETURN(1);
1702     }
1703     select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
1704   }
1705   /*
1706     Only call prepare_for_posistion() if we are not performing a DELAYED
1707     operation. It will instead be executed by delayed insert thread.
1708   */
1709   if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1710     prepare_for_positional_update(table, table_list);
1711   DBUG_RETURN(0);
1712 }
1713 
1714 
1715 	/* Check if there is more uniq keys after field */
1716 
last_uniq_key(TABLE * table,uint keynr)1717 static int last_uniq_key(TABLE *table,uint keynr)
1718 {
1719   /*
1720     When an underlying storage engine informs that the unique key
1721     conflicts are not reported in the ascending order by setting
1722     the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1723     information to determine the last key conflict.
1724 
1725     The information about the last key conflict will be used to
1726     do a replace of the new row on the conflicting row, rather
1727     than doing a delete (of old row) + insert (of new row).
1728 
1729     Hence check for this flag and disable replacing the last row
1730     by returning 0 always. Returning 0 will result in doing
1731     a delete + insert always.
1732   */
1733   if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1734     return 0;
1735 
1736   while (++keynr < table->s->keys)
1737     if (table->key_info[keynr].flags & HA_NOSAME)
1738       return 0;
1739   return 1;
1740 }
1741 
1742 
1743 /*
1744  Inserts one historical row to a table.
1745 
1746  Copies content of the row from table->record[1] to table->record[0],
1747  sets Sys_end to now() and calls ha_write_row() .
1748 */
1749 
vers_insert_history_row(TABLE * table)1750 int vers_insert_history_row(TABLE *table)
1751 {
1752   DBUG_ASSERT(table->versioned(VERS_TIMESTAMP));
1753   if (!table->vers_write)
1754     return 0;
1755   restore_record(table,record[1]);
1756 
1757   // Set Sys_end to now()
1758   table->vers_update_end();
1759 
1760   Field *row_start= table->vers_start_field();
1761   Field *row_end= table->vers_end_field();
1762   if (row_start->cmp(row_start->ptr, row_end->ptr) >= 0)
1763     return 0;
1764 
1765   if (table->vfield &&
1766       table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_READ))
1767     return HA_ERR_GENERIC;
1768 
1769   return table->file->ha_write_row(table->record[0]);
1770 }
1771 
1772 /*
1773   Write a record to table with optional deleting of conflicting records,
1774   invoke proper triggers if needed.
1775 
1776   SYNOPSIS
1777      write_record()
1778       thd   - thread context
1779       table - table to which record should be written
1780       info  - COPY_INFO structure describing handling of duplicates
1781               and which is used for counting number of records inserted
1782               and deleted.
1783       sink  - result sink for the RETURNING clause
1784 
1785   NOTE
1786     Once this record will be written to table after insert trigger will
1787     be invoked. If instead of inserting new record we will update old one
1788     then both on update triggers will work instead. Similarly both on
1789     delete triggers will be invoked if we will delete conflicting records.
1790 
1791     Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which
1792     is updated didn't have transactions.
1793 
1794   RETURN VALUE
1795     0     - success
1796     non-0 - error
1797 */
1798 
1799 
write_record(THD * thd,TABLE * table,COPY_INFO * info,select_result * sink)1800 int write_record(THD *thd, TABLE *table, COPY_INFO *info, select_result *sink)
1801 {
1802   int error, trg_error= 0;
1803   char *key=0;
1804   MY_BITMAP *save_read_set, *save_write_set;
1805   ulonglong prev_insert_id= table->file->next_insert_id;
1806   ulonglong insert_id_for_cur_row= 0;
1807   ulonglong prev_insert_id_for_cur_row= 0;
1808   DBUG_ENTER("write_record");
1809 
1810   info->records++;
1811   save_read_set= table->read_set;
1812   save_write_set= table->write_set;
1813 
1814   if (info->handle_duplicates == DUP_REPLACE ||
1815       info->handle_duplicates == DUP_UPDATE)
1816   {
1817     while (unlikely(error=table->file->ha_write_row(table->record[0])))
1818     {
1819       uint key_nr;
1820       /*
1821         If we do more than one iteration of this loop, from the second one the
1822         row will have an explicit value in the autoinc field, which was set at
1823         the first call of handler::update_auto_increment(). So we must save
1824         the autogenerated value to avoid thd->insert_id_for_cur_row to become
1825         0.
1826       */
1827       if (table->file->insert_id_for_cur_row > 0)
1828         insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1829       else
1830         table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1831       bool is_duplicate_key_error;
1832       if (table->file->is_fatal_error(error, HA_CHECK_ALL))
1833         goto err;
1834       is_duplicate_key_error=
1835         table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP);
1836       if (!is_duplicate_key_error)
1837       {
1838         /*
1839           We come here when we had an ignorable error which is not a duplicate
1840           key error. In this we ignore error if ignore flag is set, otherwise
1841           report error as usual. We will not do any duplicate key processing.
1842         */
1843         if (info->ignore)
1844         {
1845           table->file->print_error(error, MYF(ME_WARNING));
1846           goto after_trg_or_ignored_err; /* Ignoring a not fatal error */
1847         }
1848         goto err;
1849       }
1850       if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0))
1851       {
1852 	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
1853 	goto err;
1854       }
1855       DEBUG_SYNC(thd, "write_row_replace");
1856 
1857       /* Read all columns for the row we are going to replace */
1858       table->use_all_columns();
1859       /*
1860 	Don't allow REPLACE to replace a row when a auto_increment column
1861 	was used.  This ensures that we don't get a problem when the
1862 	whole range of the key has been used.
1863       */
1864       if (info->handle_duplicates == DUP_REPLACE && table->next_number_field &&
1865           key_nr == table->s->next_number_index && insert_id_for_cur_row > 0)
1866 	goto err;
1867       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1868       {
1869         DBUG_ASSERT(table->file->inited == handler::RND);
1870 	if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1871 	  goto err;
1872       }
1873       else
1874       {
1875 	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1876 	{
1877 	  error=my_errno;
1878 	  goto err;
1879 	}
1880 
1881 	if (!key)
1882 	{
1883 	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length)))
1884 	  {
1885 	    error=ENOMEM;
1886 	    goto err;
1887 	  }
1888 	}
1889 	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1890         key_part_map keypart_map= (1 << table->key_info[key_nr].user_defined_key_parts) - 1;
1891 	if ((error= (table->file->ha_index_read_idx_map(table->record[1],
1892                                                         key_nr, (uchar*) key,
1893                                                         keypart_map,
1894                                                         HA_READ_KEY_EXACT))))
1895 	  goto err;
1896       }
1897       if (table->vfield)
1898       {
1899         /*
1900           We have not yet called update_virtual_fields(VOL_UPDATE_FOR_READ)
1901           in handler methods for the just read row in record[1].
1902         */
1903         table->move_fields(table->field, table->record[1], table->record[0]);
1904         int verr = table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE);
1905         table->move_fields(table->field, table->record[0], table->record[1]);
1906         if (verr)
1907           goto err;
1908       }
1909       if (info->handle_duplicates == DUP_UPDATE)
1910       {
1911         int res= 0;
1912         /*
1913           We don't check for other UNIQUE keys - the first row
1914           that matches, is updated. If update causes a conflict again,
1915           an error is returned
1916         */
1917 	DBUG_ASSERT(table->insert_values != NULL);
1918         store_record(table,insert_values);
1919         restore_record(table,record[1]);
1920         table->reset_default_fields();
1921 
1922         /*
1923           in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can
1924           change per row. Thus, we have to do reset_default_fields() per row.
1925           Twice (before insert and before update).
1926         */
1927         DBUG_ASSERT(info->update_fields->elements ==
1928                     info->update_values->elements);
1929         if (fill_record_n_invoke_before_triggers(thd, table,
1930                                                  *info->update_fields,
1931                                                  *info->update_values,
1932                                                  info->ignore,
1933                                                  TRG_EVENT_UPDATE))
1934           goto before_trg_err;
1935 
1936         bool different_records= (!records_are_comparable(table) ||
1937                                  compare_record(table));
1938         /*
1939           Default fields must be updated before checking view updateability.
1940           This branch of INSERT is executed only when a UNIQUE key was violated
1941           with the ON DUPLICATE KEY UPDATE option. In this case the INSERT
1942           operation is transformed to an UPDATE, and the default fields must
1943           be updated as if this is an UPDATE.
1944         */
1945         if (different_records && table->default_field)
1946           table->evaluate_update_default_function();
1947 
1948         /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1949         res= info->table_list->view_check_option(table->in_use, info->ignore);
1950         if (res == VIEW_CHECK_SKIP)
1951           goto after_trg_or_ignored_err;
1952         if (res == VIEW_CHECK_ERROR)
1953           goto before_trg_err;
1954 
1955         table->file->restore_auto_increment(prev_insert_id);
1956         info->touched++;
1957         if (different_records)
1958         {
1959           if (unlikely(error=table->file->ha_update_row(table->record[1],
1960                                                         table->record[0])) &&
1961               error != HA_ERR_RECORD_IS_THE_SAME)
1962           {
1963             if (info->ignore &&
1964                 !table->file->is_fatal_error(error, HA_CHECK_ALL))
1965             {
1966               if (!(thd->variables.old_behavior &
1967                     OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
1968                 table->file->print_error(error, MYF(ME_WARNING));
1969               goto after_trg_or_ignored_err;
1970             }
1971             goto err;
1972           }
1973 
1974           if (error != HA_ERR_RECORD_IS_THE_SAME)
1975           {
1976             info->updated++;
1977             if (table->versioned())
1978             {
1979               if (table->versioned(VERS_TIMESTAMP))
1980               {
1981                 store_record(table, record[2]);
1982                 if ((error= vers_insert_history_row(table)))
1983                 {
1984                   info->last_errno= error;
1985                   table->file->print_error(error, MYF(0));
1986                   trg_error= 1;
1987                   restore_record(table, record[2]);
1988                   goto after_trg_or_ignored_err;
1989                 }
1990                 restore_record(table, record[2]);
1991               }
1992               info->copied++;
1993             }
1994           }
1995           else
1996             error= 0;
1997           /*
1998             If ON DUP KEY UPDATE updates a row instead of inserting
1999             one, it's like a regular UPDATE statement: it should not
2000             affect the value of a next SELECT LAST_INSERT_ID() or
2001             mysql_insert_id().  Except if LAST_INSERT_ID(#) was in the
2002             INSERT query, which is handled separately by
2003             THD::arg_of_last_insert_id_function.
2004           */
2005           prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row;
2006           insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
2007           trg_error= (table->triggers &&
2008                       table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
2009                                                         TRG_ACTION_AFTER, TRUE));
2010           info->copied++;
2011         }
2012 
2013         /*
2014           Only update next_insert_id if the AUTO_INCREMENT value was explicitly
2015           updated, so we don't update next_insert_id with the value from the
2016           row being updated. Otherwise reset next_insert_id to what it was
2017           before the duplicate key error, since that value is unused.
2018         */
2019         if (table->next_number_field_updated)
2020         {
2021           DBUG_ASSERT(table->next_number_field != NULL);
2022 
2023           table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
2024         }
2025         else if (prev_insert_id_for_cur_row)
2026         {
2027           table->file->restore_auto_increment(prev_insert_id_for_cur_row);
2028         }
2029         goto ok;
2030       }
2031       else /* DUP_REPLACE */
2032       {
2033 	/*
2034 	  The manual defines the REPLACE semantics that it is either
2035 	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
2036 	  InnoDB do not function in the defined way if we allow MySQL
2037 	  to convert the latter operation internally to an UPDATE.
2038           We also should not perform this conversion if we have
2039           timestamp field with ON UPDATE which is different from DEFAULT.
2040           Another case when conversion should not be performed is when
2041           we have ON DELETE trigger on table so user may notice that
2042           we cheat here. Note that it is ok to do such conversion for
2043           tables which have ON UPDATE but have no ON DELETE triggers,
2044           we just should not expose this fact to users by invoking
2045           ON UPDATE triggers.
2046         */
2047         if (last_uniq_key(table,key_nr) &&
2048             !table->file->referenced_by_foreign_key() &&
2049             (!table->triggers || !table->triggers->has_delete_triggers()))
2050         {
2051           if (table->versioned(VERS_TRX_ID))
2052           {
2053             bitmap_set_bit(table->write_set, table->vers_start_field()->field_index);
2054             table->file->column_bitmaps_signal();
2055             table->vers_start_field()->store(0, false);
2056           }
2057           if (unlikely(error= table->file->ha_update_row(table->record[1],
2058                                                          table->record[0])) &&
2059               error != HA_ERR_RECORD_IS_THE_SAME)
2060             goto err;
2061           if (likely(!error))
2062           {
2063             info->deleted++;
2064             if (!table->file->has_transactions())
2065               thd->transaction->stmt.modified_non_trans_table= TRUE;
2066             if (table->versioned(VERS_TIMESTAMP))
2067             {
2068               store_record(table, record[2]);
2069               error= vers_insert_history_row(table);
2070               restore_record(table, record[2]);
2071               if (unlikely(error))
2072                 goto err;
2073             }
2074           }
2075           else
2076             error= 0;   // error was HA_ERR_RECORD_IS_THE_SAME
2077           /*
2078             Since we pretend that we have done insert we should call
2079             its after triggers.
2080           */
2081           goto after_trg_n_copied_inc;
2082         }
2083         else
2084         {
2085           if (table->triggers &&
2086               table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
2087                                                 TRG_ACTION_BEFORE, TRUE))
2088             goto before_trg_err;
2089 
2090           if (!table->versioned(VERS_TIMESTAMP))
2091             error= table->file->ha_delete_row(table->record[1]);
2092           else
2093           {
2094             store_record(table, record[2]);
2095             restore_record(table, record[1]);
2096             table->vers_update_end();
2097             error= table->file->ha_update_row(table->record[1],
2098                                               table->record[0]);
2099             restore_record(table, record[2]);
2100           }
2101           if (unlikely(error))
2102             goto err;
2103           if (!table->versioned(VERS_TIMESTAMP))
2104             info->deleted++;
2105           else
2106             info->updated++;
2107           if (!table->file->has_transactions_and_rollback())
2108             thd->transaction->stmt.modified_non_trans_table= TRUE;
2109           if (table->triggers &&
2110               table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
2111                                                 TRG_ACTION_AFTER, TRUE))
2112           {
2113             trg_error= 1;
2114             goto after_trg_or_ignored_err;
2115           }
2116           /* Let us attempt do write_row() once more */
2117         }
2118       }
2119     }
2120 
2121     /*
2122       If more than one iteration of the above while loop is done, from
2123       the second one the row being inserted will have an explicit
2124       value in the autoinc field, which was set at the first call of
2125       handler::update_auto_increment(). This value is saved to avoid
2126       thd->insert_id_for_cur_row becoming 0. Use this saved autoinc
2127       value.
2128      */
2129     if (table->file->insert_id_for_cur_row == 0)
2130       table->file->insert_id_for_cur_row= insert_id_for_cur_row;
2131 
2132     /*
2133       Restore column maps if they where replaced during an duplicate key
2134       problem.
2135     */
2136     if (table->read_set != save_read_set ||
2137         table->write_set != save_write_set)
2138       table->column_bitmaps_set(save_read_set, save_write_set);
2139   }
2140   else if (unlikely((error=table->file->ha_write_row(table->record[0]))))
2141   {
2142     DEBUG_SYNC(thd, "write_row_noreplace");
2143     if (!info->ignore ||
2144         table->file->is_fatal_error(error, HA_CHECK_ALL))
2145       goto err;
2146     if (!(thd->variables.old_behavior &
2147           OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
2148       table->file->print_error(error, MYF(ME_WARNING));
2149     table->file->restore_auto_increment(prev_insert_id);
2150     goto after_trg_or_ignored_err;
2151   }
2152 
2153 after_trg_n_copied_inc:
2154   info->copied++;
2155   thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
2156   trg_error= (table->triggers &&
2157               table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
2158                                                 TRG_ACTION_AFTER, TRUE));
2159 
2160 ok:
2161   /*
2162     We send the row after writing it to the table so that the
2163     correct values are sent to the client. Otherwise it won't show
2164     autoinc values (generated inside the handler::ha_write()) and
2165     values updated in ON DUPLICATE KEY UPDATE.
2166   */
2167   if (sink && sink->send_data(thd->lex->returning()->item_list) < 0)
2168     trg_error= 1;
2169 
2170 after_trg_or_ignored_err:
2171   if (key)
2172     my_safe_afree(key,table->s->max_unique_length);
2173   if (!table->file->has_transactions_and_rollback())
2174     thd->transaction->stmt.modified_non_trans_table= TRUE;
2175   DBUG_RETURN(trg_error);
2176 
2177 err:
2178   info->last_errno= error;
2179   table->file->print_error(error,MYF(0));
2180 
2181 before_trg_err:
2182   table->file->restore_auto_increment(prev_insert_id);
2183   if (key)
2184     my_safe_afree(key, table->s->max_unique_length);
2185   table->column_bitmaps_set(save_read_set, save_write_set);
2186   DBUG_RETURN(1);
2187 }
2188 
2189 
2190 /******************************************************************************
2191   Check that there aren't any null_fields
2192 ******************************************************************************/
2193 
2194 
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)2195 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, TABLE_LIST *table_list)
2196 {
2197   int err= 0;
2198   MY_BITMAP *write_set= entry->write_set;
2199 
2200   for (Field **field=entry->field ; *field ; field++)
2201   {
2202     if (!bitmap_is_set(write_set, (*field)->field_index) &&
2203         !(*field)->vers_sys_field() &&
2204         has_no_default_value(thd, *field, table_list) &&
2205         ((*field)->real_type() != MYSQL_TYPE_ENUM))
2206       err=1;
2207   }
2208   return thd->abort_on_warning ? err : 0;
2209 }
2210 
2211 /*****************************************************************************
2212   Handling of delayed inserts
2213   A thread is created for each table that one uses with the DELAYED attribute.
2214 *****************************************************************************/
2215 
2216 #ifndef EMBEDDED_LIBRARY
2217 
2218 class delayed_row :public ilink {
2219 public:
2220   char *record;
2221   enum_duplicates dup;
2222   my_time_t start_time;
2223   ulong start_time_sec_part;
2224   sql_mode_t sql_mode;
2225   bool auto_increment_field_not_null;
2226   bool ignore, log_query, query_start_sec_part_used;
2227   bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2228   ulonglong first_successful_insert_id_in_prev_stmt;
2229   ulonglong forced_insert_id;
2230   ulong auto_increment_increment;
2231   ulong auto_increment_offset;
2232   LEX_STRING query;
2233   Time_zone *time_zone;
2234   char *user, *host, *ip;
2235   query_id_t query_id;
2236   my_thread_id thread_id;
2237 
delayed_row(LEX_STRING const query_arg,enum_duplicates dup_arg,bool ignore_arg,bool log_query_arg)2238   delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
2239               bool ignore_arg, bool log_query_arg)
2240     : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
2241       forced_insert_id(0), query(query_arg), time_zone(0),
2242       user(0), host(0), ip(0)
2243     {}
~delayed_row()2244   ~delayed_row()
2245   {
2246     my_free(query.str);
2247     my_free(record);
2248   }
2249 };
2250 
2251 /**
2252   Delayed_insert - context of a thread responsible for delayed insert
2253   into one table. When processing delayed inserts, we create an own
2254   thread for every distinct table. Later on all delayed inserts directed
2255   into that table are handled by a dedicated thread.
2256 */
2257 
2258 class Delayed_insert :public ilink {
2259   uint locks_in_memory;
2260   thr_lock_type delayed_lock;
2261 public:
2262   THD thd;
2263   TABLE *table;
2264   mysql_mutex_t mutex;
2265   mysql_cond_t cond, cond_client;
2266   uint tables_in_use, stacked_inserts;
2267   volatile bool status;
2268   bool retry;
2269   /**
2270     When the handler thread starts, it clones a metadata lock ticket
2271     which protects against GRL and ticket for the table to be inserted.
2272     This is done to allow the deadlock detector to detect deadlocks
2273     resulting from these locks.
2274     Before this is done, the connection thread cannot safely exit
2275     without causing problems for clone_ticket().
2276     Once handler_thread_initialized has been set, it is safe for the
2277     connection thread to exit.
2278     Access to handler_thread_initialized is protected by di->mutex.
2279   */
2280   bool handler_thread_initialized;
2281   COPY_INFO info;
2282   I_List<delayed_row> rows;
2283   ulong group_count;
2284   TABLE_LIST table_list;			// Argument
2285   /**
2286     Request for IX metadata lock protecting against GRL which is
2287     passed from connection thread to the handler thread.
2288   */
2289   MDL_request grl_protection;
Delayed_insert(SELECT_LEX * current_select)2290   Delayed_insert(SELECT_LEX *current_select)
2291     :locks_in_memory(0), thd(next_thread_id()),
2292      table(0),tables_in_use(0), stacked_inserts(0),
2293      status(0), retry(0), handler_thread_initialized(FALSE), group_count(0)
2294   {
2295     DBUG_ENTER("Delayed_insert constructor");
2296     thd.security_ctx->user=(char*) delayed_user;
2297     thd.security_ctx->host=(char*) my_localhost;
2298     thd.security_ctx->ip= NULL;
2299     thd.query_id= 0;
2300     strmake_buf(thd.security_ctx->priv_user, thd.security_ctx->user);
2301     thd.current_tablenr=0;
2302     thd.set_command(COM_DELAYED_INSERT);
2303     thd.lex->current_select= current_select;
2304     thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
2305     /*
2306       Prevent changes to global.lock_wait_timeout from affecting
2307       delayed insert threads as any timeouts in delayed inserts
2308       are not communicated to the client.
2309     */
2310     thd.variables.lock_wait_timeout= LONG_TIMEOUT;
2311 
2312     bzero((char*) &thd.net, sizeof(thd.net));		// Safety
2313     bzero((char*) &table_list, sizeof(table_list));	// Safety
2314     thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
2315     thd.security_ctx->host_or_ip= "";
2316     bzero((char*) &info,sizeof(info));
2317     mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
2318     mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
2319     mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
2320     mysql_mutex_lock(&LOCK_delayed_insert);
2321     delayed_insert_threads++;
2322     mysql_mutex_unlock(&LOCK_delayed_insert);
2323     delayed_lock= global_system_variables.low_priority_updates ?
2324                                           TL_WRITE_LOW_PRIORITY : TL_WRITE;
2325     DBUG_VOID_RETURN;
2326   }
~Delayed_insert()2327   ~Delayed_insert()
2328   {
2329     /* The following is not really needed, but just for safety */
2330     delayed_row *row;
2331     while ((row=rows.get()))
2332       delete row;
2333     if (table)
2334     {
2335       close_thread_tables(&thd);
2336       thd.mdl_context.release_transactional_locks(&thd);
2337     }
2338     mysql_mutex_destroy(&mutex);
2339     mysql_cond_destroy(&cond);
2340     mysql_cond_destroy(&cond_client);
2341 
2342     server_threads.erase(&thd);
2343     mysql_mutex_assert_owner(&LOCK_delayed_insert);
2344     delayed_insert_threads--;
2345 
2346     my_free(thd.query());
2347     thd.security_ctx->user= 0;
2348     thd.security_ctx->host= 0;
2349   }
2350 
2351   /* The following is for checking when we can delete ourselves */
lock()2352   inline void lock()
2353   {
2354     locks_in_memory++;				// Assume LOCK_delay_insert
2355   }
unlock()2356   void unlock()
2357   {
2358     mysql_mutex_lock(&LOCK_delayed_insert);
2359     if (!--locks_in_memory)
2360     {
2361       mysql_mutex_lock(&mutex);
2362       if (thd.killed && ! stacked_inserts && ! tables_in_use)
2363       {
2364         mysql_cond_signal(&cond);
2365 	status=1;
2366       }
2367       mysql_mutex_unlock(&mutex);
2368     }
2369     mysql_mutex_unlock(&LOCK_delayed_insert);
2370   }
lock_count()2371   inline uint lock_count() { return locks_in_memory; }
2372 
2373   TABLE* get_local_table(THD* client_thd);
2374   bool open_and_lock_table();
2375   bool handle_inserts(void);
2376 };
2377 
2378 
2379 I_List<Delayed_insert> delayed_threads;
2380 
2381 
2382 /**
2383   Return an instance of delayed insert thread that can handle
2384   inserts into a given table, if it exists. Otherwise return NULL.
2385 */
2386 
2387 static
find_handler(THD * thd,TABLE_LIST * table_list)2388 Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
2389 {
2390   THD_STAGE_INFO(thd, stage_waiting_for_delay_list);
2391   mysql_mutex_lock(&LOCK_delayed_insert);       // Protect master list
2392   I_List_iterator<Delayed_insert> it(delayed_threads);
2393   Delayed_insert *di;
2394   while ((di= it++))
2395   {
2396     if (!cmp(&table_list->db, &di->table_list.db) &&
2397 	!cmp(&table_list->table_name, &di->table_list.table_name))
2398     {
2399       di->lock();
2400       break;
2401     }
2402   }
2403   mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2404   return di;
2405 }
2406 
2407 
2408 /**
2409   Attempt to find or create a delayed insert thread to handle inserts
2410   into this table.
2411 
2412   @return In case of success, table_list->table points to a local copy
2413           of the delayed table or is set to NULL, which indicates a
2414           request for lock upgrade. In case of failure, value of
2415           table_list->table is undefined.
2416   @retval TRUE  - this thread ran out of resources OR
2417                 - a newly created delayed insert thread ran out of
2418                   resources OR
2419                 - the created thread failed to open and lock the table
2420                   (e.g. because it does not exist) OR
2421                 - the table opened in the created thread turned out to
2422                   be a view
2423   @retval FALSE - table successfully opened OR
2424                 - too many delayed insert threads OR
2425                 - the table has triggers and we have to fall back to
2426                   a normal INSERT
2427                 Two latter cases indicate a request for lock upgrade.
2428 
2429   XXX: why do we regard INSERT DELAYED into a view as an error and
2430   do not simply perform a lock upgrade?
2431 
2432   TODO: The approach with using two mutexes to work with the
2433   delayed thread list -- LOCK_delayed_insert and
2434   LOCK_delayed_create -- is redundant, and we only need one of
2435   them to protect the list.  The reason we have two locks is that
2436   we do not want to block look-ups in the list while we're waiting
2437   for the newly created thread to open the delayed table. However,
2438   this wait itself is redundant -- we always call get_local_table
2439   later on, and there wait again until the created thread acquires
2440   a table lock.
2441 
2442   As is redundant the concept of locks_in_memory, since we already
2443   have another counter with similar semantics - tables_in_use,
2444   both of them are devoted to counting the number of producers for
2445   a given consumer (delayed insert thread), only at different
2446   stages of producer-consumer relationship.
2447 
2448   The 'status' variable in Delayed_insert is redundant
2449   too, since there is already di->stacked_inserts.
2450 */
2451 
2452 static
delayed_get_table(THD * thd,MDL_request * grl_protection_request,TABLE_LIST * table_list)2453 bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
2454                        TABLE_LIST *table_list)
2455 {
2456   int error;
2457   Delayed_insert *di;
2458   DBUG_ENTER("delayed_get_table");
2459 
2460   /* Must be set in the parser */
2461   DBUG_ASSERT(table_list->db.str);
2462 
2463   /* Find the thread which handles this table. */
2464   if (!(di= find_handler(thd, table_list)))
2465   {
2466     /*
2467       No match. Create a new thread to handle the table, but
2468       no more than max_insert_delayed_threads.
2469     */
2470     if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
2471       DBUG_RETURN(0);
2472     THD_STAGE_INFO(thd, stage_creating_delayed_handler);
2473     mysql_mutex_lock(&LOCK_delayed_create);
2474     /*
2475       The first search above was done without LOCK_delayed_create.
2476       Another thread might have created the handler in between. Search again.
2477     */
2478     if (! (di= find_handler(thd, table_list)))
2479     {
2480       if (!(di= new Delayed_insert(thd->lex->current_select)))
2481         goto end_create;
2482 
2483       /*
2484         Annotating delayed inserts is not supported.
2485       */
2486       di->thd.variables.binlog_annotate_row_events= 0;
2487 
2488       di->thd.set_db(&table_list->db);
2489       di->thd.set_query(my_strndup(PSI_INSTRUMENT_ME,
2490                                    table_list->table_name.str,
2491                                    table_list->table_name.length,
2492                                    MYF(MY_WME | ME_FATAL)),
2493                         table_list->table_name.length, system_charset_info);
2494       if (di->thd.db.str == NULL || di->thd.query() == NULL)
2495       {
2496         /* The error is reported */
2497 	delete di;
2498         goto end_create;
2499       }
2500       di->table_list= *table_list;			// Needed to open table
2501       /* Replace volatile strings with local copies */
2502       di->table_list.alias.str=    di->table_list.table_name.str=    di->thd.query();
2503       di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length();
2504       di->table_list.db= di->thd.db;
2505       /*
2506         Nulify select_lex because, if the thread that spawned the current one
2507         disconnects, the select_lex will point to freed memory.
2508       */
2509       di->table_list.select_lex= NULL;
2510       /*
2511         We need the tickets so that they can be cloned in
2512         handle_delayed_insert
2513       */
2514       MDL_REQUEST_INIT(&di->grl_protection, MDL_key::BACKUP, "", "",
2515                        MDL_BACKUP_DML, MDL_STATEMENT);
2516       di->grl_protection.ticket= grl_protection_request->ticket;
2517       init_mdl_requests(&di->table_list);
2518       di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
2519 
2520       di->lock();
2521       mysql_mutex_lock(&di->mutex);
2522       if ((error= mysql_thread_create(key_thread_delayed_insert,
2523                                       &di->thd.real_id, &connection_attrib,
2524                                       handle_delayed_insert, (void*) di)))
2525       {
2526 	DBUG_PRINT("error",
2527 		   ("Can't create thread to handle delayed insert (error %d)",
2528 		    error));
2529         mysql_mutex_unlock(&di->mutex);
2530 	di->unlock();
2531 	delete di;
2532 	my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATAL), error);
2533         goto end_create;
2534       }
2535 
2536       /*
2537         Wait until table is open unless the handler thread or the connection
2538         thread has been killed. Note that we in all cases must wait until the
2539         handler thread has been properly initialized before exiting. Otherwise
2540         we risk doing clone_ticket() on a ticket that is no longer valid.
2541       */
2542       THD_STAGE_INFO(thd, stage_waiting_for_handler_open);
2543       while (!di->handler_thread_initialized ||
2544              (!di->thd.killed && !di->table && !thd->killed))
2545       {
2546         mysql_cond_wait(&di->cond_client, &di->mutex);
2547       }
2548       mysql_mutex_unlock(&di->mutex);
2549       THD_STAGE_INFO(thd, stage_got_old_table);
2550       if (thd->killed)
2551       {
2552         di->unlock();
2553         goto end_create;
2554       }
2555       if (di->thd.killed)
2556       {
2557         if (di->thd.is_error() && ! di->retry)
2558         {
2559           /*
2560             Copy the error message. Note that we don't treat fatal
2561             errors in the delayed thread as fatal errors in the
2562             main thread. If delayed thread was killed, we don't
2563             want to send "Server shutdown in progress" in the
2564             INSERT THREAD.
2565           */
2566           my_message(di->thd.get_stmt_da()->sql_errno(),
2567                      di->thd.get_stmt_da()->message(),
2568                      MYF(0));
2569         }
2570         di->unlock();
2571         goto end_create;
2572       }
2573       mysql_mutex_lock(&LOCK_delayed_insert);
2574       delayed_threads.append(di);
2575       mysql_mutex_unlock(&LOCK_delayed_insert);
2576     }
2577     mysql_mutex_unlock(&LOCK_delayed_create);
2578   }
2579 
2580   mysql_mutex_lock(&di->mutex);
2581   table_list->table= di->get_local_table(thd);
2582   mysql_mutex_unlock(&di->mutex);
2583   if (table_list->table)
2584   {
2585     DBUG_ASSERT(! thd->is_error());
2586     thd->di= di;
2587   }
2588   /* Unlock the delayed insert object after its last access. */
2589   di->unlock();
2590   DBUG_PRINT("exit", ("table_list->table: %p", table_list->table));
2591   DBUG_RETURN(thd->is_error());
2592 
2593 end_create:
2594   mysql_mutex_unlock(&LOCK_delayed_create);
2595   DBUG_PRINT("exit", ("is_error(): %d", thd->is_error()));
2596   DBUG_RETURN(thd->is_error());
2597 }
2598 
2599 #define memdup_vcol(thd, vcol)                                            \
2600   if (vcol)                                                               \
2601   {                                                                       \
2602     (vcol)= (Virtual_column_info*)(thd)->memdup((vcol), sizeof(*(vcol))); \
2603     (vcol)->expr= NULL;                                                   \
2604   }
2605 
2606 /**
2607   As we can't let many client threads modify the same TABLE
2608   structure of the dedicated delayed insert thread, we create an
2609   own structure for each client thread. This includes a row
2610   buffer to save the column values and new fields that point to
2611   the new row buffer. The memory is allocated in the client
2612   thread and is freed automatically.
2613 
2614   @pre This function is called from the client thread.  Delayed
2615        insert thread mutex must be acquired before invoking this
2616        function.
2617 
2618   @return Not-NULL table object on success. NULL in case of an error,
2619                     which is set in client_thd.
2620 */
2621 
get_local_table(THD * client_thd)2622 TABLE *Delayed_insert::get_local_table(THD* client_thd)
2623 {
2624   my_ptrdiff_t adjust_ptrs;
2625   Field **field,**org_field, *found_next_number_field;
2626   TABLE *copy;
2627   TABLE_SHARE *share;
2628   uchar *bitmap;
2629   char *copy_tmp;
2630   uint bitmaps_used;
2631   Field **default_fields, **virtual_fields;
2632   uchar *record;
2633   DBUG_ENTER("Delayed_insert::get_local_table");
2634 
2635   /* First request insert thread to get a lock */
2636   status=1;
2637   tables_in_use++;
2638   if (!thd.lock)				// Table is not locked
2639   {
2640     THD_STAGE_INFO(client_thd, stage_waiting_for_handler_lock);
2641     mysql_cond_signal(&cond);			// Tell handler to lock table
2642     while (!thd.killed && !thd.lock && ! client_thd->killed)
2643     {
2644       mysql_cond_wait(&cond_client, &mutex);
2645     }
2646     THD_STAGE_INFO(client_thd, stage_got_handler_lock);
2647     if (client_thd->killed)
2648       goto error;
2649     if (thd.killed)
2650     {
2651       /*
2652         Check how the insert thread was killed. If it was killed
2653         by FLUSH TABLES which calls kill_delayed_threads_for_table(),
2654         then is_error is not set.
2655         In this case, return without setting an error,
2656         which means that the insert will be converted to a normal insert.
2657       */
2658       if (thd.is_error())
2659       {
2660         /*
2661           Copy the error message. Note that we don't treat fatal
2662           errors in the delayed thread as fatal errors in the
2663           main thread. If delayed thread was killed, we don't
2664           want to send "Server shutdown in progress" in the
2665           INSERT THREAD.
2666 
2667           The thread could be killed with an error message if
2668           di->handle_inserts() or di->open_and_lock_table() fails.
2669         */
2670         my_message(thd.get_stmt_da()->sql_errno(),
2671                    thd.get_stmt_da()->message(), MYF(0));
2672       }
2673       goto error;
2674     }
2675   }
2676   share= table->s;
2677 
2678   /*
2679     Allocate memory for the TABLE object, the field pointers array,
2680     and one record buffer of reclength size.
2681     Normally a table has three record buffers of rec_buff_length size,
2682     which includes alignment bytes. Since the table copy is used for
2683     creating one record only, the other record buffers and alignment
2684     are unnecessary.
2685     As the table will also need to calculate default values and
2686     expresions, we have to allocate own version of fields. keys and key
2687     parts. The key and key parts are needed as parse_vcol_defs() changes
2688     them in case of long hash keys.
2689   */
2690   THD_STAGE_INFO(client_thd, stage_allocating_local_table);
2691   if (!multi_alloc_root(client_thd->mem_root,
2692                         &copy_tmp, sizeof(*table),
2693                         &field, (uint) (share->fields+1)*sizeof(Field**),
2694                         &default_fields,
2695                         (share->default_fields +
2696                          share->default_expressions + 1) * sizeof(Field*),
2697                         &virtual_fields,
2698                         (share->virtual_fields + 1) * sizeof(Field*),
2699                         &record, (uint) share->reclength,
2700                         &bitmap, (uint) share->column_bitmap_size*4,
2701                         NullS))
2702     goto error;
2703 
2704   /* Copy the TABLE object. */
2705   copy= new (copy_tmp) TABLE;
2706   *copy= *table;
2707 
2708   /* We don't need to change the file handler here */
2709   /* Assign the pointers for the field pointers array and the record. */
2710   copy->field= field;
2711   copy->record[0]= record;
2712   memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2713   if (share->default_fields || share->default_expressions)
2714     copy->default_field= default_fields;
2715   if (share->virtual_fields)
2716     copy->vfield= virtual_fields;
2717 
2718   copy->expr_arena= NULL;
2719 
2720   /* Ensure we don't use the table list of the original table */
2721   copy->pos_in_table_list= 0;
2722 
2723   /*
2724     Make a copy of all fields.
2725     The copied fields need to point into the copied record. This is done
2726     by copying the field objects with their old pointer values and then
2727     "move" the pointers by the distance between the original and copied
2728     records. That way we preserve the relative positions in the records.
2729   */
2730   adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
2731   found_next_number_field= table->found_next_number_field;
2732   for (org_field= table->field; *org_field; org_field++, field++)
2733   {
2734     if (!(*field= (*org_field)->make_new_field(client_thd->mem_root, copy, 1)))
2735       goto error;
2736     (*field)->unireg_check= (*org_field)->unireg_check;
2737     (*field)->invisible= (*org_field)->invisible;
2738     (*field)->orig_table= copy;			// Remove connection
2739     (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
2740     (*field)->flags|= ((*org_field)->flags & LONG_UNIQUE_HASH_FIELD);
2741     (*field)->invisible= (*org_field)->invisible;
2742     memdup_vcol(client_thd, (*field)->vcol_info);
2743     memdup_vcol(client_thd, (*field)->default_value);
2744     memdup_vcol(client_thd, (*field)->check_constraint);
2745     if (*org_field == found_next_number_field)
2746       (*field)->table->found_next_number_field= *field;
2747   }
2748   *field=0;
2749 
2750   if (copy_keys_from_share(copy, client_thd->mem_root))
2751     goto error;
2752 
2753   if (share->virtual_fields || share->default_expressions ||
2754       share->default_fields)
2755   {
2756     bool error_reported= FALSE;
2757     if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy,
2758                                  &error_reported,
2759                                  VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING)))
2760       goto error;
2761   }
2762 
2763   switch_defaults_to_nullable_trigger_fields(copy);
2764 
2765   /* Adjust in_use for pointing to client thread */
2766   copy->in_use= client_thd;
2767 
2768   /* Adjust lock_count. This table object is not part of a lock. */
2769   copy->lock_count= 0;
2770 
2771   /* Adjust bitmaps */
2772   copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
2773   copy->def_write_set.bitmap= ((my_bitmap_map*)
2774                                (bitmap + share->column_bitmap_size));
2775   bitmaps_used= 2;
2776   if (share->default_fields || share->default_expressions)
2777   {
2778     my_bitmap_init(&copy->has_value_set,
2779                    (my_bitmap_map*) (bitmap +
2780                                      bitmaps_used*share->column_bitmap_size),
2781                    share->fields, FALSE);
2782   }
2783   copy->tmp_set.bitmap= 0;                      // To catch errors
2784   bzero((char*) bitmap, share->column_bitmap_size * bitmaps_used);
2785   copy->read_set=  &copy->def_read_set;
2786   copy->write_set= &copy->def_write_set;
2787 
2788   DBUG_RETURN(copy);
2789 
2790   /* Got fatal error */
2791  error:
2792   tables_in_use--;
2793   mysql_cond_signal(&cond);                     // Inform thread about abort
2794   DBUG_RETURN(0);
2795 }
2796 
2797 
2798 /* Put a question in queue */
2799 
2800 static
write_delayed(THD * thd,TABLE * table,enum_duplicates duplic,LEX_STRING query,bool ignore,bool log_on)2801 int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
2802                   LEX_STRING query, bool ignore, bool log_on)
2803 {
2804   delayed_row *row= 0;
2805   Delayed_insert *di=thd->di;
2806   const Discrete_interval *forced_auto_inc;
2807   size_t user_len, host_len, ip_length;
2808   DBUG_ENTER("write_delayed");
2809   DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
2810                        (ulong) query.length));
2811 
2812   THD_STAGE_INFO(thd, stage_waiting_for_handler_insert);
2813   mysql_mutex_lock(&di->mutex);
2814   while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
2815     mysql_cond_wait(&di->cond_client, &di->mutex);
2816   THD_STAGE_INFO(thd, stage_storing_row_into_queue);
2817 
2818   if (thd->killed)
2819     goto err;
2820 
2821   /*
2822     Take a copy of the query string, if there is any. The string will
2823     be free'ed when the row is destroyed. If there is no query string,
2824     we don't do anything special.
2825    */
2826 
2827   if (query.str)
2828   {
2829     char *str;
2830     if (!(str= my_strndup(PSI_INSTRUMENT_ME, query.str, query.length,
2831                           MYF(MY_WME))))
2832       goto err;
2833     query.str= str;
2834   }
2835   row= new delayed_row(query, duplic, ignore, log_on);
2836   if (row == NULL)
2837   {
2838     my_free(query.str);
2839     goto err;
2840   }
2841 
2842   user_len= host_len= ip_length= 0;
2843   row->user= row->host= row->ip= NULL;
2844   if (thd->security_ctx)
2845   {
2846     if (thd->security_ctx->user)
2847       user_len= strlen(thd->security_ctx->user) + 1;
2848     if (thd->security_ctx->host)
2849       host_len= strlen(thd->security_ctx->host) + 1;
2850     if (thd->security_ctx->ip)
2851       ip_length= strlen(thd->security_ctx->ip) + 1;
2852   }
2853   /* This can't be THREAD_SPECIFIC as it's freed in delayed thread */
2854   if (!(row->record= (char*) my_malloc(PSI_INSTRUMENT_ME,
2855                                        table->s->reclength +
2856                                        user_len + host_len + ip_length,
2857                                        MYF(MY_WME))))
2858     goto err;
2859   memcpy(row->record, table->record[0], table->s->reclength);
2860 
2861   if (thd->security_ctx)
2862   {
2863     if (thd->security_ctx->user)
2864     {
2865       row->user= row->record + table->s->reclength;
2866       memcpy(row->user, thd->security_ctx->user, user_len);
2867     }
2868     if (thd->security_ctx->host)
2869     {
2870       row->host= row->record + table->s->reclength + user_len;
2871       memcpy(row->host, thd->security_ctx->host, host_len);
2872     }
2873     if (thd->security_ctx->ip)
2874     {
2875       row->ip= row->record + table->s->reclength + user_len + host_len;
2876       memcpy(row->ip, thd->security_ctx->ip, ip_length);
2877     }
2878   }
2879   row->query_id= thd->query_id;
2880   row->thread_id= thd->thread_id;
2881 
2882   row->start_time=                thd->start_time;
2883   row->start_time_sec_part=       thd->start_time_sec_part;
2884   row->query_start_sec_part_used= thd->query_start_sec_part_used;
2885   /*
2886     those are for the binlog: LAST_INSERT_ID() has been evaluated at this
2887     time, so record does not need it, but statement-based binlogging of the
2888     INSERT will need when the row is actually inserted.
2889     As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
2890   */
2891   row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2892     thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2893   row->first_successful_insert_id_in_prev_stmt=
2894     thd->first_successful_insert_id_in_prev_stmt;
2895 
2896   /* Add session variable timezone
2897      Time_zone object will not be freed even the thread is ended.
2898      So we can get time_zone object from thread which handling delayed statement.
2899      See the comment of my_tz_find() for detail.
2900   */
2901   if (thd->time_zone_used)
2902   {
2903     row->time_zone = thd->variables.time_zone;
2904   }
2905   else
2906   {
2907     row->time_zone = NULL;
2908   }
2909   /* Copy session variables. */
2910   row->auto_increment_increment= thd->variables.auto_increment_increment;
2911   row->auto_increment_offset=    thd->variables.auto_increment_offset;
2912   row->sql_mode=                 thd->variables.sql_mode;
2913   row->auto_increment_field_not_null= table->auto_increment_field_not_null;
2914 
2915   /* Copy the next forced auto increment value, if any. */
2916   if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
2917   {
2918     row->forced_insert_id= forced_auto_inc->minimum();
2919     DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
2920                            (ulong) row->forced_insert_id));
2921   }
2922 
2923   di->rows.push_back(row);
2924   di->stacked_inserts++;
2925   di->status=1;
2926   if (table->s->blob_fields)
2927     unlink_blobs(table);
2928   mysql_cond_signal(&di->cond);
2929 
2930   thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
2931   mysql_mutex_unlock(&di->mutex);
2932   DBUG_RETURN(0);
2933 
2934  err:
2935   delete row;
2936   mysql_mutex_unlock(&di->mutex);
2937   DBUG_RETURN(1);
2938 }
2939 
2940 /**
2941   Signal the delayed insert thread that this user connection
2942   is finished using it for this statement.
2943 */
2944 
end_delayed_insert(THD * thd)2945 static void end_delayed_insert(THD *thd)
2946 {
2947   DBUG_ENTER("end_delayed_insert");
2948   Delayed_insert *di=thd->di;
2949   mysql_mutex_lock(&di->mutex);
2950   DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
2951   if (!--di->tables_in_use || di->thd.killed)
2952   {						// Unlock table
2953     di->status=1;
2954     mysql_cond_signal(&di->cond);
2955   }
2956   mysql_mutex_unlock(&di->mutex);
2957   DBUG_VOID_RETURN;
2958 }
2959 
2960 
2961 /* We kill all delayed threads when doing flush-tables */
2962 
kill_delayed_threads(void)2963 void kill_delayed_threads(void)
2964 {
2965   DBUG_ENTER("kill_delayed_threads");
2966   mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
2967 
2968   I_List_iterator<Delayed_insert> it(delayed_threads);
2969   Delayed_insert *di;
2970   while ((di= it++))
2971   {
2972     mysql_mutex_lock(&di->thd.LOCK_thd_kill);
2973     if (di->thd.killed < KILL_CONNECTION)
2974       di->thd.set_killed_no_mutex(KILL_CONNECTION);
2975     di->thd.abort_current_cond_wait(false);
2976     mysql_mutex_unlock(&di->thd.LOCK_thd_kill);
2977   }
2978   mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2979   DBUG_VOID_RETURN;
2980 }
2981 
2982 
2983 /**
2984   A strategy for the prelocking algorithm which prevents the
2985   delayed insert thread from opening tables with engines which
2986   do not support delayed inserts.
2987 
2988   Particularly it allows to abort open_tables() as soon as we
2989   discover that we have opened a MERGE table, without acquiring
2990   metadata locks on underlying tables.
2991 */
2992 
2993 class Delayed_prelocking_strategy : public Prelocking_strategy
2994 {
2995 public:
2996   virtual bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
2997                               Sroutine_hash_entry *rt, sp_head *sp,
2998                               bool *need_prelocking);
2999   virtual bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
3000                             TABLE_LIST *table_list, bool *need_prelocking);
3001   virtual bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
3002                            TABLE_LIST *table_list, bool *need_prelocking);
3003 };
3004 
3005 
3006 bool Delayed_prelocking_strategy::
handle_table(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)3007 handle_table(THD *thd, Query_tables_list *prelocking_ctx,
3008              TABLE_LIST *table_list, bool *need_prelocking)
3009 {
3010   DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);
3011 
3012   if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
3013   {
3014     my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name.str);
3015     return TRUE;
3016   }
3017   return FALSE;
3018 }
3019 
3020 
3021 bool Delayed_prelocking_strategy::
handle_routine(THD * thd,Query_tables_list * prelocking_ctx,Sroutine_hash_entry * rt,sp_head * sp,bool * need_prelocking)3022 handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
3023                Sroutine_hash_entry *rt, sp_head *sp,
3024                bool *need_prelocking)
3025 {
3026   /* LEX used by the delayed insert thread has no routines. */
3027   DBUG_ASSERT(0);
3028   return FALSE;
3029 }
3030 
3031 
3032 bool Delayed_prelocking_strategy::
handle_view(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)3033 handle_view(THD *thd, Query_tables_list *prelocking_ctx,
3034             TABLE_LIST *table_list, bool *need_prelocking)
3035 {
3036   /* We don't open views in the delayed insert thread. */
3037   DBUG_ASSERT(0);
3038   return FALSE;
3039 }
3040 
3041 
3042 /**
3043    Open and lock table for use by delayed thread and check that
3044    this table is suitable for delayed inserts.
3045 
3046    @retval FALSE - Success.
3047    @retval TRUE  - Failure.
3048 */
3049 
open_and_lock_table()3050 bool Delayed_insert::open_and_lock_table()
3051 {
3052   Delayed_prelocking_strategy prelocking_strategy;
3053 
3054   /*
3055     Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
3056     error for tables with engines which don't support delayed inserts.
3057 
3058     We can't do auto-repair in insert delayed thread, as it would hang
3059     when trying to an exclusive MDL_LOCK on the table during repair
3060     as the connection thread has a SHARED_WRITE lock.
3061   */
3062   if (!(table= open_n_lock_single_table(&thd, &table_list,
3063                                         TL_WRITE_DELAYED,
3064                                         MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
3065                                         MYSQL_OPEN_IGNORE_REPAIR,
3066                                         &prelocking_strategy)))
3067   {
3068     /* If table was crashed, then upper level should retry open+repair */
3069     retry= table_list.crashed;
3070     thd.fatal_error();                      // Abort waiting inserts
3071     return TRUE;
3072   }
3073 
3074   if (table->triggers || table->check_constraints)
3075   {
3076     /*
3077       Table has triggers or check constraints. This is not an error, but we do
3078       not support these with delayed insert. Terminate the delayed
3079       thread without an error and thus request lock upgrade.
3080     */
3081     return TRUE;
3082   }
3083   table->copy_blobs= 1;
3084 
3085   table->file->prepare_for_row_logging();
3086   return FALSE;
3087 }
3088 
3089 
3090 /*
3091  * Create a new delayed insert thread
3092 */
3093 
handle_delayed_insert(void * arg)3094 pthread_handler_t handle_delayed_insert(void *arg)
3095 {
3096   Delayed_insert *di=(Delayed_insert*) arg;
3097   THD *thd= &di->thd;
3098 
3099   pthread_detach_this_thread();
3100   /* Add thread to THD list so that's it's visible in 'show processlist' */
3101   thd->set_start_time();
3102   server_threads.insert(thd);
3103   if (abort_loop)
3104     thd->set_killed(KILL_CONNECTION);
3105   else
3106     thd->reset_killed();
3107 
3108   mysql_thread_set_psi_id(thd->thread_id);
3109 
3110   /*
3111     Wait until the client runs into mysql_cond_wait(),
3112     where we free it after the table is opened and di linked in the list.
3113     If we did not wait here, the client might detect the opened table
3114     before it is linked to the list. It would release LOCK_delayed_create
3115     and allow another thread to create another handler for the same table,
3116     since it does not find one in the list.
3117   */
3118   mysql_mutex_lock(&di->mutex);
3119   if (my_thread_init())
3120   {
3121     /* Can't use my_error since store_globals has not yet been called */
3122     thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
3123     di->handler_thread_initialized= TRUE;
3124   }
3125   else
3126   {
3127     DBUG_ENTER("handle_delayed_insert");
3128     thd->thread_stack= (char*) &thd;
3129     if (init_thr_lock())
3130     {
3131       thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
3132       di->handler_thread_initialized= TRUE;
3133       thd->fatal_error();
3134       goto err;
3135     }
3136 
3137     thd->store_globals();
3138 
3139     thd->lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
3140 
3141     /*
3142       INSERT DELAYED has to go to row-based format because the time
3143       at which rows are inserted cannot be determined in mixed mode.
3144     */
3145     thd->set_current_stmt_binlog_format_row_if_mixed();
3146     /* Don't annotate insert delayed binlog events */
3147     thd->variables.binlog_annotate_row_events= 0;
3148 
3149     /*
3150       Clone tickets representing protection against GRL and the lock on
3151       the target table for the insert and add them to the list of granted
3152       metadata locks held by the handler thread. This is safe since the
3153       handler thread is not holding nor waiting on any metadata locks.
3154     */
3155     if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
3156         thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
3157     {
3158       thd->release_transactional_locks();
3159       di->handler_thread_initialized= TRUE;
3160       goto err;
3161     }
3162 
3163     /*
3164       Now that the ticket has been cloned, it is safe for the connection
3165       thread to exit.
3166     */
3167     di->handler_thread_initialized= TRUE;
3168     di->table_list.mdl_request.ticket= NULL;
3169 
3170     thd->set_query_id(next_query_id());
3171 
3172     if (di->open_and_lock_table())
3173       goto err;
3174 
3175     /*
3176       INSERT DELAYED generally expects thd->lex->current_select to be NULL,
3177       since this is not an attribute of the current thread. This can lead to
3178       problems if the thread that spawned the current one disconnects.
3179       current_select will then point to freed memory. But current_select is
3180       required to resolve the partition function. So, after fulfilling that
3181       requirement, we set the current_select to 0.
3182     */
3183     thd->lex->current_select= NULL;
3184 
3185     /* Tell client that the thread is initialized */
3186     mysql_cond_signal(&di->cond_client);
3187 
3188     /*
3189       Inform mdl that it needs to call mysql_lock_abort to abort locks
3190       for delayed insert.
3191     */
3192     thd->mdl_context.set_needs_thr_lock_abort(TRUE);
3193 
3194     di->table->mark_columns_needed_for_insert();
3195     /* Mark all columns for write as we don't know which columns we get from user */
3196     bitmap_set_all(di->table->write_set);
3197 
3198     /* Now wait until we get an insert or lock to handle */
3199     /* We will not abort as long as a client thread uses this thread */
3200 
3201     for (;;)
3202     {
3203       if (thd->killed)
3204       {
3205         uint lock_count;
3206         DBUG_PRINT("delayed", ("Insert delayed killed"));
3207         /*
3208           Remove this from delay insert list so that no one can request a
3209           table from this
3210         */
3211         mysql_mutex_unlock(&di->mutex);
3212         mysql_mutex_lock(&LOCK_delayed_insert);
3213         di->unlink();
3214         lock_count=di->lock_count();
3215         mysql_mutex_unlock(&LOCK_delayed_insert);
3216         mysql_mutex_lock(&di->mutex);
3217         if (!lock_count && !di->tables_in_use && !di->stacked_inserts &&
3218             !thd->lock)
3219           break;					// Time to die
3220       }
3221 
3222       /* Shouldn't wait if killed or an insert is waiting. */
3223       DBUG_PRINT("delayed",
3224                  ("thd->killed: %d  di->status: %d  di->stacked_inserts: %d",
3225                   thd->killed, di->status, di->stacked_inserts));
3226       if (!thd->killed && !di->status && !di->stacked_inserts)
3227       {
3228         struct timespec abstime;
3229         set_timespec(abstime, delayed_insert_timeout);
3230 
3231         /* Information for pthread_kill */
3232         mysql_mutex_unlock(&di->mutex);
3233         mysql_mutex_lock(&di->thd.mysys_var->mutex);
3234         di->thd.mysys_var->current_mutex= &di->mutex;
3235         di->thd.mysys_var->current_cond= &di->cond;
3236         mysql_mutex_unlock(&di->thd.mysys_var->mutex);
3237         mysql_mutex_lock(&di->mutex);
3238         THD_STAGE_INFO(&(di->thd), stage_waiting_for_insert);
3239 
3240         DBUG_PRINT("info",("Waiting for someone to insert rows"));
3241         while (!thd->killed && !di->status)
3242         {
3243           int error;
3244           mysql_audit_release(thd);
3245           error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
3246 #ifdef EXTRA_DEBUG
3247           if (error && error != EINTR && error != ETIMEDOUT)
3248           {
3249             fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
3250             DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
3251                                  error));
3252           }
3253 #endif
3254           if (error == ETIMEDOUT || error == ETIME)
3255             thd->set_killed(KILL_CONNECTION);
3256         }
3257         /* We can't lock di->mutex and mysys_var->mutex at the same time */
3258         mysql_mutex_unlock(&di->mutex);
3259         mysql_mutex_lock(&di->thd.mysys_var->mutex);
3260         di->thd.mysys_var->current_mutex= 0;
3261         di->thd.mysys_var->current_cond= 0;
3262         mysql_mutex_unlock(&di->thd.mysys_var->mutex);
3263         mysql_mutex_lock(&di->mutex);
3264       }
3265 
3266       /*
3267         The code depends on that the following ASSERT always hold.
3268         I don't want to accidently introduce and bugs in the following code
3269         in this commit, so I leave the small cleaning up of the code to
3270         a future commit
3271       */
3272       DBUG_ASSERT(thd->lock || di->stacked_inserts == 0);
3273 
3274       DBUG_PRINT("delayed",
3275                  ("thd->killed: %d  di->status: %d  di->stacked_insert: %d  di->tables_in_use: %d  thd->lock: %d",
3276                   thd->killed, di->status, di->stacked_inserts, di->tables_in_use, thd->lock != 0));
3277 
3278       /*
3279         This is used to test see what happens if killed is sent before
3280         we have time to handle the insert requests.
3281       */
3282       DBUG_EXECUTE_IF("write_delay_wakeup",
3283                       if (!thd->killed && di->stacked_inserts)
3284                         my_sleep(500000);
3285                       );
3286 
3287       if (di->tables_in_use && ! thd->lock &&
3288           (!thd->killed || di->stacked_inserts))
3289       {
3290         thd->set_query_id(next_query_id());
3291         /*
3292           Request for new delayed insert.
3293           Lock the table, but avoid to be blocked by a global read lock.
3294           If we got here while a global read lock exists, then one or more
3295           inserts started before the lock was requested. These are allowed
3296           to complete their work before the server returns control to the
3297           client which requested the global read lock. The delayed insert
3298           handler will close the table and finish when the outstanding
3299           inserts are done.
3300         */
3301         if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
3302         {
3303           /* Fatal error */
3304           thd->set_killed(KILL_CONNECTION);
3305         }
3306         mysql_cond_broadcast(&di->cond_client);
3307       }
3308       if (di->stacked_inserts)
3309       {
3310         delayed_row *row;
3311         I_List_iterator<delayed_row> it(di->rows);
3312         my_thread_id cur_thd= di->thd.thread_id;
3313 
3314         while ((row= it++))
3315         {
3316           if (cur_thd != row->thread_id)
3317           {
3318             mysql_audit_external_lock_ex(&di->thd, row->thread_id,
3319                 row->user, row->host, row->ip, row->query_id,
3320                 di->table->s, F_WRLCK);
3321             cur_thd= row->thread_id;
3322           }
3323         }
3324         if (di->handle_inserts())
3325         {
3326           /* Some fatal error */
3327           thd->set_killed(KILL_CONNECTION);
3328         }
3329       }
3330       di->status=0;
3331       if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
3332       {
3333         /*
3334           No one is doing a insert delayed
3335           Unlock table so that other threads can use it
3336         */
3337         MYSQL_LOCK *lock=thd->lock;
3338         thd->lock=0;
3339         mysql_mutex_unlock(&di->mutex);
3340         /*
3341           We need to release next_insert_id before unlocking. This is
3342           enforced by handler::ha_external_lock().
3343         */
3344         di->table->file->ha_release_auto_increment();
3345         mysql_unlock_tables(thd, lock);
3346         trans_commit_stmt(thd);
3347         di->group_count=0;
3348         mysql_audit_release(thd);
3349         /*
3350           Reset binlog. We can't call ha_reset() for the table as this will
3351           reset the table maps we have calculated earlier.
3352         */
3353         mysql_mutex_lock(&di->mutex);
3354       }
3355 
3356       /*
3357         Reset binlog. We can't call ha_reset() for the table as this will
3358         reset the table maps we have calculated earlier.
3359       */
3360       thd->reset_binlog_for_next_statement();
3361 
3362       if (di->tables_in_use)
3363         mysql_cond_broadcast(&di->cond_client); // If waiting clients
3364     }
3365 
3366   err:
3367     DBUG_LEAVE;
3368   }
3369 
3370   {
3371     DBUG_ENTER("handle_delayed_insert-cleanup");
3372     di->table=0;
3373     mysql_mutex_unlock(&di->mutex);
3374 
3375     /*
3376       Protect against mdl_locks trying to access open tables
3377       We use KILL_CONNECTION_HARD here to ensure that
3378       THD::notify_shared_lock() dosn't try to access open tables after
3379       this.
3380     */
3381     mysql_mutex_lock(&thd->LOCK_thd_data);
3382     thd->mdl_context.set_needs_thr_lock_abort(0);
3383     mysql_mutex_unlock(&thd->LOCK_thd_data);
3384     thd->set_killed(KILL_CONNECTION_HARD);	        // If error
3385 
3386     close_thread_tables(thd);			// Free the table
3387     thd->release_transactional_locks();
3388     mysql_cond_broadcast(&di->cond_client);       // Safety
3389 
3390     mysql_mutex_lock(&LOCK_delayed_create);    // Because of delayed_get_table
3391     mysql_mutex_lock(&LOCK_delayed_insert);
3392     /*
3393       di should be unlinked from the thread handler list and have no active
3394       clients
3395     */
3396     delete di;
3397     mysql_mutex_unlock(&LOCK_delayed_insert);
3398     mysql_mutex_unlock(&LOCK_delayed_create);
3399 
3400     DBUG_LEAVE;
3401   }
3402   my_thread_end();
3403   pthread_exit(0);
3404 
3405   return 0;
3406 }
3407 
3408 
3409 /* Remove all pointers to data for blob fields so that original table doesn't try to free them */
3410 
unlink_blobs(TABLE * table)3411 static void unlink_blobs(TABLE *table)
3412 {
3413   for (Field **ptr=table->field ; *ptr ; ptr++)
3414   {
3415     if ((*ptr)->flags & BLOB_FLAG)
3416       ((Field_blob *) (*ptr))->clear_temporary();
3417   }
3418 }
3419 
3420 /* Free blobs stored in current row */
3421 
free_delayed_insert_blobs(TABLE * table)3422 static void free_delayed_insert_blobs(TABLE *table)
3423 {
3424   for (Field **ptr=table->field ; *ptr ; ptr++)
3425   {
3426     if ((*ptr)->flags & BLOB_FLAG)
3427       ((Field_blob *) *ptr)->free();
3428   }
3429 }
3430 
3431 
3432 /* set value field for blobs to point to data in record */
3433 
set_delayed_insert_blobs(TABLE * table)3434 static void set_delayed_insert_blobs(TABLE *table)
3435 {
3436   for (Field **ptr=table->field ; *ptr ; ptr++)
3437   {
3438     if ((*ptr)->flags & BLOB_FLAG)
3439     {
3440       Field_blob *blob= ((Field_blob *) *ptr);
3441       uchar *data= blob->get_ptr();
3442       if (data)
3443         blob->set_value(data);     // Set value.ptr() to point to data
3444     }
3445   }
3446 }
3447 
3448 
handle_inserts(void)3449 bool Delayed_insert::handle_inserts(void)
3450 {
3451   int error;
3452   ulong max_rows;
3453   bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
3454   delayed_row *row;
3455   DBUG_ENTER("handle_inserts");
3456 
3457   /* Allow client to insert new rows */
3458   mysql_mutex_unlock(&mutex);
3459 
3460   table->next_number_field=table->found_next_number_field;
3461   table->use_all_columns();
3462 
3463   THD_STAGE_INFO(&thd, stage_upgrading_lock);
3464   if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
3465                                    thd.variables.lock_wait_timeout))
3466   {
3467     /*
3468       This can happen if thread is killed either by a shutdown
3469       or if another thread is removing the current table definition
3470       from the table cache.
3471     */
3472     my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATAL | ME_ERROR_LOG),
3473              table->s->table_name.str);
3474     goto err;
3475   }
3476 
3477   THD_STAGE_INFO(&thd, stage_insert);
3478   max_rows= delayed_insert_limit;
3479   if (thd.killed || table->s->tdc->flushed)
3480   {
3481     thd.set_killed(KILL_SYSTEM_THREAD);
3482     max_rows= ULONG_MAX;                     // Do as much as possible
3483   }
3484 
3485   if (table->file->ha_rnd_init_with_error(0))
3486     goto err;
3487   /*
3488     We have to call prepare_for_row_logging() as the second call to
3489     handler_writes() will not have called decide_logging_format.
3490   */
3491   table->file->prepare_for_row_logging();
3492   table->file->prepare_for_insert(1);
3493   using_bin_log= table->file->row_logging;
3494 
3495   /*
3496     We can't use row caching when using the binary log because if
3497     we get a crash, then binary log will contain rows that are not yet
3498     written to disk, which will cause problems in replication.
3499   */
3500   if (!using_bin_log)
3501     table->file->extra(HA_EXTRA_WRITE_CACHE);
3502 
3503   mysql_mutex_lock(&mutex);
3504 
3505   while ((row=rows.get()))
3506   {
3507     int tmp_error;
3508     stacked_inserts--;
3509     mysql_mutex_unlock(&mutex);
3510     memcpy(table->record[0],row->record,table->s->reclength);
3511     if (table->s->blob_fields)
3512       set_delayed_insert_blobs(table);
3513 
3514     thd.start_time=row->start_time;
3515     thd.start_time_sec_part=row->start_time_sec_part;
3516     thd.query_start_sec_part_used=row->query_start_sec_part_used;
3517     /*
3518       To get the exact auto_inc interval to store in the binlog we must not
3519       use values from the previous interval (of the previous rows).
3520     */
3521     bool log_query= (row->log_query && row->query.str != NULL);
3522     DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
3523                            row->query.str : "[NULL]",
3524                            (ulong) row->query.length));
3525     if (log_query)
3526     {
3527       /*
3528         Guaranteed that the INSERT DELAYED STMT will not be here
3529         in SBR when mysql binlog is enabled.
3530       */
3531       DBUG_ASSERT(!mysql_bin_log.is_open() ||
3532                   thd.is_current_stmt_binlog_format_row());
3533 
3534       /*
3535         This is the first value of an INSERT statement.
3536         It is the right place to clear a forced insert_id.
3537         This is usually done after the last value of an INSERT statement,
3538         but we won't know this in the insert delayed thread. But before
3539         the first value is sufficiently equivalent to after the last
3540         value of the previous statement.
3541       */
3542       table->file->ha_release_auto_increment();
3543       thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
3544     }
3545     thd.first_successful_insert_id_in_prev_stmt=
3546       row->first_successful_insert_id_in_prev_stmt;
3547     thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
3548       row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
3549     table->auto_increment_field_not_null= row->auto_increment_field_not_null;
3550 
3551     /* Copy the session variables. */
3552     thd.variables.auto_increment_increment= row->auto_increment_increment;
3553     thd.variables.auto_increment_offset=    row->auto_increment_offset;
3554     thd.variables.sql_mode=                 row->sql_mode;
3555 
3556     /* Copy a forced insert_id, if any. */
3557     if (row->forced_insert_id)
3558     {
3559       DBUG_PRINT("delayed", ("received auto_inc: %lu",
3560                              (ulong) row->forced_insert_id));
3561       thd.force_one_auto_inc_interval(row->forced_insert_id);
3562     }
3563 
3564     info.ignore= row->ignore;
3565     info.handle_duplicates= row->dup;
3566     if (info.ignore ||
3567 	info.handle_duplicates != DUP_ERROR)
3568     {
3569       table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3570       using_ignore=1;
3571     }
3572     if (info.handle_duplicates == DUP_REPLACE &&
3573         (!table->triggers ||
3574          !table->triggers->has_delete_triggers()))
3575     {
3576       table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3577       using_opt_replace= 1;
3578     }
3579     if (info.handle_duplicates == DUP_UPDATE)
3580       table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3581     thd.clear_error(); // reset error for binlog
3582 
3583     tmp_error= 0;
3584     if (unlikely(table->vfield))
3585     {
3586       /*
3587         Virtual fields where not calculated by caller as the temporary
3588         TABLE object used had vcol_set empty. Better to calculate them
3589         here to make the caller faster.
3590       */
3591       tmp_error= table->update_virtual_fields(table->file,
3592                                               VCOL_UPDATE_FOR_WRITE);
3593     }
3594 
3595     if (unlikely(tmp_error || write_record(&thd, table, &info, NULL)))
3596     {
3597       info.error_count++;				// Ignore errors
3598       thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
3599       row->log_query = 0;
3600     }
3601 
3602     if (using_ignore)
3603     {
3604       using_ignore=0;
3605       table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3606     }
3607     if (using_opt_replace)
3608     {
3609       using_opt_replace= 0;
3610       table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3611     }
3612 
3613     if (table->s->blob_fields)
3614       free_delayed_insert_blobs(table);
3615     thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
3616     thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
3617     mysql_mutex_lock(&mutex);
3618 
3619     /*
3620       Reset the table->auto_increment_field_not_null as it is valid for
3621       only one row.
3622     */
3623     table->auto_increment_field_not_null= FALSE;
3624 
3625     delete row;
3626     /*
3627       Let READ clients do something once in a while
3628       We should however not break in the middle of a multi-line insert
3629       if we have binary logging enabled as we don't want other commands
3630       on this table until all entries has been processed
3631     */
3632     if (group_count++ >= max_rows && (row= rows.head()) &&
3633 	(!(row->log_query & using_bin_log)))
3634     {
3635       group_count=0;
3636       if (stacked_inserts || tables_in_use)	// Let these wait a while
3637       {
3638 	if (tables_in_use)
3639           mysql_cond_broadcast(&cond_client);   // If waiting clients
3640 	THD_STAGE_INFO(&thd, stage_reschedule);
3641         mysql_mutex_unlock(&mutex);
3642 	if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
3643 	{
3644 	  /* This should never happen */
3645 	  table->file->print_error(error,MYF(0));
3646 	  sql_print_error("%s", thd.get_stmt_da()->message());
3647           DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
3648 	  goto err;
3649 	}
3650 	query_cache_invalidate3(&thd, table, 1);
3651 	if (thr_reschedule_write_lock(*thd.lock->locks,
3652                                 thd.variables.lock_wait_timeout))
3653 	{
3654           /* This is not known to happen. */
3655           my_error(ER_DELAYED_CANT_CHANGE_LOCK,
3656                    MYF(ME_FATAL | ME_ERROR_LOG),
3657                    table->s->table_name.str);
3658           goto err;
3659 	}
3660 	if (!using_bin_log)
3661 	  table->file->extra(HA_EXTRA_WRITE_CACHE);
3662         mysql_mutex_lock(&mutex);
3663 	THD_STAGE_INFO(&thd, stage_insert);
3664       }
3665       if (tables_in_use)
3666         mysql_cond_broadcast(&cond_client);     // If waiting clients
3667     }
3668   }
3669 
3670   table->file->ha_rnd_end();
3671 
3672   if (WSREP((&thd)))
3673     thd_proc_info(&thd, "Insert done");
3674   else
3675     thd_proc_info(&thd, 0);
3676   mysql_mutex_unlock(&mutex);
3677 
3678   /*
3679     We need to flush the pending event when using row-based
3680     replication since the flushing normally done in binlog_query() is
3681     not done last in the statement: for delayed inserts, the insert
3682     statement is logged *before* all rows are inserted.
3683 
3684     We can flush the pending event without checking the thd->lock
3685     since the delayed insert *thread* is not inside a stored function
3686     or trigger.
3687 
3688     TODO: Move the logging to last in the sequence of rows.
3689   */
3690   if (table->file->row_logging &&
3691       thd.binlog_flush_pending_rows_event(TRUE,
3692                                           table->file->row_logging_has_trans))
3693     goto err;
3694 
3695   if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
3696   {						// This shouldn't happen
3697     table->file->print_error(error,MYF(0));
3698     sql_print_error("%s", thd.get_stmt_da()->message());
3699     DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
3700     goto err;
3701   }
3702   query_cache_invalidate3(&thd, table, 1);
3703   mysql_mutex_lock(&mutex);
3704   DBUG_RETURN(0);
3705 
3706  err:
3707 #ifndef DBUG_OFF
3708   max_rows= 0;                                  // For DBUG output
3709 #endif
3710   /* Remove all not used rows */
3711   mysql_mutex_lock(&mutex);
3712   while ((row=rows.get()))
3713   {
3714     if (table->s->blob_fields)
3715     {
3716       memcpy(table->record[0],row->record,table->s->reclength);
3717       set_delayed_insert_blobs(table);
3718       free_delayed_insert_blobs(table);
3719     }
3720     delete row;
3721     thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
3722     stacked_inserts--;
3723 #ifndef DBUG_OFF
3724     max_rows++;
3725 #endif
3726   }
3727   DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
3728   thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
3729   DBUG_RETURN(1);
3730 }
3731 #endif /* EMBEDDED_LIBRARY */
3732 
3733 /***************************************************************************
3734   Store records in INSERT ... SELECT *
3735 ***************************************************************************/
3736 
3737 
3738 /*
3739   make insert specific preparation and checks after opening tables
3740 
3741   SYNOPSIS
3742     mysql_insert_select_prepare()
3743     thd         thread handler
3744 
3745   RETURN
3746   0   OK
3747   > 0 Error
3748   < 0 Ok, ignore insert
3749 */
3750 
mysql_insert_select_prepare(THD * thd,select_result * sel_res)3751 int mysql_insert_select_prepare(THD *thd, select_result *sel_res)
3752 {
3753   int res;
3754   LEX *lex= thd->lex;
3755   SELECT_LEX *select_lex= lex->first_select_lex();
3756   DBUG_ENTER("mysql_insert_select_prepare");
3757 
3758   /*
3759     SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
3760     clause if table is VIEW
3761   */
3762 
3763   if ((res= mysql_prepare_insert(thd, lex->query_tables, lex->field_list, 0,
3764                                  lex->update_list, lex->value_list,
3765                                  lex->duplicates,
3766                                  &select_lex->where, TRUE)))
3767     DBUG_RETURN(res);
3768 
3769   /*
3770     If sel_res is not empty, it means we have items in returing_list.
3771     So we prepare the list now
3772   */
3773   if (sel_res)
3774     sel_res->prepare(lex->returning()->item_list, NULL);
3775 
3776   DBUG_ASSERT(select_lex->leaf_tables.elements != 0);
3777   List_iterator<TABLE_LIST> ti(select_lex->leaf_tables);
3778   TABLE_LIST *table;
3779   uint insert_tables;
3780 
3781   if (select_lex->first_cond_optimization)
3782   {
3783     /* Back up leaf_tables list. */
3784     Query_arena *arena= thd->stmt_arena, backup;
3785     arena= thd->activate_stmt_arena_if_needed(&backup);  // For easier test
3786 
3787     insert_tables= select_lex->insert_tables;
3788     while ((table= ti++) && insert_tables--)
3789     {
3790       select_lex->leaf_tables_exec.push_back(table);
3791       table->tablenr_exec= table->table->tablenr;
3792       table->map_exec= table->table->map;
3793       table->maybe_null_exec= table->table->maybe_null;
3794     }
3795     if (arena)
3796       thd->restore_active_arena(arena, &backup);
3797   }
3798   ti.rewind();
3799   /*
3800     exclude first table from leaf tables list, because it belong to
3801     INSERT
3802   */
3803   /* skip all leaf tables belonged to view where we are insert */
3804   insert_tables= select_lex->insert_tables;
3805   while ((table= ti++) && insert_tables--)
3806     ti.remove();
3807 
3808   DBUG_RETURN(0);
3809 }
3810 
3811 
select_insert(THD * thd_arg,TABLE_LIST * table_list_par,TABLE * table_par,List<Item> * fields_par,List<Item> * update_fields,List<Item> * update_values,enum_duplicates duplic,bool ignore_check_option_errors,select_result * result)3812 select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
3813                              TABLE *table_par,
3814                              List<Item> *fields_par,
3815                              List<Item> *update_fields,
3816                              List<Item> *update_values,
3817                              enum_duplicates duplic,
3818                              bool ignore_check_option_errors,
3819                              select_result *result):
3820   select_result_interceptor(thd_arg),
3821   sel_result(result),
3822   table_list(table_list_par), table(table_par), fields(fields_par),
3823   autoinc_value_of_last_inserted_row(0),
3824   insert_into_view(table_list_par && table_list_par->view != 0)
3825 {
3826   bzero((char*) &info,sizeof(info));
3827   info.handle_duplicates= duplic;
3828   info.ignore= ignore_check_option_errors;
3829   info.update_fields= update_fields;
3830   info.update_values= update_values;
3831   info.view= (table_list_par->view ? table_list_par : 0);
3832   info.table_list= table_list_par;
3833 }
3834 
3835 
3836 int
prepare(List<Item> & values,SELECT_LEX_UNIT * u)3837 select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
3838 {
3839   LEX *lex= thd->lex;
3840   int res= 0;
3841   table_map map= 0;
3842   SELECT_LEX *lex_current_select_save= lex->current_select;
3843   DBUG_ENTER("select_insert::prepare");
3844 
3845   unit= u;
3846 
3847   /*
3848     Since table in which we are going to insert is added to the first
3849     select, LEX::current_select should point to the first select while
3850     we are fixing fields from insert list.
3851   */
3852   lex->current_select= lex->first_select_lex();
3853 
3854   res= (setup_returning_fields(thd, table_list) ||
3855         setup_fields(thd, Ref_ptr_array(), values, MARK_COLUMNS_READ, 0, 0,
3856                      0) ||
3857         check_insert_fields(thd, table_list, *fields, values,
3858                             !insert_into_view, 1, &map));
3859 
3860   if (!res && fields->elements)
3861   {
3862     Abort_on_warning_instant_set aws(thd,
3863                                      !info.ignore && thd->is_strict_mode());
3864     res= check_that_all_fields_are_given_values(thd, table_list->table,
3865                                                 table_list);
3866   }
3867 
3868   if (info.handle_duplicates == DUP_UPDATE && !res)
3869   {
3870     Name_resolution_context *context= &lex->first_select_lex()->context;
3871     Name_resolution_context_state ctx_state;
3872 
3873     /* Save the state of the current name resolution context. */
3874     ctx_state.save_state(context, table_list);
3875 
3876     /* Perform name resolution only in the first table - 'table_list'. */
3877     table_list->next_local= 0;
3878     context->resolve_in_table_list_only(table_list);
3879 
3880     lex->first_select_lex()->no_wrap_view_item= TRUE;
3881     res= res ||
3882       check_update_fields(thd, context->table_list,
3883                           *info.update_fields, *info.update_values,
3884                           /*
3885                             In INSERT SELECT ON DUPLICATE KEY UPDATE col=x
3886                             'x' can legally refer to a non-inserted table.
3887                             'x' is not even resolved yet.
3888                            */
3889                           true,
3890                           &map);
3891     lex->first_select_lex()->no_wrap_view_item= FALSE;
3892     /*
3893       When we are not using GROUP BY and there are no ungrouped
3894       aggregate functions we can refer to other tables in the ON
3895       DUPLICATE KEY part.  We use next_name_resolution_table
3896       descructively, so check it first (views?)
3897     */
3898     DBUG_ASSERT (!table_list->next_name_resolution_table);
3899     if (lex->first_select_lex()->group_list.elements == 0 &&
3900         !lex->first_select_lex()->with_sum_func)
3901     {
3902       /*
3903         We must make a single context out of the two separate name
3904         resolution contexts : the INSERT table and the tables in the
3905         SELECT part of INSERT ... SELECT.  To do that we must
3906         concatenate the two lists
3907       */
3908       table_list->next_name_resolution_table=
3909         ctx_state.get_first_name_resolution_table();
3910     }
3911 
3912     res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
3913                              MARK_COLUMNS_READ, 0, NULL, 0);
3914     if (!res)
3915     {
3916       /*
3917         Traverse the update values list and substitute fields from the
3918         select for references (Item_ref objects) to them. This is done in
3919         order to get correct values from those fields when the select
3920         employs a temporary table.
3921       */
3922       List_iterator<Item> li(*info.update_values);
3923       Item *item;
3924 
3925       while ((item= li++))
3926       {
3927         item->transform(thd, &Item::update_value_transformer,
3928                         (uchar*)lex->current_select);
3929       }
3930     }
3931 
3932     /* Restore the current context. */
3933     ctx_state.restore_state(context, table_list);
3934   }
3935 
3936   lex->current_select= lex_current_select_save;
3937   if (res)
3938     DBUG_RETURN(1);
3939   /*
3940     if it is INSERT into join view then check_insert_fields already found
3941     real table for insert
3942   */
3943   table= table_list->table;
3944 
3945   /*
3946     Is table which we are changing used somewhere in other parts of
3947     query
3948   */
3949   if (unique_table(thd, table_list, table_list->next_global, 0))
3950   {
3951     /* Using same table for INSERT and SELECT */
3952     lex->current_select->options|= OPTION_BUFFER_RESULT;
3953     lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
3954   }
3955   else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
3956            thd->locked_tables_mode <= LTM_LOCK_TABLES)
3957   {
3958     /*
3959       We must not yet prepare the result table if it is the same as one of the
3960       source tables (INSERT SELECT). The preparation may disable
3961       indexes on the result table, which may be used during the select, if it
3962       is the same table (Bug #6034). Do the preparation after the select phase
3963       in select_insert::prepare2().
3964       We won't start bulk inserts at all if this statement uses functions or
3965       should invoke triggers since they may access to the same table too.
3966     */
3967     table->file->ha_start_bulk_insert((ha_rows) 0);
3968   }
3969   restore_record(table,s->default_values);		// Get empty record
3970   table->reset_default_fields();
3971   table->next_number_field=table->found_next_number_field;
3972 
3973 #ifdef HAVE_REPLICATION
3974   if (thd->rgi_slave &&
3975       (info.handle_duplicates == DUP_UPDATE) &&
3976       (table->next_number_field != NULL) &&
3977       rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
3978     DBUG_RETURN(1);
3979 #endif
3980 
3981   thd->cuted_fields=0;
3982   bool create_lookup_handler= info.handle_duplicates != DUP_ERROR;
3983   if (info.ignore || info.handle_duplicates != DUP_ERROR)
3984   {
3985     create_lookup_handler= true;
3986     table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3987     if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
3988     {
3989       if (table->file->ha_rnd_init_with_error(0))
3990         DBUG_RETURN(1);
3991     }
3992   }
3993   table->file->prepare_for_insert(create_lookup_handler);
3994   if (info.handle_duplicates == DUP_REPLACE &&
3995       (!table->triggers || !table->triggers->has_delete_triggers()))
3996     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3997   if (info.handle_duplicates == DUP_UPDATE)
3998     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3999   thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
4000   res= (table_list->prepare_where(thd, 0, TRUE) ||
4001         table_list->prepare_check_option(thd));
4002 
4003   if (!res)
4004   {
4005      table->prepare_triggers_for_insert_stmt_or_event();
4006      table->mark_columns_needed_for_insert();
4007   }
4008 
4009   DBUG_RETURN(res);
4010 }
4011 
4012 
4013 /*
4014   Finish the preparation of the result table.
4015 
4016   SYNOPSIS
4017     select_insert::prepare2()
4018     void
4019 
4020   DESCRIPTION
4021     If the result table is the same as one of the source tables
4022     (INSERT SELECT), the result table is not finally prepared at the
4023     join prepair phase.  Do the final preparation now.
4024 
4025   RETURN
4026     0   OK
4027 */
4028 
prepare2(JOIN *)4029 int select_insert::prepare2(JOIN *)
4030 {
4031   DBUG_ENTER("select_insert::prepare2");
4032   if (table->validate_default_values_of_unset_fields(thd))
4033     DBUG_RETURN(1);
4034   if (thd->lex->describe)
4035     DBUG_RETURN(0);
4036   if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
4037       thd->locked_tables_mode <= LTM_LOCK_TABLES)
4038     table->file->ha_start_bulk_insert((ha_rows) 0);
4039 
4040   /* Same as the other variants of INSERT */
4041   if (sel_result &&
4042       sel_result->send_result_set_metadata(thd->lex->returning()->item_list,
4043                                 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4044     DBUG_RETURN(1);
4045   DBUG_RETURN(0);
4046 }
4047 
4048 
cleanup()4049 void select_insert::cleanup()
4050 {
4051   /* select_insert/select_create are never re-used in prepared statement */
4052   DBUG_ASSERT(0);
4053 }
4054 
~select_insert()4055 select_insert::~select_insert()
4056 {
4057   DBUG_ENTER("~select_insert");
4058   sel_result= NULL;
4059   if (table && table->is_created())
4060   {
4061     table->next_number_field=0;
4062     table->auto_increment_field_not_null= FALSE;
4063     table->file->ha_reset();
4064   }
4065   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
4066   thd->abort_on_warning= 0;
4067   DBUG_VOID_RETURN;
4068 }
4069 
4070 
send_data(List<Item> & values)4071 int select_insert::send_data(List<Item> &values)
4072 {
4073   DBUG_ENTER("select_insert::send_data");
4074   bool error=0;
4075 
4076   thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
4077   store_values(values);
4078   if (table->default_field &&
4079       unlikely(table->update_default_fields(info.ignore)))
4080     DBUG_RETURN(1);
4081   thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
4082   if (unlikely(thd->is_error()))
4083   {
4084     table->auto_increment_field_not_null= FALSE;
4085     DBUG_RETURN(1);
4086   }
4087 
4088   table->vers_write= table->versioned();
4089   if (table_list)                               // Not CREATE ... SELECT
4090   {
4091     switch (table_list->view_check_option(thd, info.ignore)) {
4092     case VIEW_CHECK_SKIP:
4093       DBUG_RETURN(0);
4094     case VIEW_CHECK_ERROR:
4095       DBUG_RETURN(1);
4096     }
4097   }
4098 
4099   error= write_record(thd, table, &info, sel_result);
4100   table->vers_write= table->versioned();
4101   table->auto_increment_field_not_null= FALSE;
4102 
4103   if (likely(!error))
4104   {
4105     if (table->triggers || info.handle_duplicates == DUP_UPDATE)
4106     {
4107       /*
4108         Restore fields of the record since it is possible that they were
4109         changed by ON DUPLICATE KEY UPDATE clause.
4110 
4111         If triggers exist then whey can modify some fields which were not
4112         originally touched by INSERT ... SELECT, so we have to restore
4113         their original values for the next row.
4114       */
4115       restore_record(table, s->default_values);
4116     }
4117     if (table->next_number_field)
4118     {
4119       /*
4120         If no value has been autogenerated so far, we need to remember the
4121         value we just saw, we may need to send it to client in the end.
4122       */
4123       if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
4124         autoinc_value_of_last_inserted_row=
4125           table->next_number_field->val_int();
4126       /*
4127         Clear auto-increment field for the next record, if triggers are used
4128         we will clear it twice, but this should be cheap.
4129       */
4130       table->next_number_field->reset();
4131     }
4132   }
4133   DBUG_RETURN(error);
4134 }
4135 
4136 
store_values(List<Item> & values)4137 void select_insert::store_values(List<Item> &values)
4138 {
4139   DBUG_ENTER("select_insert::store_values");
4140 
4141   if (fields->elements)
4142     fill_record_n_invoke_before_triggers(thd, table, *fields, values, 1,
4143                                          TRG_EVENT_INSERT);
4144   else
4145     fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
4146                                          values, 1, TRG_EVENT_INSERT);
4147 
4148   DBUG_VOID_RETURN;
4149 }
4150 
prepare_eof()4151 bool select_insert::prepare_eof()
4152 {
4153   int error;
4154   bool const trans_table= table->file->has_transactions_and_rollback();
4155   bool changed;
4156   bool binary_logged= 0;
4157   killed_state killed_status= thd->killed;
4158 
4159   DBUG_ENTER("select_insert::prepare_eof");
4160   DBUG_PRINT("enter", ("trans_table: %d, table_type: '%s'",
4161                        trans_table, table->file->table_type()));
4162 
4163 #ifdef WITH_WSREP
4164   error= (thd->wsrep_cs().current_error()) ? -1 :
4165     (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
4166 #else
4167     error= (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
4168 #endif /* WITH_WSREP */
4169     table->file->ha_end_bulk_insert() : 0;
4170 
4171   if (likely(!error) && unlikely(thd->is_error()))
4172     error= thd->get_stmt_da()->sql_errno();
4173 
4174   if (info.ignore || info.handle_duplicates != DUP_ERROR)
4175       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
4176         table->file->ha_rnd_end();
4177   table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4178   table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4179 
4180   if (likely((changed= (info.copied || info.deleted || info.updated))))
4181   {
4182     /*
4183       We must invalidate the table in the query cache before binlog writing
4184       and ha_autocommit_or_rollback.
4185     */
4186     query_cache_invalidate3(thd, table, 1);
4187   }
4188 
4189   if (thd->transaction->stmt.modified_non_trans_table)
4190     thd->transaction->all.modified_non_trans_table= TRUE;
4191   thd->transaction->all.m_unsafe_rollback_flags|=
4192     (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
4193 
4194   DBUG_ASSERT(trans_table || !changed ||
4195               thd->transaction->stmt.modified_non_trans_table);
4196 
4197   /*
4198     Write to binlog before commiting transaction.  No statement will
4199     be written by the binlog_query() below in RBR mode.  All the
4200     events are in the transaction cache and will be written when
4201     ha_autocommit_or_rollback() is issued below.
4202   */
4203   if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
4204       (likely(!error) || thd->transaction->stmt.modified_non_trans_table))
4205   {
4206     int errcode= 0;
4207     int res;
4208     if (likely(!error))
4209       thd->clear_error();
4210     else
4211       errcode= query_error_code(thd, killed_status == NOT_KILLED);
4212     res= thd->binlog_query(THD::ROW_QUERY_TYPE,
4213                            thd->query(), thd->query_length(),
4214                            trans_table, FALSE, FALSE, errcode);
4215     if (res > 0)
4216     {
4217       table->file->ha_release_auto_increment();
4218       DBUG_RETURN(true);
4219     }
4220     binary_logged= res == 0 || !table->s->tmp_table;
4221   }
4222   table->s->table_creation_was_logged|= binary_logged;
4223   table->file->ha_release_auto_increment();
4224 
4225   if (unlikely(error))
4226   {
4227     table->file->print_error(error,MYF(0));
4228     DBUG_RETURN(true);
4229   }
4230 
4231   DBUG_RETURN(false);
4232 }
4233 
send_ok_packet()4234 bool select_insert::send_ok_packet() {
4235   char  message[160];                           /* status message */
4236   ulonglong row_count;                          /* rows affected */
4237   ulonglong id;                                 /* last insert-id */
4238   DBUG_ENTER("select_insert::send_ok_packet");
4239 
4240   if (info.ignore)
4241     my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4242                 (ulong) info.records, (ulong) (info.records - info.copied),
4243                 (long) thd->get_stmt_da()->current_statement_warn_count());
4244   else
4245     my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4246                 (ulong) info.records, (ulong) (info.deleted + info.updated),
4247                 (long) thd->get_stmt_da()->current_statement_warn_count());
4248 
4249   row_count= info.copied + info.deleted +
4250     ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
4251      info.touched : info.updated);
4252 
4253   id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
4254     thd->first_successful_insert_id_in_cur_stmt :
4255     (thd->arg_of_last_insert_id_function ?
4256      thd->first_successful_insert_id_in_prev_stmt :
4257      (info.copied ? autoinc_value_of_last_inserted_row : 0));
4258 
4259   /*
4260     Client expects an EOF/OK packet If LEX::has_returning and if result set
4261     meta was sent. See explanation for other variants of INSERT.
4262   */
4263   if (sel_result)
4264     sel_result->send_eof();
4265   else
4266     ::my_ok(thd, row_count, id, message);
4267 
4268   DBUG_RETURN(false);
4269 }
4270 
send_eof()4271 bool select_insert::send_eof()
4272 {
4273   bool res;
4274   DBUG_ENTER("select_insert::send_eof");
4275   res= (prepare_eof() || (!suppress_my_ok && send_ok_packet()));
4276   DBUG_RETURN(res);
4277 }
4278 
abort_result_set()4279 void select_insert::abort_result_set()
4280 {
4281   bool binary_logged= 0;
4282   DBUG_ENTER("select_insert::abort_result_set");
4283   /*
4284     If the creation of the table failed (due to a syntax error, for
4285     example), no table will have been opened and therefore 'table'
4286     will be NULL. In that case, we still need to execute the rollback
4287     and the end of the function.
4288 
4289     If it fail due to inability to insert in multi-table view for example,
4290     table will be assigned with view table structure, but that table will
4291     not be opened really (it is dummy to check fields types & Co).
4292    */
4293   if (table && table->file->is_open())
4294   {
4295     bool changed, transactional_table;
4296     /*
4297       If we are not in prelocked mode, we end the bulk insert started
4298       before.
4299     */
4300     if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4301       table->file->ha_end_bulk_insert();
4302 
4303     if (table->file->inited)
4304       table->file->ha_rnd_end();
4305     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4306     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4307 
4308     /*
4309       If at least one row has been inserted/modified and will stay in
4310       the table (the table doesn't have transactions) we must write to
4311       the binlog (and the error code will make the slave stop).
4312 
4313       For many errors (example: we got a duplicate key error while
4314       inserting into a MyISAM table), no row will be added to the table,
4315       so passing the error to the slave will not help since there will
4316       be an error code mismatch (the inserts will succeed on the slave
4317       with no error).
4318 
4319       If table creation failed, the number of rows modified will also be
4320       zero, so no check for that is made.
4321     */
4322     changed= (info.copied || info.deleted || info.updated);
4323     transactional_table= table->file->has_transactions_and_rollback();
4324     if (thd->transaction->stmt.modified_non_trans_table ||
4325         thd->log_current_statement)
4326     {
4327         if (!can_rollback_data())
4328           thd->transaction->all.modified_non_trans_table= TRUE;
4329 
4330         if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4331         {
4332           int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4333           int res;
4334           /* error of writing binary log is ignored */
4335           res= thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
4336                                  thd->query_length(),
4337                                  transactional_table, FALSE, FALSE, errcode);
4338           binary_logged= res == 0 || !table->s->tmp_table;
4339         }
4340 	if (changed)
4341 	  query_cache_invalidate3(thd, table, 1);
4342     }
4343     DBUG_ASSERT(transactional_table || !changed ||
4344 		thd->transaction->stmt.modified_non_trans_table);
4345 
4346     table->s->table_creation_was_logged|= binary_logged;
4347     table->file->ha_release_auto_increment();
4348   }
4349 
4350   DBUG_VOID_RETURN;
4351 }
4352 
4353 
4354 /***************************************************************************
4355   CREATE TABLE (SELECT) ...
4356 ***************************************************************************/
4357 
create_field_for_create_select(MEM_ROOT * root,TABLE * table)4358 Field *Item::create_field_for_create_select(MEM_ROOT *root, TABLE *table)
4359 {
4360   static Tmp_field_param param(false, false, false, false);
4361   Tmp_field_src src;
4362   return create_tmp_field_ex(root, table, &src, &param);
4363 }
4364 
4365 
4366 /**
4367   Create table from lists of fields and items (or just return TABLE
4368   object for pre-opened existing table).
4369 
4370   @param thd           [in]     Thread object
4371   @param create_info   [in]     Create information (like MAX_ROWS, ENGINE or
4372                                 temporary table flag)
4373   @param create_table  [in]     Pointer to TABLE_LIST object providing database
4374                                 and name for table to be created or to be open
4375   @param alter_info    [in/out] Initial list of columns and indexes for the
4376                                 table to be created
4377   @param items         [in]     List of items which should be used to produce
4378                                 rest of fields for the table (corresponding
4379                                 fields will be added to the end of
4380                                 alter_info->create_list)
4381   @param lock          [out]    Pointer to the MYSQL_LOCK object for table
4382                                 created will be returned in this parameter.
4383                                 Since this table is not included in THD::lock
4384                                 caller is responsible for explicitly unlocking
4385                                 this table.
4386   @param hooks         [in]     Hooks to be invoked before and after obtaining
4387                                 table lock on the table being created.
4388 
4389   @note
4390     This function assumes that either table exists and was pre-opened and
4391     locked at open_and_lock_tables() stage (and in this case we just emit
4392     error or warning and return pre-opened TABLE object) or an exclusive
4393     metadata lock was acquired on table so we can safely create, open and
4394     lock table in it (we don't acquire metadata lock if this create is
4395     for temporary table).
4396 
4397   @note
4398     Since this function contains some logic specific to CREATE TABLE ...
4399     SELECT it should be changed before it can be used in other contexts.
4400 
4401   @retval non-zero  Pointer to TABLE object for table created or opened
4402   @retval 0         Error
4403 */
4404 
create_table_from_items(THD * thd,List<Item> * items,MYSQL_LOCK ** lock,TABLEOP_HOOKS * hooks)4405 TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
4406                                       MYSQL_LOCK **lock, TABLEOP_HOOKS *hooks)
4407 {
4408   TABLE tmp_table;		// Used during 'Create_field()'
4409   TABLE_SHARE share;
4410   TABLE *table= 0;
4411   uint select_field_count= items->elements;
4412   /* Add selected items to field list */
4413   List_iterator_fast<Item> it(*items);
4414   Item *item;
4415   bool save_table_creation_was_logged;
4416   DBUG_ENTER("select_create::create_table_from_items");
4417 
4418   tmp_table.s= &share;
4419   init_tmp_table_share(thd, &share, "", 0, "", "");
4420 
4421   tmp_table.s->db_create_options=0;
4422   tmp_table.null_row= 0;
4423   tmp_table.maybe_null= 0;
4424   tmp_table.in_use= thd;
4425 
4426   if (!opt_explicit_defaults_for_timestamp)
4427     promote_first_timestamp_column(&alter_info->create_list);
4428 
4429   if (create_info->fix_create_fields(thd, alter_info, *create_table))
4430     DBUG_RETURN(NULL);
4431 
4432   while ((item=it++))
4433   {
4434     Field *tmp_field= item->create_field_for_create_select(thd->mem_root,
4435                                                            &tmp_table);
4436 
4437     if (!tmp_field)
4438       DBUG_RETURN(NULL);
4439 
4440     Field *table_field;
4441 
4442     switch (item->type())
4443     {
4444     /*
4445       We have to take into account both the real table's fields and
4446       pseudo-fields used in trigger's body. These fields are used
4447       to copy defaults values later inside constructor of
4448       the class Create_field.
4449     */
4450     case Item::FIELD_ITEM:
4451     case Item::TRIGGER_FIELD_ITEM:
4452       table_field= ((Item_field *) item)->field;
4453       break;
4454     default:
4455       table_field= NULL;
4456     }
4457 
4458     Create_field *cr_field= new (thd->mem_root)
4459                                   Create_field(thd, tmp_field, table_field);
4460 
4461     if (!cr_field)
4462       DBUG_RETURN(NULL);
4463 
4464     if (item->maybe_null)
4465       cr_field->flags &= ~NOT_NULL_FLAG;
4466     alter_info->create_list.push_back(cr_field, thd->mem_root);
4467   }
4468 
4469   if (create_info->check_fields(thd, alter_info,
4470                                 create_table->table_name,
4471                                 create_table->db,
4472                                 select_field_count))
4473     DBUG_RETURN(NULL);
4474 
4475   DEBUG_SYNC(thd,"create_table_select_before_create");
4476 
4477   /* Check if LOCK TABLES + CREATE OR REPLACE of existing normal table*/
4478   if (thd->locked_tables_mode && create_table->table &&
4479       !create_info->tmp_table())
4480   {
4481     /* Remember information about the locked table */
4482     create_info->pos_in_locked_tables=
4483       create_table->table->pos_in_locked_tables;
4484     create_info->mdl_ticket= create_table->table->mdl_ticket;
4485   }
4486 
4487   /*
4488     Create and lock table.
4489 
4490     Note that we either creating (or opening existing) temporary table or
4491     creating base table on which name we have exclusive lock. So code below
4492     should not cause deadlocks or races.
4493 
4494     We don't log the statement, it will be logged later.
4495 
4496     If this is a HEAP table, the automatic DELETE FROM which is written to the
4497     binlog when a HEAP table is opened for the first time since startup, must
4498     not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
4499     don't want to delete from it) 2) it would be written before the CREATE
4500     TABLE, which is a wrong order. So we keep binary logging disabled when we
4501     open_table().
4502   */
4503 
4504   if (!mysql_create_table_no_lock(thd, &create_table->db,
4505                                   &create_table->table_name,
4506                                   create_info, alter_info, NULL,
4507                                   select_field_count, create_table))
4508   {
4509     DEBUG_SYNC(thd,"create_table_select_before_open");
4510 
4511     /*
4512       If we had a temporary table or a table used with LOCK TABLES,
4513       it was closed by mysql_create()
4514     */
4515     create_table->table= 0;
4516 
4517     if (!create_info->tmp_table())
4518     {
4519       Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
4520       TABLE_LIST::enum_open_strategy save_open_strategy;
4521 
4522       /* Force the newly created table to be opened */
4523       save_open_strategy= create_table->open_strategy;
4524       create_table->open_strategy= TABLE_LIST::OPEN_NORMAL;
4525       /*
4526         Here we open the destination table, on which we already have
4527         an exclusive metadata lock.
4528       */
4529       if (open_table(thd, create_table, &ot_ctx))
4530       {
4531         quick_rm_table(thd, create_info->db_type, &create_table->db,
4532                        table_case_name(create_info, &create_table->table_name),
4533                        0);
4534       }
4535       /* Restore */
4536       create_table->open_strategy= save_open_strategy;
4537     }
4538     else
4539     {
4540       /*
4541         The pointer to the newly created temporary table has been stored in
4542         table->create_info.
4543       */
4544       create_table->table= create_info->table;
4545       if (!create_table->table)
4546       {
4547         /*
4548           This shouldn't happen as creation of temporary table should make
4549           it preparable for open. Anyway we can't drop temporary table if
4550           we are unable to find it.
4551         */
4552         DBUG_ASSERT(0);
4553       }
4554     }
4555   }
4556   else
4557     create_table->table= 0;                     // Create failed
4558 
4559   if (unlikely(!(table= create_table->table)))
4560   {
4561     if (likely(!thd->is_error()))             // CREATE ... IF NOT EXISTS
4562       my_ok(thd);                             //   succeed, but did nothing
4563     DBUG_RETURN(NULL);
4564   }
4565 
4566   DEBUG_SYNC(thd,"create_table_select_before_lock");
4567 
4568   table->reginfo.lock_type=TL_WRITE;
4569   hooks->prelock(&table, 1);                    // Call prelock hooks
4570 
4571   /*
4572     Ensure that decide_logging_format(), called by mysql_lock_tables(), works
4573     with temporary tables that will be logged later if needed.
4574   */
4575   save_table_creation_was_logged= table->s->table_creation_was_logged;
4576   table->s->table_creation_was_logged= 1;
4577 
4578   /*
4579     mysql_lock_tables() below should never fail with request to reopen table
4580     since it won't wait for the table lock (we have exclusive metadata lock on
4581     the table) and thus can't get aborted.
4582   */
4583   if (unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
4584                hooks->postlock(&table, 1)))
4585   {
4586     /* purecov: begin tested */
4587     /*
4588       This can happen in innodb when you get a deadlock when using same table
4589       in insert and select or when you run out of memory.
4590       It can also happen if there was a conflict in
4591       THD::decide_logging_format()
4592     */
4593     if (!thd->is_error())
4594       my_error(ER_CANT_LOCK, MYF(0), my_errno);
4595     if (*lock)
4596     {
4597       mysql_unlock_tables(thd, *lock);
4598       *lock= 0;
4599     }
4600     drop_open_table(thd, table, &create_table->db, &create_table->table_name);
4601     DBUG_RETURN(0);
4602     /* purecov: end */
4603   }
4604   table->s->table_creation_was_logged= save_table_creation_was_logged;
4605   if (!table->s->tmp_table)
4606     table->file->prepare_for_row_logging();
4607 
4608   /*
4609     If slave is converting a statement event to row events, log the original
4610     create statement as an annotated row
4611   */
4612 #ifdef HAVE_REPLICATION
4613   if (thd->slave_thread && opt_replicate_annotate_row_events &&
4614       thd->is_current_stmt_binlog_format_row())
4615     thd->variables.binlog_annotate_row_events= 1;
4616 #endif
4617   DBUG_RETURN(table);
4618 }
4619 
4620 
4621 int
prepare(List<Item> & _values,SELECT_LEX_UNIT * u)4622 select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
4623 {
4624   List<Item> values(_values, thd->mem_root);
4625   MYSQL_LOCK *extra_lock= NULL;
4626   DBUG_ENTER("select_create::prepare");
4627 
4628   TABLEOP_HOOKS *hook_ptr= NULL;
4629   /*
4630     For row-based replication, the CREATE-SELECT statement is written
4631     in two pieces: the first one contain the CREATE TABLE statement
4632     necessary to create the table and the second part contain the rows
4633     that should go into the table.
4634 
4635     For non-temporary tables, the start of the CREATE-SELECT
4636     implicitly commits the previous transaction, and all events
4637     forming the statement will be stored the transaction cache. At end
4638     of the statement, the entire statement is committed as a
4639     transaction, and all events are written to the binary log.
4640 
4641     On the master, the table is locked for the duration of the
4642     statement, but since the CREATE part is replicated as a simple
4643     statement, there is no way to lock the table for accesses on the
4644     slave.  Hence, we have to hold on to the CREATE part of the
4645     statement until the statement has finished.
4646    */
4647   class MY_HOOKS : public TABLEOP_HOOKS {
4648   public:
4649     MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
4650              TABLE_LIST *select_tables_arg)
4651       : ptr(x),
4652         create_table(create_table_arg),
4653         select_tables(select_tables_arg)
4654       {
4655       }
4656 
4657   private:
4658     virtual int do_postlock(TABLE **tables, uint count)
4659     {
4660       int error;
4661       THD *thd= const_cast<THD*>(ptr->get_thd());
4662       TABLE_LIST *save_next_global= create_table->next_global;
4663 
4664       create_table->next_global= select_tables;
4665 
4666       error= thd->decide_logging_format(create_table);
4667 
4668       create_table->next_global= save_next_global;
4669 
4670       if (unlikely(error))
4671         return error;
4672 
4673       TABLE const *const table = *tables;
4674       if (thd->is_current_stmt_binlog_format_row() &&
4675           !table->s->tmp_table)
4676         return binlog_show_create_table(thd, *tables, ptr->create_info);
4677       return 0;
4678     }
4679     select_create *ptr;
4680     TABLE_LIST *create_table;
4681     TABLE_LIST *select_tables;
4682   };
4683 
4684   MY_HOOKS hooks(this, create_table, select_tables);
4685   hook_ptr= &hooks;
4686 
4687   unit= u;
4688 
4689   /*
4690     Start a statement transaction before the create if we are using
4691     row-based replication for the statement.  If we are creating a
4692     temporary table, we need to start a statement transaction.
4693   */
4694   if (!thd->lex->tmp_table() &&
4695       thd->is_current_stmt_binlog_format_row() &&
4696       mysql_bin_log.is_open())
4697   {
4698     thd->binlog_start_trans_and_stmt();
4699   }
4700 
4701   if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr)))
4702   {
4703     if (create_info->or_replace())
4704     {
4705       /* Original table was deleted. We have to log it */
4706       log_drop_table(thd, &create_table->db, &create_table->table_name,
4707                      thd->lex->tmp_table());
4708     }
4709 
4710     /* abort() deletes table */
4711     DBUG_RETURN(-1);
4712   }
4713 
4714   if (create_info->tmp_table())
4715   {
4716     /*
4717       When the temporary table was created & opened in create_table_impl(),
4718       the table's TABLE_SHARE (and thus TABLE) object was also linked to THD
4719       temporary tables lists. So, we must temporarily remove it from the
4720       list to keep them inaccessible from inner statements.
4721       e.g. CREATE TEMPORARY TABLE `t1` AS SELECT * FROM `t1`;
4722     */
4723     saved_tmp_table_share= thd->save_tmp_table_share(create_table->table);
4724   }
4725 
4726   if (extra_lock)
4727   {
4728     DBUG_ASSERT(m_plock == NULL);
4729 
4730     if (create_info->tmp_table())
4731       m_plock= &m_lock;
4732     else
4733       m_plock= &thd->extra_lock;
4734 
4735     *m_plock= extra_lock;
4736   }
4737 
4738   if (table->s->fields < values.elements)
4739   {
4740     my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
4741     DBUG_RETURN(-1);
4742   }
4743 
4744   /* First field to copy */
4745   field= table->field+table->s->fields;
4746 
4747   /* Mark all fields that are given values */
4748   for (uint n= values.elements; n; )
4749   {
4750     if ((*--field)->invisible >= INVISIBLE_SYSTEM)
4751       continue;
4752     n--;
4753     bitmap_set_bit(table->write_set, (*field)->field_index);
4754   }
4755 
4756   table->next_number_field=table->found_next_number_field;
4757 
4758   restore_record(table,s->default_values);      // Get empty record
4759   thd->cuted_fields=0;
4760   bool create_lookup_handler= info.handle_duplicates != DUP_ERROR;
4761   if (info.ignore || info.handle_duplicates != DUP_ERROR)
4762   {
4763     create_lookup_handler= true;
4764     table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
4765     if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
4766     {
4767       if (table->file->ha_rnd_init_with_error(0))
4768         DBUG_RETURN(1);
4769     }
4770   }
4771   table->file->prepare_for_insert(create_lookup_handler);
4772   if (info.handle_duplicates == DUP_REPLACE &&
4773       (!table->triggers || !table->triggers->has_delete_triggers()))
4774     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
4775   if (info.handle_duplicates == DUP_UPDATE)
4776     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
4777   if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4778     table->file->ha_start_bulk_insert((ha_rows) 0);
4779   thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
4780   if (check_that_all_fields_are_given_values(thd, table, table_list))
4781     DBUG_RETURN(1);
4782   table->mark_columns_needed_for_insert();
4783   table->file->extra(HA_EXTRA_WRITE_CACHE);
4784   // Mark table as used
4785   table->query_id= thd->query_id;
4786   DBUG_RETURN(0);
4787 }
4788 
4789 
binlog_show_create_table(THD * thd,TABLE * table,Table_specification_st * create_info)4790 static int binlog_show_create_table(THD *thd, TABLE *table,
4791                                     Table_specification_st *create_info)
4792 {
4793   /*
4794     Note 1: In RBR mode, we generate a CREATE TABLE statement for the
4795     created table by calling show_create_table().  In the event of an error,
4796     nothing should be written to the binary log, even if the table is
4797     non-transactional; therefore we pretend that the generated CREATE TABLE
4798     statement is for a transactional table.  The event will then be put in the
4799     transaction cache, and any subsequent events (e.g., table-map events and
4800     binrow events) will also be put there.  We can then use
4801     ha_autocommit_or_rollback() to either throw away the entire kaboodle of
4802     events, or write them to the binary log.
4803 
4804     We write the CREATE TABLE statement here and not in prepare()
4805     since there potentially are sub-selects or accesses to information
4806     schema that will do a close_thread_tables(), destroying the
4807     statement transaction cache.
4808   */
4809   DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
4810   StringBuffer<2048> query(system_charset_info);
4811   int result;
4812   TABLE_LIST tmp_table_list;
4813 
4814   tmp_table_list.reset();
4815   tmp_table_list.table = table;
4816 
4817   result= show_create_table(thd, &tmp_table_list, &query,
4818                             create_info, WITH_DB_NAME);
4819   DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
4820 
4821   if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4822   {
4823     int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4824     result= thd->binlog_query(THD::STMT_QUERY_TYPE,
4825                               query.ptr(), query.length(),
4826                               /* is_trans */ TRUE,
4827                               /* direct */ FALSE,
4828                               /* suppress_use */ FALSE,
4829                               errcode) > 0;
4830   }
4831 #ifdef WITH_WSREP
4832   if (thd->wsrep_trx().active())
4833   {
4834     WSREP_DEBUG("transaction already started for CTAS");
4835   }
4836   else
4837   {
4838     wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
4839   }
4840 #endif
4841   return result;
4842 }
4843 
4844 
4845 /**
4846    Log CREATE TABLE to binary log
4847 
4848    @param thd   Thread handler
4849    @param table Log create statement for this table
4850 
4851    This function is called from ALTER TABLE for a shared table converted
4852    to a not shared table.
4853 */
4854 
binlog_create_table(THD * thd,TABLE * table,bool replace)4855 bool binlog_create_table(THD *thd, TABLE *table, bool replace)
4856 {
4857   Table_specification_st create_info;
4858   bool result;
4859   ulonglong save_option_bits;
4860 
4861   /* Don't log temporary tables in row format */
4862   if (thd->variables.binlog_format == BINLOG_FORMAT_ROW &&
4863       table->s->tmp_table)
4864     return 0;
4865   if (!thd->binlog_table_should_be_logged(&table->s->db))
4866     return 0;
4867 
4868   /*
4869     We have to use ROW format to ensure that future row inserts will be
4870     logged
4871   */
4872   thd->set_current_stmt_binlog_format_row();
4873   table->file->prepare_for_row_logging();
4874 
4875   create_info.lex_start();
4876   save_option_bits= thd->variables.option_bits;
4877   if (replace)
4878     create_info.set(DDL_options_st::OPT_OR_REPLACE);
4879   /* Ensure we write ENGINE=xxx and CHARSET=... to binary log */
4880   create_info.used_fields|= (HA_CREATE_USED_ENGINE |
4881                              HA_CREATE_USED_DEFAULT_CHARSET);
4882   /* Ensure we write all engine options to binary log */
4883   create_info.used_fields|= HA_CREATE_PRINT_ALL_OPTIONS;
4884   result= binlog_show_create_table(thd, table, &create_info) != 0;
4885   thd->variables.option_bits= save_option_bits;
4886   return result;
4887 }
4888 
4889 
4890 /**
4891    Log DROP TABLE to binary log
4892 
4893    @param thd   Thread handler
4894    @param table Log create statement for this table
4895 
4896    This function is called from ALTER TABLE for a shared table converted
4897    to a not shared table.
4898 */
4899 
binlog_drop_table(THD * thd,TABLE * table)4900 bool binlog_drop_table(THD *thd, TABLE *table)
4901 {
4902   StringBuffer<2048> query(system_charset_info);
4903   /* Don't log temporary tables in row format */
4904   if (!table->s->table_creation_was_logged)
4905     return 0;
4906   if (!thd->binlog_table_should_be_logged(&table->s->db))
4907     return 0;
4908 
4909   query.append("DROP ");
4910   if (table->s->tmp_table)
4911     query.append("TEMPORARY ");
4912   query.append("TABLE IF EXISTS ");
4913   append_identifier(thd, &query, &table->s->db);
4914   query.append(".");
4915   append_identifier(thd, &query, &table->s->table_name);
4916 
4917   return thd->binlog_query(THD::STMT_QUERY_TYPE,
4918                            query.ptr(), query.length(),
4919                            /* is_trans */ TRUE,
4920                            /* direct */ FALSE,
4921                            /* suppress_use */ TRUE,
4922                            0) > 0;
4923 }
4924 
4925 
store_values(List<Item> & values)4926 void select_create::store_values(List<Item> &values)
4927 {
4928   fill_record_n_invoke_before_triggers(thd, table, field, values, 1,
4929                                        TRG_EVENT_INSERT);
4930 }
4931 
4932 
send_eof()4933 bool select_create::send_eof()
4934 {
4935   DBUG_ENTER("select_create::send_eof");
4936 
4937   /*
4938     The routine that writes the statement in the binary log
4939     is in select_insert::prepare_eof(). For that reason, we
4940     mark the flag at this point.
4941   */
4942   if (table->s->tmp_table)
4943     thd->transaction->stmt.mark_created_temp_table();
4944 
4945   if (thd->slave_thread)
4946     thd->variables.binlog_annotate_row_events= 0;
4947 
4948   if (prepare_eof())
4949   {
4950     abort_result_set();
4951     DBUG_RETURN(true);
4952   }
4953 
4954   if (table->s->tmp_table)
4955   {
4956     /*
4957       Now is good time to add the new table to THD temporary tables list.
4958       But, before that we need to check if same table got created by the sub-
4959       statement.
4960     */
4961     if (thd->find_tmp_table_share(table->s->table_cache_key.str,
4962                                   table->s->table_cache_key.length))
4963     {
4964       my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table->alias.c_ptr());
4965       abort_result_set();
4966       DBUG_RETURN(true);
4967     }
4968     else
4969     {
4970       DBUG_ASSERT(saved_tmp_table_share);
4971       thd->restore_tmp_table_share(saved_tmp_table_share);
4972     }
4973   }
4974 
4975   /*
4976     Do an implicit commit at end of statement for non-temporary
4977     tables.  This can fail, but we should unlock the table
4978     nevertheless.
4979   */
4980   if (!table->s->tmp_table)
4981   {
4982 #ifdef WITH_WSREP
4983     if (WSREP(thd) &&
4984         table->file->ht->db_type == DB_TYPE_INNODB)
4985     {
4986       if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
4987       {
4988         wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
4989       }
4990       DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
4991       WSREP_DEBUG("CTAS key append for trx: %" PRIu64 " thd %llu query %lld ",
4992                   thd->wsrep_trx_id(), thd->thread_id, thd->query_id);
4993 
4994       /*
4995         append table level exclusive key for CTAS
4996       */
4997       wsrep_key_arr_t key_arr= {0, 0};
4998       wsrep_prepare_keys_for_isolation(thd,
4999                                        create_table->db.str,
5000                                        create_table->table_name.str,
5001                                        table_list,
5002                                        &key_arr);
5003       int rcode= wsrep_thd_append_key(thd, key_arr.keys, key_arr.keys_len,
5004                                       WSREP_SERVICE_KEY_EXCLUSIVE);
5005       wsrep_keys_free(&key_arr);
5006       if (rcode)
5007       {
5008         DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
5009         WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
5010                     (wsrep_thd_query(thd)) ?
5011                     wsrep_thd_query(thd) : "void", rcode);
5012         abort_result_set();
5013         DBUG_RETURN(true);
5014       }
5015       /* If commit fails, we should be able to reset the OK status. */
5016       thd->get_stmt_da()->set_overwrite_status(true);
5017     }
5018 #endif /* WITH_WSREP */
5019     trans_commit_stmt(thd);
5020     if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
5021       trans_commit_implicit(thd);
5022 #ifdef WITH_WSREP
5023     if (WSREP(thd))
5024     {
5025       thd->get_stmt_da()->set_overwrite_status(FALSE);
5026       mysql_mutex_lock(&thd->LOCK_thd_data);
5027       if (wsrep_current_error(thd))
5028       {
5029         WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s",
5030                     thd->thread_id,
5031                     wsrep_thd_transaction_state_str(thd), wsrep_thd_query(thd));
5032         mysql_mutex_unlock(&thd->LOCK_thd_data);
5033         abort_result_set();
5034         DBUG_RETURN(true);
5035       }
5036       mysql_mutex_unlock(&thd->LOCK_thd_data);
5037     }
5038 #endif /* WITH_WSREP */
5039   }
5040 
5041   /*
5042     exit_done must only be set after last potential call to
5043     abort_result_set().
5044   */
5045   exit_done= 1;                                 // Avoid double calls
5046 
5047   send_ok_packet();
5048 
5049   if (m_plock)
5050   {
5051     MYSQL_LOCK *lock= *m_plock;
5052     *m_plock= NULL;
5053     m_plock= NULL;
5054 
5055     if (create_info->pos_in_locked_tables)
5056     {
5057       /*
5058         If we are under lock tables, we have created a table that was
5059         originally locked. We should add back the lock to ensure that
5060         all tables in the thd->open_list are locked!
5061       */
5062       table->mdl_ticket= create_info->mdl_ticket;
5063 
5064       /* The following should never fail, except if out of memory */
5065       if (!thd->locked_tables_list.restore_lock(thd,
5066                                                 create_info->
5067                                                 pos_in_locked_tables,
5068                                                 table, lock))
5069         DBUG_RETURN(false);                     // ok
5070       /* Fail. Continue without locking the table */
5071     }
5072     mysql_unlock_tables(thd, lock);
5073   }
5074   DBUG_RETURN(false);
5075 }
5076 
5077 
abort_result_set()5078 void select_create::abort_result_set()
5079 {
5080   ulonglong save_option_bits;
5081   DBUG_ENTER("select_create::abort_result_set");
5082 
5083   /* Avoid double calls, could happen in case of out of memory on cleanup */
5084   if (exit_done)
5085     DBUG_VOID_RETURN;
5086   exit_done= 1;
5087 
5088   /*
5089     In select_insert::abort_result_set() we roll back the statement, including
5090     truncating the transaction cache of the binary log. To do this, we
5091     pretend that the statement is transactional, even though it might
5092     be the case that it was not.
5093 
5094     We roll back the statement prior to deleting the table and prior
5095     to releasing the lock on the table, since there might be potential
5096     for failure if the rollback is executed after the drop or after
5097     unlocking the table.
5098 
5099     We also roll back the statement regardless of whether the creation
5100     of the table succeeded or not, since we need to reset the binary
5101     log state.
5102 
5103     However if there was an original table that was deleted, as part of
5104     create or replace table, then we must log the statement.
5105   */
5106 
5107   save_option_bits= thd->variables.option_bits;
5108   thd->variables.option_bits&= ~OPTION_BIN_LOG;
5109   select_insert::abort_result_set();
5110   thd->transaction->stmt.modified_non_trans_table= FALSE;
5111   thd->variables.option_bits= save_option_bits;
5112 
5113   /* possible error of writing binary log is ignored deliberately */
5114   (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
5115 
5116   if (table)
5117   {
5118     bool tmp_table= table->s->tmp_table;
5119     bool table_creation_was_logged= (!tmp_table ||
5120                                      table->s->table_creation_was_logged);
5121     if (tmp_table)
5122     {
5123       DBUG_ASSERT(saved_tmp_table_share);
5124       thd->restore_tmp_table_share(saved_tmp_table_share);
5125     }
5126 
5127     if (table->file->inited &&
5128         (info.ignore || info.handle_duplicates != DUP_ERROR) &&
5129         (table->file->ha_table_flags() & HA_DUPLICATE_POS))
5130       table->file->ha_rnd_end();
5131     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5132     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5133     table->auto_increment_field_not_null= FALSE;
5134 
5135     if (m_plock)
5136     {
5137       mysql_unlock_tables(thd, *m_plock);
5138       *m_plock= NULL;
5139       m_plock= NULL;
5140     }
5141 
5142     drop_open_table(thd, table, &create_table->db, &create_table->table_name);
5143     table=0;                                    // Safety
5144     if (thd->log_current_statement && mysql_bin_log.is_open())
5145     {
5146       /* Remove logging of drop, create + insert rows */
5147       binlog_reset_cache(thd);
5148       /* Original table was deleted. We have to log it */
5149       if (table_creation_was_logged)
5150         log_drop_table(thd, &create_table->db, &create_table->table_name,
5151                        tmp_table);
5152     }
5153   }
5154 
5155   if (create_info->table_was_deleted)
5156   {
5157     /* Unlock locked table that was dropped by CREATE. */
5158     (void) trans_rollback_stmt(thd);
5159     thd->locked_tables_list.unlock_locked_table(thd, create_info->mdl_ticket);
5160   }
5161 
5162   DBUG_VOID_RETURN;
5163 }
5164