1 /*
2    Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3    Copyright (c) 2010, 2019, MariaDB Corporation
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA
17 */
18 
19 /* Insert of records */
20 
21 /*
22   INSERT DELAYED
23 
24   Insert delayed is distinguished from a normal insert by lock_type ==
25   TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
26   "delayed" table (delayed_get_table()), but falls back to
27   open_and_lock_tables() on error and proceeds as normal insert then.
28 
29   Opening a "delayed" table means to find a delayed insert thread that
30   has the table open already. If this fails, a new thread is created and
31   waited for to open and lock the table.
32 
  If accessing the thread succeeded, in
  Delayed_insert::get_local_table() the table of the thread is copied
  for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
  be used. The insert logic uses the record buffer to create a record.
  And the delayed insert thread uses the record buffer to pass the
  record to the table handler. So there must be different objects. Also
  the copied table is not included in the lock, so that the statement
  can proceed even if the real table cannot be accessed at this moment.
42 
43   Copying a table object is not a trivial operation. Besides the TABLE
44   object there are the field pointer array, the field objects and the
45   record buffer. After copying the field objects, their pointers into
46   the record must be "moved" to point to the new record buffer.
47 
48   After this setup the normal insert logic is used. Only that for
49   delayed inserts write_delayed() is called instead of write_record().
50   It inserts the rows into a queue and signals the delayed insert thread
51   instead of writing directly to the table.
52 
53   The delayed insert thread awakes from the signal. It locks the table,
54   inserts the rows from the queue, unlocks the table, and waits for the
55   next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
56 
57 */
58 
59 #include "mariadb.h"                 /* NO_EMBEDDED_ACCESS_CHECKS */
60 #include "sql_priv.h"
61 #include "sql_insert.h"
62 #include "sql_update.h"                         // compare_record
63 #include "sql_base.h"                           // close_thread_tables
64 #include "sql_cache.h"                          // query_cache_*
65 #include "key.h"                                // key_copy
66 #include "lock.h"                               // mysql_unlock_tables
67 #include "sp_head.h"
68 #include "sql_view.h"         // check_key_in_view, insert_view_fields
69 #include "sql_table.h"        // mysql_create_table_no_lock
70 #include "sql_acl.h"          // *_ACL, check_grant_all_columns
71 #include "sql_trigger.h"
72 #include "sql_select.h"
73 #include "sql_show.h"
74 #include "slave.h"
75 #include "sql_parse.h"                          // end_active_trans
76 #include "rpl_mi.h"
77 #include "transaction.h"
78 #include "sql_audit.h"
79 #include "sql_derived.h"                        // mysql_handle_derived
80 #include "sql_prepare.h"
81 #include <my_bit.h>
82 
83 #include "debug_sync.h"
84 
85 #ifndef EMBEDDED_LIBRARY
86 static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
87                               TABLE_LIST *table_list);
88 static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
89                          LEX_STRING query, bool ignore, bool log_on);
90 static void end_delayed_insert(THD *thd);
91 pthread_handler_t handle_delayed_insert(void *arg);
92 static void unlink_blobs(TABLE *table);
93 #endif
94 static bool check_view_insertability(THD *thd, TABLE_LIST *view);
95 
96 /*
97   Check that insert/update fields are from the same single table of a view.
98 
99   @param fields            The insert/update fields to be checked.
100   @param values            The insert/update values to be checked, NULL if
101   checking is not wanted.
102   @param view              The view for insert.
103   @param map     [in/out]  The insert table map.
104 
105   This function is called in 2 cases:
106     1. to check insert fields. In this case *map will be set to 0.
107        Insert fields are checked to be all from the same single underlying
108        table of the given view. Otherwise the error is thrown. Found table
109        map is returned in the map parameter.
110     2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
111        In this case *map contains table_map found on the previous call of
112        the function to check insert fields. Update fields are checked to be
113        from the same table as the insert fields.
114 
115   @returns false if success.
116 */
117 
check_view_single_update(List<Item> & fields,List<Item> * values,TABLE_LIST * view,table_map * map,bool insert)118 static bool check_view_single_update(List<Item> &fields, List<Item> *values,
119                                      TABLE_LIST *view, table_map *map,
120                                      bool insert)
121 {
122   /* it is join view => we need to find the table for update */
123   List_iterator_fast<Item> it(fields);
124   Item *item;
125   TABLE_LIST *tbl= 0;            // reset for call to check_single_table()
126   table_map tables= 0;
127 
128   while ((item= it++))
129     tables|= item->used_tables();
130 
131   /*
132     Check that table is only one
133     (we can not rely on check_single_table because it skips some
134     types of tables)
135   */
136   if (my_count_bits(tables) > 1)
137     goto error;
138 
139   if (values)
140   {
141     it.init(*values);
142     while ((item= it++))
143       tables|= item->view_used_tables(view);
144   }
145 
146   /* Convert to real table bits */
147   tables&= ~PSEUDO_TABLE_BITS;
148 
149   /* Check found map against provided map */
150   if (*map)
151   {
152     if (tables != *map)
153       goto error;
154     return FALSE;
155   }
156 
157   if (view->check_single_table(&tbl, tables, view) || tbl == 0)
158     goto error;
159 
160   /* view->table should have been set in mysql_derived_merge_for_insert */
161   DBUG_ASSERT(view->table);
162 
163   /*
164     Use buffer for the insert values that was allocated for the merged view.
165   */
166   tbl->table->insert_values= view->table->insert_values;
167   view->table= tbl->table;
168   if (!tbl->single_table_updatable())
169   {
170     if (insert)
171       my_error(ER_NON_INSERTABLE_TABLE, MYF(0), view->alias.str, "INSERT");
172     else
173       my_error(ER_NON_UPDATABLE_TABLE, MYF(0), view->alias.str, "UPDATE");
174     return TRUE;
175   }
176   *map= tables;
177 
178   return FALSE;
179 
180 error:
181   my_error(ER_VIEW_MULTIUPDATE, MYF(0),
182            view->view_db.str, view->view_name.str);
183   return TRUE;
184 }
185 
186 
187 /*
188   Check if insert fields are correct.
189 
190   @param thd            The current thread.
191   @param table_list     The table we are inserting into (may be view)
192   @param fields         The insert fields.
193   @param values         The insert values.
194   @param check_unique   If duplicate values should be rejected.
195   @param fields_and_values_from_different_maps If 'values' are allowed to
196   refer to other tables than those of 'fields'
197   @param map            See check_view_single_update
198 
199   @returns 0 if success, -1 if error
200 */
201 
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<Item> & values,bool check_unique,bool fields_and_values_from_different_maps,table_map * map)202 static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
203                                List<Item> &fields, List<Item> &values,
204                                bool check_unique,
205                                bool fields_and_values_from_different_maps,
206                                table_map *map)
207 {
208   TABLE *table= table_list->table;
209   DBUG_ENTER("check_insert_fields");
210 
211   if (!table_list->single_table_updatable())
212   {
213     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
214     DBUG_RETURN(-1);
215   }
216 
217   if (fields.elements == 0 && values.elements != 0)
218   {
219     if (!table)
220     {
221       my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
222                table_list->view_db.str, table_list->view_name.str);
223       DBUG_RETURN(-1);
224     }
225     if (values.elements != table->s->visible_fields)
226     {
227       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
228       DBUG_RETURN(-1);
229     }
230 #ifndef NO_EMBEDDED_ACCESS_CHECKS
231     Field_iterator_table_ref field_it;
232     field_it.set(table_list);
233     if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
234       DBUG_RETURN(-1);
235 #endif
236     /*
237       No fields are provided so all fields must be provided in the values.
238       Thus we set all bits in the write set.
239     */
240     bitmap_set_all(table->write_set);
241   }
242   else
243   {						// Part field list
244     SELECT_LEX *select_lex= &thd->lex->select_lex;
245     Name_resolution_context *context= &select_lex->context;
246     Name_resolution_context_state ctx_state;
247     int res;
248 
249     if (fields.elements != values.elements)
250     {
251       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
252       DBUG_RETURN(-1);
253     }
254 
255     thd->dup_field= 0;
256     select_lex->no_wrap_view_item= TRUE;
257 
258     /* Save the state of the current name resolution context. */
259     ctx_state.save_state(context, table_list);
260 
261     /*
262       Perform name resolution only in the first table - 'table_list',
263       which is the table that is inserted into.
264     */
265     table_list->next_local= 0;
266     context->resolve_in_table_list_only(table_list);
267     /* 'Unfix' fields to allow correct marking by the setup_fields function. */
268     if (table_list->is_view())
269       unfix_fields(fields);
270 
271     res= setup_fields(thd, Ref_ptr_array(),
272                       fields, MARK_COLUMNS_WRITE, 0, NULL, 0);
273 
274     /* Restore the current context. */
275     ctx_state.restore_state(context, table_list);
276     thd->lex->select_lex.no_wrap_view_item= FALSE;
277 
278     if (res)
279       DBUG_RETURN(-1);
280 
281     if (table_list->is_view() && table_list->is_merged_derived())
282     {
283       if (check_view_single_update(fields,
284                                    fields_and_values_from_different_maps ?
285                                    (List<Item>*) 0 : &values,
286                                    table_list, map, true))
287         DBUG_RETURN(-1);
288       table= table_list->table;
289     }
290 
291     if (check_unique && thd->dup_field)
292     {
293       my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0),
294                thd->dup_field->field_name.str);
295       DBUG_RETURN(-1);
296     }
297   }
298   // For the values we need select_priv
299 #ifndef NO_EMBEDDED_ACCESS_CHECKS
300   table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
301 #endif
302 
303   if (check_key_in_view(thd, table_list) ||
304       (table_list->view &&
305        check_view_insertability(thd, table_list)))
306   {
307     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
308     DBUG_RETURN(-1);
309   }
310 
311   DBUG_RETURN(0);
312 }
313 
has_no_default_value(THD * thd,Field * field,TABLE_LIST * table_list)314 static bool has_no_default_value(THD *thd, Field *field, TABLE_LIST *table_list)
315 {
316   if ((field->flags & NO_DEFAULT_VALUE_FLAG) && field->real_type() != MYSQL_TYPE_ENUM)
317   {
318     bool view= false;
319     if (table_list)
320     {
321       table_list= table_list->top_table();
322       view= table_list->view != NULL;
323     }
324     if (view)
325     {
326       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_VIEW_FIELD,
327                           ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
328                           table_list->view_db.str, table_list->view_name.str);
329     }
330     else
331     {
332       push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_FIELD,
333                           ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
334                           field->field_name.str);
335     }
336     return thd->really_abort_on_warning();
337   }
338   return false;
339 }
340 
341 
342 /**
343   Check if update fields are correct.
344 
345   @param thd                  The current thread.
346   @param insert_table_list    The table we are inserting into (may be view)
347   @param update_fields        The update fields.
348   @param update_values        The update values.
349   @param fields_and_values_from_different_maps If 'update_values' are allowed to
350   refer to other tables than those of 'update_fields'
351   @param map                  See check_view_single_update
352 
353   @note
354   If the update fields include an autoinc field, set the
355   table->next_number_field_updated flag.
356 
357   @returns 0 if success, -1 if error
358 */
359 
check_update_fields(THD * thd,TABLE_LIST * insert_table_list,List<Item> & update_fields,List<Item> & update_values,bool fields_and_values_from_different_maps,table_map * map)360 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
361                                List<Item> &update_fields,
362                                List<Item> &update_values,
363                                bool fields_and_values_from_different_maps,
364                                table_map *map)
365 {
366   TABLE *table= insert_table_list->table;
367   my_bool UNINIT_VAR(autoinc_mark);
368 
369   table->next_number_field_updated= FALSE;
370 
371   if (table->found_next_number_field)
372   {
373     /*
374       Unmark the auto_increment field so that we can check if this is modified
375       by update_fields
376     */
377     autoinc_mark= bitmap_test_and_clear(table->write_set,
378                                         table->found_next_number_field->
379                                         field_index);
380   }
381 
382   /* Check the fields we are going to modify */
383   if (setup_fields(thd, Ref_ptr_array(),
384                    update_fields, MARK_COLUMNS_WRITE, 0, NULL, 0))
385     return -1;
386 
387   if (insert_table_list->is_view() &&
388       insert_table_list->is_merged_derived() &&
389       check_view_single_update(update_fields,
390                                fields_and_values_from_different_maps ?
391                                (List<Item>*) 0 : &update_values,
392                                insert_table_list, map, false))
393     return -1;
394 
395   if (table->default_field)
396     table->mark_default_fields_for_write(FALSE);
397 
398   if (table->found_next_number_field)
399   {
400     if (bitmap_is_set(table->write_set,
401                       table->found_next_number_field->field_index))
402       table->next_number_field_updated= TRUE;
403 
404     if (autoinc_mark)
405       bitmap_set_bit(table->write_set,
406                      table->found_next_number_field->field_index);
407   }
408 
409   return 0;
410 }
411 
412 /**
413   Upgrade table-level lock of INSERT statement to TL_WRITE if
414   a more concurrent lock is infeasible for some reason. This is
415   necessary for engines without internal locking support (MyISAM).
416   An engine with internal locking implementation might later
417   downgrade the lock in handler::store_lock() method.
418 */
419 
420 static
upgrade_lock_type(THD * thd,thr_lock_type * lock_type,enum_duplicates duplic)421 void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
422                        enum_duplicates duplic)
423 {
424   if (duplic == DUP_UPDATE ||
425       (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
426   {
427     *lock_type= TL_WRITE_DEFAULT;
428     return;
429   }
430 
431   if (*lock_type == TL_WRITE_DELAYED)
432   {
433     /*
434       We do not use delayed threads if:
435       - we're running in the safe mode or skip-new mode -- the
436         feature is disabled in these modes
437       - we're executing this statement on a replication slave --
438         we need to ensure serial execution of queries on the
439         slave
440       - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
441         insert cannot be concurrent
442       - this statement is directly or indirectly invoked from
443         a stored function or trigger (under pre-locking) - to
444         avoid deadlocks, since INSERT DELAYED involves a lock
445         upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
446         attempt while keeping other table level locks.
447       - this statement itself may require pre-locking.
448         We should upgrade the lock even though in most cases
449         delayed functionality may work. Unfortunately, we can't
450         easily identify whether the subject table is not used in
451         the statement indirectly via a stored function or trigger:
452         if it is used, that will lead to a deadlock between the
453         client connection and the delayed thread.
454     */
455     if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
456         thd->variables.max_insert_delayed_threads == 0 ||
457         thd->locked_tables_mode > LTM_LOCK_TABLES ||
458         thd->lex->uses_stored_routines() /*||
459         thd->lex->describe*/)
460     {
461       *lock_type= TL_WRITE;
462       return;
463     }
464     if (thd->slave_thread)
465     {
466       /* Try concurrent insert */
467       *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
468                   TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
469       return;
470     }
471 
472     bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
473     if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
474         log_on && mysql_bin_log.is_open())
475     {
476       /*
477         Statement-based binary logging does not work in this case, because:
478         a) two concurrent statements may have their rows intermixed in the
479         queue, leading to autoincrement replication problems on slave (because
480         the values generated used for one statement don't depend only on the
481         value generated for the first row of this statement, so are not
482         replicable)
483         b) if first row of the statement has an error the full statement is
484         not binlogged, while next rows of the statement may be inserted.
485         c) if first row succeeds, statement is binlogged immediately with a
486         zero error code (i.e. "no error"), if then second row fails, query
487         will fail on slave too and slave will stop (wrongly believing that the
488         master got no error).
489         So we fallback to non-delayed INSERT.
490         Note that to be fully correct, we should test the "binlog format which
491         the delayed thread is going to use for this row". But in the common case
492         where the global binlog format is not changed and the session binlog
493         format may be changed, that is equal to the global binlog format.
494         We test it without mutex for speed reasons (condition rarely true), and
495         in the common case (global not changed) it is as good as without mutex;
496         if global value is changed, anyway there is uncertainty as the delayed
497         thread may be old and use the before-the-change value.
498       */
499       *lock_type= TL_WRITE;
500     }
501   }
502 }
503 
504 
505 /**
506   Find or create a delayed insert thread for the first table in
507   the table list, then open and lock the remaining tables.
508   If a table can not be used with insert delayed, upgrade the lock
509   and open and lock all tables using the standard mechanism.
510 
511   @param thd         thread context
512   @param table_list  list of "descriptors" for tables referenced
513                      directly in statement SQL text.
514                      The first element in the list corresponds to
515                      the destination table for inserts, remaining
516                      tables, if any, are usually tables referenced
517                      by sub-queries in the right part of the
518                      INSERT.
519 
520   @return Status of the operation. In case of success 'table'
521   member of every table_list element points to an instance of
522   class TABLE.
523 
524   @sa open_and_lock_tables for more information about MySQL table
525   level locking
526 */
527 
528 static
open_and_lock_for_insert_delayed(THD * thd,TABLE_LIST * table_list)529 bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
530 {
531   MDL_request protection_request;
532   DBUG_ENTER("open_and_lock_for_insert_delayed");
533 
534 #ifndef EMBEDDED_LIBRARY
535   /* INSERT DELAYED is not allowed in a read only transaction. */
536   if (thd->tx_read_only)
537   {
538     my_error(ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION, MYF(0));
539     DBUG_RETURN(true);
540   }
541 
542   /*
543     In order for the deadlock detector to be able to find any deadlocks
544     caused by the handler thread waiting for GRL or this table, we acquire
545     protection against GRL (global IX metadata lock) and metadata lock on
546     table to being inserted into inside the connection thread.
547     If this goes ok, the tickets are cloned and added to the list of granted
548     locks held by the handler thread.
549   */
550   if (thd->global_read_lock.can_acquire_protection())
551     DBUG_RETURN(TRUE);
552 
553   protection_request.init(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE,
554                           MDL_STATEMENT);
555 
556   if (thd->mdl_context.acquire_lock(&protection_request,
557                                     thd->variables.lock_wait_timeout))
558     DBUG_RETURN(TRUE);
559 
560   if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
561                                     thd->variables.lock_wait_timeout))
562     /*
563       If a lock can't be acquired, it makes no sense to try normal insert.
564       Therefore we just abort the statement.
565     */
566     DBUG_RETURN(TRUE);
567 
568   bool error= FALSE;
569   if (delayed_get_table(thd, &protection_request, table_list))
570     error= TRUE;
571   else if (table_list->table)
572   {
573     /*
574       Open tables used for sub-selects or in stored functions, will also
575       cache these functions.
576     */
577     if (open_and_lock_tables(thd, table_list->next_global, TRUE, 0))
578     {
579       end_delayed_insert(thd);
580       error= TRUE;
581     }
582     else
583     {
584       /*
585         First table was not processed by open_and_lock_tables(),
586         we need to set updatability flag "by hand".
587       */
588       if (!table_list->derived && !table_list->view)
589         table_list->updatable= 1;  // usual table
590     }
591   }
592 
593   /*
594     We can't release protection against GRL and metadata lock on the table
595     being inserted into here. These locks might be required, for example,
596     because this INSERT DELAYED calls functions which may try to update
597     this or another tables (updating the same table is of course illegal,
598     but such an attempt can be discovered only later during statement
599     execution).
600   */
601 
602   /*
603     Reset the ticket in case we end up having to use normal insert and
604     therefore will reopen the table and reacquire the metadata lock.
605   */
606   table_list->mdl_request.ticket= NULL;
607 
608   if (error || table_list->table)
609     DBUG_RETURN(error);
610 #endif
611   /*
612     * This is embedded library and we don't have auxiliary
613     threads OR
614     * a lock upgrade was requested inside delayed_get_table
615       because
616       - there are too many delayed insert threads OR
617       - the table has triggers.
618     Use a normal insert.
619   */
620   table_list->lock_type= TL_WRITE;
621   DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
622 }
623 
624 
/**
  Build a new query string from the current statement with the DELAYED
  keyword removed, for a multi-row INSERT DELAYED statement.

  @param[in]  thd                Thread handler
  @param[out] buf                Buffer that receives the new query string

  @return
             0           ok
             1           error
*/
636 static int
create_insert_stmt_from_insert_delayed(THD * thd,String * buf)637 create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
638 {
639   /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
640   if (buf->append(thd->query()) ||
641       buf->replace(thd->lex->keyword_delayed_begin_offset,
642                    thd->lex->keyword_delayed_end_offset -
643                    thd->lex->keyword_delayed_begin_offset, 0))
644     return 1;
645   return 0;
646 }
647 
648 
save_insert_query_plan(THD * thd,TABLE_LIST * table_list)649 static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
650 {
651   Explain_insert* explain= new (thd->mem_root) Explain_insert(thd->mem_root);
652   explain->table_name.append(table_list->table->alias);
653 
654   thd->lex->explain->add_insert_plan(explain);
655 
656   /* See Update_plan::updating_a_view for details */
657   bool skip= MY_TEST(table_list->view);
658 
659   /* Save subquery children */
660   for (SELECT_LEX_UNIT *unit= thd->lex->select_lex.first_inner_unit();
661        unit;
662        unit= unit->next_unit())
663   {
664     if (skip)
665     {
666       skip= false;
667       continue;
668     }
669     /*
670       Table elimination doesn't work for INSERTS, but let's still have this
671       here for consistency
672     */
673     if (!(unit->item && unit->item->eliminated))
674       explain->add_child(unit->first_select()->select_number);
675   }
676 }
677 
678 
field_to_fill()679 Field **TABLE::field_to_fill()
680 {
681   return triggers && triggers->nullable_fields() ? triggers->nullable_fields() : field;
682 }
683 
684 
685 /**
686   INSERT statement implementation
687 
688   @note Like implementations of other DDL/DML in MySQL, this function
689   relies on the caller to close the thread tables. This is done in the
690   end of dispatch_command().
691 */
692 
mysql_insert(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<List_item> & values_list,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,bool ignore)693 bool mysql_insert(THD *thd,TABLE_LIST *table_list,
694                   List<Item> &fields,
695                   List<List_item> &values_list,
696                   List<Item> &update_fields,
697                   List<Item> &update_values,
698                   enum_duplicates duplic,
699 		  bool ignore)
700 {
701   bool retval= true;
702   int error, res;
703   bool transactional_table, joins_freed= FALSE;
704   bool changed;
705   const bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
706   bool using_bulk_insert= 0;
707   uint value_count;
708   ulong counter = 1;
709   /* counter of iteration in bulk PS operation*/
710   ulonglong iteration= 0;
711   ulonglong id;
712   COPY_INFO info;
713   TABLE *table= 0;
714   List_iterator_fast<List_item> its(values_list);
715   List_item *values;
716   Name_resolution_context *context;
717   Name_resolution_context_state ctx_state;
718 #ifndef EMBEDDED_LIBRARY
719   char *query= thd->query();
720   /*
721     log_on is about delayed inserts only.
722     By default, both logs are enabled (this won't cause problems if the server
723     runs without --log-bin).
724   */
725   bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
726 #endif
727   thr_lock_type lock_type;
728   Item *unused_conds= 0;
729   DBUG_ENTER("mysql_insert");
730 
731   create_explain_query(thd->lex, thd->mem_root);
732   /*
733     Upgrade lock type if the requested lock is incompatible with
734     the current connection mode or table operation.
735   */
736   upgrade_lock_type(thd, &table_list->lock_type, duplic);
737 
738   /*
739     We can't write-delayed into a table locked with LOCK TABLES:
740     this will lead to a deadlock, since the delayed thread will
741     never be able to get a lock on the table.
742   */
743   if (table_list->lock_type == TL_WRITE_DELAYED &&
744       thd->locked_tables_mode &&
745       find_locked_table(thd->open_tables, table_list->db.str,
746                         table_list->table_name.str))
747   {
748     my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
749              table_list->table_name.str);
750     DBUG_RETURN(TRUE);
751   }
752 
753   if (table_list->lock_type == TL_WRITE_DELAYED)
754   {
755     if (open_and_lock_for_insert_delayed(thd, table_list))
756       DBUG_RETURN(TRUE);
757   }
758   else
759   {
760     if (open_and_lock_tables(thd, table_list, TRUE, 0))
761       DBUG_RETURN(TRUE);
762   }
763 
764   THD_STAGE_INFO(thd, stage_init_update);
765   lock_type= table_list->lock_type;
766   thd->lex->used_tables=0;
767   values= its++;
768   if (bulk_parameters_set(thd))
769     DBUG_RETURN(TRUE);
770   value_count= values->elements;
771 
772   if (mysql_prepare_insert(thd, table_list, table, fields, values,
773 			   update_fields, update_values, duplic, &unused_conds,
774                            FALSE))
775     goto abort;
776 
777   /* mysql_prepare_insert sets table_list->table if it was not set */
778   table= table_list->table;
779 
780   context= &thd->lex->select_lex.context;
781   /*
782     These three asserts test the hypothesis that the resetting of the name
783     resolution context below is not necessary at all since the list of local
784     tables for INSERT always consists of one table.
785   */
786   DBUG_ASSERT(!table_list->next_local);
787   DBUG_ASSERT(!context->table_list->next_local);
788   DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
789 
790   /* Save the state of the current name resolution context. */
791   ctx_state.save_state(context, table_list);
792 
793   /*
794     Perform name resolution only in the first table - 'table_list',
795     which is the table that is inserted into.
796   */
797   table_list->next_local= 0;
798   context->resolve_in_table_list_only(table_list);
799   switch_to_nullable_trigger_fields(*values, table);
800 
801   while ((values= its++))
802   {
803     counter++;
804     if (values->elements != value_count)
805     {
806       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
807       goto abort;
808     }
809     if (setup_fields(thd, Ref_ptr_array(),
810                      *values, MARK_COLUMNS_READ, 0, NULL, 0))
811       goto abort;
812     switch_to_nullable_trigger_fields(*values, table);
813   }
814   its.rewind ();
815 
816   /* Restore the current context. */
817   ctx_state.restore_state(context, table_list);
818 
819   if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false))
820   {
821     goto abort;
822   }
823   save_insert_query_plan(thd, table_list);
824   if (thd->lex->describe)
825   {
826     retval= thd->lex->explain->send_explain(thd);
827     goto abort;
828   }
829 
830   /*
831     Fill in the given fields and dump it to the table file
832   */
833   bzero((char*) &info,sizeof(info));
834   info.ignore= ignore;
835   info.handle_duplicates=duplic;
836   info.update_fields= &update_fields;
837   info.update_values= &update_values;
838   info.view= (table_list->view ? table_list : 0);
839   info.table_list= table_list;
840 
841   /*
842     Count warnings for all inserts.
843     For single line insert, generate an error if try to set a NOT NULL field
844     to NULL.
845   */
846   thd->count_cuted_fields= ((values_list.elements == 1 &&
847                              !ignore) ?
848 			    CHECK_FIELD_ERROR_FOR_NULL :
849 			    CHECK_FIELD_WARN);
850   thd->cuted_fields = 0L;
851   table->next_number_field=table->found_next_number_field;
852 
853 #ifdef HAVE_REPLICATION
854   if (thd->rgi_slave &&
855       (info.handle_duplicates == DUP_UPDATE) &&
856       (table->next_number_field != NULL) &&
857       rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
858     goto abort;
859 #endif
860 
861   error=0;
862   if (duplic == DUP_REPLACE &&
863       (!table->triggers || !table->triggers->has_delete_triggers()))
864     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
865   if (duplic == DUP_UPDATE)
866     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  /*
    Let's *try* to start bulk inserts. They won't necessarily be
    started, as values_list.elements should be greater than
    some - handler dependent - threshold.
    We should not start bulk inserts if this statement uses
    functions or invokes triggers, since they may access
    the same table and therefore should not see the
    inconsistent state created by this optimization.
    So we call start_bulk_insert to perform the necessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
879 #ifndef EMBEDDED_LIBRARY
880   if (lock_type != TL_WRITE_DELAYED)
881 #endif /* EMBEDDED_LIBRARY */
882   {
883     if (duplic != DUP_ERROR || ignore)
884     {
885       table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
886       if (table->file->ha_table_flags() & HA_DUPLICATE_POS &&
887           table->file->ha_rnd_init_with_error(0))
888         goto abort;
889     }
890     /**
891       This is a simple check for the case when the table has a trigger
892       that reads from it, or when the statement invokes a stored function
893       that reads from the table being inserted to.
      Engines can't handle a bulk insert in parallel with a read from the
895       same table in the same connection.
896     */
897     if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
898        values_list.elements > 1)
899     {
900       using_bulk_insert= 1;
901       table->file->ha_start_bulk_insert(values_list.elements);
902     }
903   }
904 
905   thd->abort_on_warning= !ignore && thd->is_strict_mode();
906 
907   table->reset_default_fields();
908   table->prepare_triggers_for_insert_stmt_or_event();
909   table->mark_columns_needed_for_insert();
910 
911   if (fields.elements || !value_count || table_list->view != 0)
912   {
913     if (table->triggers &&
914         table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
915     {
916       /* BEFORE INSERT triggers exist, the check will be done later, per row */
917     }
918     else if (check_that_all_fields_are_given_values(thd, table, table_list))
919     {
920       error= 1;
921       goto values_loop_end;
922     }
923   }
924 
925   if (table_list->prepare_where(thd, 0, TRUE) ||
926       table_list->prepare_check_option(thd))
927     error= 1;
928 
929   switch_to_nullable_trigger_fields(fields, table);
930   switch_to_nullable_trigger_fields(update_fields, table);
931   switch_to_nullable_trigger_fields(update_values, table);
932 
933   if (fields.elements || !value_count)
934   {
935     /*
936       There are possibly some default values:
937       INSERT INTO t1 (fields) VALUES ...
938       INSERT INTO t1 VALUES ()
939     */
940     if (table->validate_default_values_of_unset_fields(thd))
941     {
942       error= 1;
943       goto values_loop_end;
944     }
945   }
946 
947   THD_STAGE_INFO(thd, stage_update);
948   do
949   {
950     DBUG_PRINT("info", ("iteration %llu", iteration));
951     if (iteration && bulk_parameters_set(thd))
952     {
953       error= 1;
954       goto values_loop_end;
955     }
956 
957     while ((values= its++))
958     {
959       if (fields.elements || !value_count)
960       {
961         /*
962           There are possibly some default values:
963           INSERT INTO t1 (fields) VALUES ...
964           INSERT INTO t1 VALUES ()
965         */
966         restore_record(table,s->default_values);	// Get empty record
967         table->reset_default_fields();
968         if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields,
969                                                           *values, 0,
970                                                           TRG_EVENT_INSERT)))
971         {
972           if (values_list.elements != 1 && ! thd->is_error())
973           {
974             info.records++;
975             continue;
976           }
977           /*
978             TODO: set thd->abort_on_warning if values_list.elements == 1
979 	    and check that all items return warning in case of problem with
980 	    storing field.
981           */
982 	  error=1;
983 	  break;
984         }
985       }
986       else
987       {
988         /*
989           No field list, all fields are set explicitly:
990           INSERT INTO t1 VALUES (values)
991         */
992         if (thd->lex->used_tables || // Column used in values()
993             table->s->visible_fields != table->s->fields)
994 	  restore_record(table,s->default_values);	// Get empty record
995         else
996         {
997           TABLE_SHARE *share= table->s;
998 
999           /*
1000             Fix delete marker. No need to restore rest of record since it will
1001             be overwritten by fill_record() anyway (and fill_record() does not
1002             use default values in this case).
1003           */
1004           table->record[0][0]= share->default_values[0];
1005 
1006           /* Fix undefined null_bits. */
1007           if (share->null_bytes > 1 && share->last_null_bit_pos)
1008           {
1009             table->record[0][share->null_bytes - 1]=
1010               share->default_values[share->null_bytes - 1];
1011           }
1012         }
1013         table->reset_default_fields();
1014         if (unlikely(fill_record_n_invoke_before_triggers(thd, table,
1015                                                           table->
1016                                                           field_to_fill(),
1017                                                           *values, 0,
1018                                                           TRG_EVENT_INSERT)))
1019         {
1020           if (values_list.elements != 1 && ! thd->is_error())
1021 	  {
1022 	    info.records++;
1023 	    continue;
1024 	  }
1025 	  error=1;
1026 	  break;
1027         }
1028       }
1029 
1030       /*
1031         with triggers a field can get a value *conditionally*, so we have to
1032         repeat has_no_default_value() check for every row
1033       */
1034       if (table->triggers &&
1035           table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
1036       {
1037         for (Field **f=table->field ; *f ; f++)
1038         {
1039           if (unlikely(!(*f)->has_explicit_value() &&
1040                        has_no_default_value(thd, *f, table_list)))
1041           {
1042             error= 1;
1043             goto values_loop_end;
1044           }
1045         }
1046       }
1047 
1048       if ((res= table_list->view_check_option(thd,
1049                                               (values_list.elements == 1 ?
1050                                                0 :
1051                                                ignore))) ==
1052           VIEW_CHECK_SKIP)
1053         continue;
1054       else if (res == VIEW_CHECK_ERROR)
1055       {
1056         error= 1;
1057         break;
1058       }
1059 
1060       thd->decide_logging_format_low(table);
1061 #ifndef EMBEDDED_LIBRARY
1062       if (lock_type == TL_WRITE_DELAYED)
1063       {
1064         LEX_STRING const st_query = { query, thd->query_length() };
1065         DEBUG_SYNC(thd, "before_write_delayed");
1066         error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
1067         DEBUG_SYNC(thd, "after_write_delayed");
1068         query=0;
1069       }
1070       else
1071 #endif
1072         error=write_record(thd, table ,&info);
1073       if (unlikely(error))
1074         break;
1075       thd->get_stmt_da()->inc_current_row_for_warning();
1076     }
1077     its.rewind();
1078     iteration++;
1079   } while (bulk_parameters_iterations(thd));
1080 
1081 values_loop_end:
1082   free_underlaid_joins(thd, &thd->lex->select_lex);
1083   joins_freed= TRUE;
1084 
1085   /*
1086     Now all rows are inserted.  Time to update logs and sends response to
1087     user
1088   */
1089 #ifndef EMBEDDED_LIBRARY
1090   if (unlikely(lock_type == TL_WRITE_DELAYED))
1091   {
1092     if (likely(!error))
1093     {
1094       info.copied=values_list.elements;
1095       end_delayed_insert(thd);
1096     }
1097   }
1098   else
1099 #endif
1100   {
1101     /*
1102       Do not do this release if this is a delayed insert, it would steal
1103       auto_inc values from the delayed_insert thread as they share TABLE.
1104     */
1105     table->file->ha_release_auto_increment();
1106     if (using_bulk_insert && unlikely(table->file->ha_end_bulk_insert()) &&
1107         !error)
1108     {
1109       table->file->print_error(my_errno,MYF(0));
1110       error=1;
1111     }
1112     if (duplic != DUP_ERROR || ignore)
1113     {
1114       table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1115       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1116         table->file->ha_rnd_end();
1117     }
1118 
1119     transactional_table= table->file->has_transactions();
1120 
1121     if (likely(changed= (info.copied || info.deleted || info.updated)))
1122     {
1123       /*
1124         Invalidate the table in the query cache if something changed.
1125         For the transactional algorithm to work the invalidation must be
1126         before binlog writing and ha_autocommit_or_rollback
1127       */
1128       query_cache_invalidate3(thd, table_list, 1);
1129     }
1130 
1131     if (thd->transaction.stmt.modified_non_trans_table)
1132       thd->transaction.all.modified_non_trans_table= TRUE;
1133     thd->transaction.all.m_unsafe_rollback_flags|=
1134       (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
1135 
1136     if (error <= 0 ||
1137         thd->transaction.stmt.modified_non_trans_table ||
1138 	was_insert_delayed)
1139     {
1140       if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
1141       {
1142         int errcode= 0;
1143 	if (error <= 0)
1144         {
1145 	  /*
1146 	    [Guilhem wrote] Temporary errors may have filled
1147 	    thd->net.last_error/errno.  For example if there has
1148 	    been a disk full error when writing the row, and it was
1149 	    MyISAM, then thd->net.last_error/errno will be set to
1150             "disk full"... and the mysql_file_pwrite() will wait until free
1151 	    space appears, and so when it finishes then the
1152 	    write_row() was entirely successful
1153 	  */
1154 	  /* todo: consider removing */
1155 	  thd->clear_error();
1156 	}
1157         else
1158           errcode= query_error_code(thd, thd->killed == NOT_KILLED);
1159 
1160         ScopedStatementReplication scoped_stmt_rpl(
1161             table->versioned(VERS_TRX_ID) ? thd : NULL);
1162        /* bug#22725:
1163 
1164 	A query which per-row-loop can not be interrupted with
1165 	KILLED, like INSERT, and that does not invoke stored
1166 	routines can be binlogged with neglecting the KILLED error.
1167 
1168 	If there was no error (error == zero) until after the end of
1169 	inserting loop the KILLED flag that appeared later can be
1170 	disregarded since previously possible invocation of stored
1171 	routines did not result in any error due to the KILLED.  In
1172 	such case the flag is ignored for constructing binlog event.
1173 	*/
1174 	DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
1175         if (was_insert_delayed && table_list->lock_type ==  TL_WRITE)
1176         {
1177           /* Binlog INSERT DELAYED as INSERT without DELAYED. */
1178           String log_query;
1179           if (create_insert_stmt_from_insert_delayed(thd, &log_query))
1180           {
1181             sql_print_error("Event Error: An error occurred while creating query string"
1182                             "for INSERT DELAYED stmt, before writing it into binary log.");
1183 
1184             error= 1;
1185           }
1186           else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1187                                      log_query.c_ptr(), log_query.length(),
1188                                      transactional_table, FALSE, FALSE,
1189                                      errcode) > 0)
1190             error= 1;
1191         }
1192         else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1193 			           thd->query(), thd->query_length(),
1194 			           transactional_table, FALSE, FALSE,
1195                                    errcode) > 0)
1196 	  error= 1;
1197       }
1198     }
1199     DBUG_ASSERT(transactional_table || !changed ||
1200                 thd->transaction.stmt.modified_non_trans_table);
1201   }
1202   THD_STAGE_INFO(thd, stage_end);
1203   /*
1204     We'll report to the client this id:
1205     - if the table contains an autoincrement column and we successfully
1206     inserted an autogenerated value, the autogenerated value.
1207     - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
1208     called, X.
1209     - if the table contains an autoincrement column, and some rows were
1210     inserted, the id of the last "inserted" row (if IGNORE, that value may not
1211     have been really inserted but ignored).
1212   */
1213   id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1214     thd->first_successful_insert_id_in_cur_stmt :
1215     (thd->arg_of_last_insert_id_function ?
1216      thd->first_successful_insert_id_in_prev_stmt :
1217      ((table->next_number_field && info.copied) ?
1218      table->next_number_field->val_int() : 0));
1219   table->next_number_field=0;
1220   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
1221   table->auto_increment_field_not_null= FALSE;
1222   if (duplic == DUP_REPLACE &&
1223       (!table->triggers || !table->triggers->has_delete_triggers()))
1224     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1225 
1226   if (unlikely(error))
1227     goto abort;
1228   if (thd->lex->analyze_stmt)
1229   {
1230     retval= thd->lex->explain->send_explain(thd);
1231     goto abort;
1232   }
1233   if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
1234 				    !thd->cuted_fields))
1235   {
1236     my_ok(thd, info.copied + info.deleted +
1237                ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1238                 info.touched : info.updated),
1239           id);
1240   }
1241   else
1242   {
1243     char buff[160];
1244     ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1245                      info.touched : info.updated);
1246     if (ignore)
1247       sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1248 	      (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
1249 	      (ulong) (info.records - info.copied),
1250               (long) thd->get_stmt_da()->current_statement_warn_count());
1251     else
1252       sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1253 	      (ulong) (info.deleted + updated),
1254               (long) thd->get_stmt_da()->current_statement_warn_count());
1255     ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
1256   }
1257   thd->abort_on_warning= 0;
1258   if (thd->lex->current_select->first_cond_optimization)
1259   {
1260     thd->lex->current_select->save_leaf_tables(thd);
1261     thd->lex->current_select->first_cond_optimization= 0;
1262   }
1263 
1264   DBUG_RETURN(FALSE);
1265 
1266 abort:
1267 #ifndef EMBEDDED_LIBRARY
1268   if (lock_type == TL_WRITE_DELAYED)
1269   {
1270     end_delayed_insert(thd);
1271     /*
1272       In case of an error (e.g. data truncation), the data type specific data
1273       in fields (e.g. Field_blob::value) was not taken over
1274       by the delayed writer thread. All fields in table_list->table
1275       will be freed by free_root() soon. We need to free the specific
1276       data before free_root() to avoid a memory leak.
1277     */
1278     for (Field **ptr= table_list->table->field ; *ptr ; ptr++)
1279       (*ptr)->free();
1280   }
1281 #endif
1282   if (table != NULL)
1283     table->file->ha_release_auto_increment();
1284 
1285   if (!joins_freed)
1286     free_underlaid_joins(thd, &thd->lex->select_lex);
1287   thd->abort_on_warning= 0;
1288   DBUG_RETURN(retval);
1289 }
1290 
1291 
1292 /*
1293   Additional check for insertability for VIEW
1294 
1295   SYNOPSIS
1296     check_view_insertability()
1297     thd     - thread handler
1298     view    - reference on VIEW
1299 
1300   IMPLEMENTATION
1301     A view is insertable if the folloings are true:
1302     - All columns in the view are columns from a table
1303     - All not used columns in table have a default values
1304     - All field in view are unique (not referring to the same column)
1305 
1306   RETURN
1307     FALSE - OK
1308       view->contain_auto_increment is 1 if and only if the view contains an
1309       auto_increment field
1310 
1311     TRUE  - can't be used for insert
1312 */
1313 
check_view_insertability(THD * thd,TABLE_LIST * view)1314 static bool check_view_insertability(THD * thd, TABLE_LIST *view)
1315 {
1316   uint num= view->view->select_lex.item_list.elements;
1317   TABLE *table= view->table;
1318   Field_translator *trans_start= view->field_translation,
1319 		   *trans_end= trans_start + num;
1320   Field_translator *trans;
1321   uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1322   uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1323   MY_BITMAP used_fields;
1324   enum_column_usage saved_column_usage= thd->column_usage;
1325   DBUG_ENTER("check_key_in_view");
1326 
1327   if (!used_fields_buff)
1328     DBUG_RETURN(TRUE);  // EOM
1329 
1330   DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
1331 
1332   (void) my_bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
1333   bitmap_clear_all(&used_fields);
1334 
1335   view->contain_auto_increment= 0;
1336   /*
1337     we must not set query_id for fields as they're not
1338     really used in this context
1339   */
1340   thd->column_usage= COLUMNS_WRITE;
1341   /* check simplicity and prepare unique test of view */
1342   for (trans= trans_start; trans != trans_end; trans++)
1343   {
1344     if (trans->item->fix_fields_if_needed(thd, &trans->item))
1345     {
1346       thd->column_usage= saved_column_usage;
1347       DBUG_RETURN(TRUE);
1348     }
1349     Item_field *field;
1350     /* simple SELECT list entry (field without expression) */
1351     if (!(field= trans->item->field_for_view_update()))
1352     {
1353       thd->column_usage= saved_column_usage;
1354       DBUG_RETURN(TRUE);
1355     }
1356     if (field->field->unireg_check == Field::NEXT_NUMBER)
1357       view->contain_auto_increment= 1;
1358     /* prepare unique test */
1359     /*
1360       remove collation (or other transparent for update function) if we have
1361       it
1362     */
1363     trans->item= field;
1364   }
1365   thd->column_usage= saved_column_usage;
1366   /* unique test */
1367   for (trans= trans_start; trans != trans_end; trans++)
1368   {
1369     /* Thanks to test above, we know that all columns are of type Item_field */
1370     Item_field *field= (Item_field *)trans->item;
1371     /* check fields belong to table in which we are inserting */
1372     if (field->field->table == table &&
1373         bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1374       DBUG_RETURN(TRUE);
1375   }
1376 
1377   DBUG_RETURN(FALSE);
1378 }
1379 
1380 
1381 /*
1382   Check if table can be updated
1383 
1384   SYNOPSIS
1385      mysql_prepare_insert_check_table()
1386      thd		Thread handle
1387      table_list		Table list
1388      fields		List of fields to be updated
1389      where		Pointer to where clause
1390      select_insert      Check is making for SELECT ... INSERT
1391 
1392    RETURN
1393      FALSE ok
1394      TRUE  ERROR
1395 */
1396 
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1397 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1398                                              List<Item> &fields,
1399                                              bool select_insert)
1400 {
1401   bool insert_into_view= (table_list->view != 0);
1402   DBUG_ENTER("mysql_prepare_insert_check_table");
1403 
1404   if (!table_list->single_table_updatable())
1405   {
1406     my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
1407     DBUG_RETURN(TRUE);
1408   }
1409   /*
1410      first table in list is the one we'll INSERT into, requires INSERT_ACL.
1411      all others require SELECT_ACL only. the ACL requirement below is for
1412      new leaves only anyway (view-constituents), so check for SELECT rather
1413      than INSERT.
1414   */
1415 
1416   if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
1417                                     &thd->lex->select_lex.top_join_list,
1418                                     table_list,
1419                                     thd->lex->select_lex.leaf_tables,
1420                                     select_insert, INSERT_ACL, SELECT_ACL,
1421                                     TRUE))
1422     DBUG_RETURN(TRUE);
1423 
1424   if (insert_into_view && !fields.elements)
1425   {
1426     thd->lex->empty_field_list_on_rset= 1;
1427     if (!thd->lex->select_lex.leaf_tables.head()->table ||
1428         table_list->is_multitable())
1429     {
1430       my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1431                table_list->view_db.str, table_list->view_name.str);
1432       DBUG_RETURN(TRUE);
1433     }
1434     DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
1435   }
1436 
1437   DBUG_RETURN(FALSE);
1438 }
1439 
1440 
1441 /*
1442   Get extra info for tables we insert into
1443 
1444   @param table     table(TABLE object) we insert into,
1445                    might be NULL in case of view
1446   @param           table(TABLE_LIST object) or view we insert into
1447 */
1448 
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1449 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1450 {
1451   if (table)
1452   {
1453     if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1454       table->prepare_for_position();
1455     return;
1456   }
1457 
1458   DBUG_ASSERT(tables->view);
1459   List_iterator<TABLE_LIST> it(*tables->view_tables);
1460   TABLE_LIST *tbl;
1461   while ((tbl= it++))
1462     prepare_for_positional_update(tbl->table, tbl);
1463 
1464   return;
1465 }
1466 
1467 
1468 /*
1469   Prepare items in INSERT statement
1470 
1471   SYNOPSIS
1472     mysql_prepare_insert()
1473     thd			Thread handler
1474     table_list	        Global/local table list
1475     table		Table to insert into (can be NULL if table should
1476 			be taken from table_list->table)
1477     where		Where clause (for insert ... select)
1478     select_insert	TRUE if INSERT ... SELECT statement
1479 
1480   TODO (in far future)
1481     In cases of:
1482     INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1483     ON DUPLICATE KEY ...
1484     we should be able to refer to sum1 in the ON DUPLICATE KEY part
1485 
1486   WARNING
1487     You MUST set table->insert_values to 0 after calling this function
1488     before releasing the table object.
1489 
1490   RETURN VALUE
1491     FALSE OK
1492     TRUE  error
1493 */
1494 
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,TABLE * table,List<Item> & fields,List_item * values,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,COND ** where,bool select_insert)1495 bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1496                           TABLE *table, List<Item> &fields, List_item *values,
1497                           List<Item> &update_fields, List<Item> &update_values,
1498                           enum_duplicates duplic, COND **where,
1499                           bool select_insert)
1500 {
1501   SELECT_LEX *select_lex= &thd->lex->select_lex;
1502   Name_resolution_context *context= &select_lex->context;
1503   Name_resolution_context_state ctx_state;
1504   bool insert_into_view= (table_list->view != 0);
1505   bool res= 0;
1506   table_map map= 0;
1507   DBUG_ENTER("mysql_prepare_insert");
1508   DBUG_PRINT("enter", ("table_list: %p  table: %p  view: %d",
1509 		       table_list, table,
1510 		       (int)insert_into_view));
1511   /* INSERT should have a SELECT or VALUES clause */
1512   DBUG_ASSERT (!select_insert || !values);
1513 
1514   if (mysql_handle_derived(thd->lex, DT_INIT))
1515     DBUG_RETURN(TRUE);
1516   if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT))
1517     DBUG_RETURN(TRUE);
1518   if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE))
1519     DBUG_RETURN(TRUE);
1520   /*
1521     For subqueries in VALUES() we should not see the table in which we are
1522     inserting (for INSERT ... SELECT this is done by changing table_list,
1523     because INSERT ... SELECT share SELECT_LEX it with SELECT.
1524   */
1525   if (!select_insert)
1526   {
1527     for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1528          un;
1529          un= un->next_unit())
1530     {
1531       for (SELECT_LEX *sl= un->first_select();
1532            sl;
1533            sl= sl->next_select())
1534       {
1535         sl->context.outer_context= 0;
1536       }
1537     }
1538   }
1539 
1540   if (duplic == DUP_UPDATE)
1541   {
1542     /* it should be allocated before Item::fix_fields() */
1543     if (table_list->set_insert_values(thd->mem_root))
1544       DBUG_RETURN(TRUE);
1545   }
1546 
1547   if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1548     DBUG_RETURN(TRUE);
1549 
1550   /* Prepare the fields in the statement. */
1551   if (values)
1552   {
1553     /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
1554     DBUG_ASSERT (!select_lex->group_list.elements);
1555 
1556     /* Save the state of the current name resolution context. */
1557     ctx_state.save_state(context, table_list);
1558 
1559     /*
1560       Perform name resolution only in the first table - 'table_list',
1561       which is the table that is inserted into.
1562      */
1563     table_list->next_local= 0;
1564     context->resolve_in_table_list_only(table_list);
1565 
1566     res= (setup_fields(thd, Ref_ptr_array(),
1567                        *values, MARK_COLUMNS_READ, 0, NULL, 0) ||
1568           check_insert_fields(thd, context->table_list, fields, *values,
1569                               !insert_into_view, 0, &map));
1570 
1571     if (!res)
1572       res= setup_fields(thd, Ref_ptr_array(),
1573                         update_values, MARK_COLUMNS_READ, 0, NULL, 0);
1574 
1575     if (!res && duplic == DUP_UPDATE)
1576     {
1577       select_lex->no_wrap_view_item= TRUE;
1578       res= check_update_fields(thd, context->table_list, update_fields,
1579                                update_values, false, &map);
1580       select_lex->no_wrap_view_item= FALSE;
1581     }
1582 
1583     /* Restore the current context. */
1584     ctx_state.restore_state(context, table_list);
1585   }
1586 
1587   if (res)
1588     DBUG_RETURN(res);
1589 
1590   if (!table)
1591     table= table_list->table;
1592 
1593   if (table->versioned(VERS_TIMESTAMP) && duplic == DUP_REPLACE)
1594   {
1595     // Additional memory may be required to create historical items.
1596     if (table_list->set_insert_values(thd->mem_root))
1597       DBUG_RETURN(TRUE);
1598   }
1599 
1600   if (!select_insert)
1601   {
1602     Item *fake_conds= 0;
1603     TABLE_LIST *duplicate;
1604     if ((duplicate= unique_table(thd, table_list, table_list->next_global,
1605                                  CHECK_DUP_ALLOW_DIFFERENT_ALIAS)))
1606     {
1607       update_non_unique_table_error(table_list, "INSERT", duplicate);
1608       DBUG_RETURN(TRUE);
1609     }
1610     select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
1611   }
1612   /*
1613     Only call prepare_for_posistion() if we are not performing a DELAYED
1614     operation. It will instead be executed by delayed insert thread.
1615   */
1616   if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1617     prepare_for_positional_update(table, table_list);
1618   DBUG_RETURN(FALSE);
1619 }
1620 
1621 
1622 	/* Check if there is more uniq keys after field */
1623 
last_uniq_key(TABLE * table,uint keynr)1624 static int last_uniq_key(TABLE *table,uint keynr)
1625 {
1626   /*
1627     When an underlying storage engine informs that the unique key
1628     conflicts are not reported in the ascending order by setting
1629     the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1630     information to determine the last key conflict.
1631 
1632     The information about the last key conflict will be used to
1633     do a replace of the new row on the conflicting row, rather
1634     than doing a delete (of old row) + insert (of new row).
1635 
1636     Hence check for this flag and disable replacing the last row
1637     by returning 0 always. Returning 0 will result in doing
1638     a delete + insert always.
1639   */
1640   if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1641     return 0;
1642 
1643   while (++keynr < table->s->keys)
1644     if (table->key_info[keynr].flags & HA_NOSAME)
1645       return 0;
1646   return 1;
1647 }
1648 
1649 
1650 /*
1651  Inserts one historical row to a table.
1652 
1653  Copies content of the row from table->record[1] to table->record[0],
1654  sets Sys_end to now() and calls ha_write_row() .
1655 */
1656 
vers_insert_history_row(TABLE * table)1657 int vers_insert_history_row(TABLE *table)
1658 {
1659   DBUG_ASSERT(table->versioned(VERS_TIMESTAMP));
1660   if (!table->vers_write)
1661     return 0;
1662   restore_record(table,record[1]);
1663 
1664   // Set Sys_end to now()
1665   table->vers_update_end();
1666 
1667   Field *row_start= table->vers_start_field();
1668   Field *row_end= table->vers_end_field();
1669   if (row_start->cmp(row_start->ptr, row_end->ptr) >= 0)
1670     return 0;
1671 
1672   if (table->vfield &&
1673       table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_READ))
1674     return HA_ERR_GENERIC;
1675 
1676   return table->file->ha_write_row(table->record[0]);
1677 }
1678 
1679 /*
1680   Write a record to table with optional deleting of conflicting records,
1681   invoke proper triggers if needed.
1682 
1683   SYNOPSIS
1684      write_record()
1685       thd   - thread context
1686       table - table to which record should be written
1687       info  - COPY_INFO structure describing handling of duplicates
1688               and which is used for counting number of records inserted
1689               and deleted.
1690 
1691   NOTE
1692     Once this record will be written to table after insert trigger will
1693     be invoked. If instead of inserting new record we will update old one
1694     then both on update triggers will work instead. Similarly both on
1695     delete triggers will be invoked if we will delete conflicting records.
1696 
1697     Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
1698     transactions.
1699 
1700   RETURN VALUE
1701     0     - success
1702     non-0 - error
1703 */
1704 
1705 
write_record(THD * thd,TABLE * table,COPY_INFO * info)1706 int write_record(THD *thd, TABLE *table,COPY_INFO *info)
1707 {
1708   int error, trg_error= 0;
1709   char *key=0;
1710   MY_BITMAP *save_read_set, *save_write_set;
1711   ulonglong prev_insert_id= table->file->next_insert_id;
1712   ulonglong insert_id_for_cur_row= 0;
1713   ulonglong prev_insert_id_for_cur_row= 0;
1714   DBUG_ENTER("write_record");
1715 
1716   info->records++;
1717   save_read_set=  table->read_set;
1718   save_write_set= table->write_set;
1719 
1720   if (info->handle_duplicates == DUP_REPLACE ||
1721       info->handle_duplicates == DUP_UPDATE)
1722   {
1723     while (unlikely(error=table->file->ha_write_row(table->record[0])))
1724     {
1725       uint key_nr;
1726       /*
1727         If we do more than one iteration of this loop, from the second one the
1728         row will have an explicit value in the autoinc field, which was set at
1729         the first call of handler::update_auto_increment(). So we must save
1730         the autogenerated value to avoid thd->insert_id_for_cur_row to become
1731         0.
1732       */
1733       if (table->file->insert_id_for_cur_row > 0)
1734         insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1735       else
1736         table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1737       bool is_duplicate_key_error;
1738       if (table->file->is_fatal_error(error, HA_CHECK_ALL))
1739 	goto err;
1740       is_duplicate_key_error=
1741         table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP);
1742       if (!is_duplicate_key_error)
1743       {
1744         /*
1745           We come here when we had an ignorable error which is not a duplicate
1746           key error. In this we ignore error if ignore flag is set, otherwise
1747           report error as usual. We will not do any duplicate key processing.
1748         */
1749         if (info->ignore)
1750         {
1751           table->file->print_error(error, MYF(ME_JUST_WARNING));
1752           goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1753         }
1754         goto err;
1755       }
1756       if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0))
1757       {
1758 	error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
1759 	goto err;
1760       }
1761       DEBUG_SYNC(thd, "write_row_replace");
1762 
1763       /* Read all columns for the row we are going to replace */
1764       table->use_all_columns();
1765       /*
1766 	Don't allow REPLACE to replace a row when a auto_increment column
1767 	was used.  This ensures that we don't get a problem when the
1768 	whole range of the key has been used.
1769       */
1770       if (info->handle_duplicates == DUP_REPLACE &&
1771           table->next_number_field &&
1772           key_nr == table->s->next_number_index &&
1773 	  (insert_id_for_cur_row > 0))
1774 	goto err;
1775       if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1776       {
1777         DBUG_ASSERT(table->file->inited == handler::RND);
1778 	if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1779 	  goto err;
1780       }
1781       else
1782       {
1783 	if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1784 	{
1785 	  error=my_errno;
1786 	  goto err;
1787 	}
1788 
1789 	if (!key)
1790 	{
1791 	  if (!(key=(char*) my_safe_alloca(table->s->max_unique_length)))
1792 	  {
1793 	    error=ENOMEM;
1794 	    goto err;
1795 	  }
1796 	}
1797 	key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1798         key_part_map keypart_map= (1 << table->key_info[key_nr].user_defined_key_parts) - 1;
1799 	if ((error= (table->file->ha_index_read_idx_map(table->record[1],
1800                                                         key_nr, (uchar*) key,
1801                                                         keypart_map,
1802                                                         HA_READ_KEY_EXACT))))
1803 	  goto err;
1804       }
1805       if (table->vfield)
1806       {
1807         /*
1808           We have not yet called update_virtual_fields(VOL_UPDATE_FOR_READ)
1809           in handler methods for the just read row in record[1].
1810         */
1811         table->move_fields(table->field, table->record[1], table->record[0]);
1812         int verr = table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE);
1813         table->move_fields(table->field, table->record[0], table->record[1]);
1814         if (verr)
1815           goto err;
1816       }
1817       if (info->handle_duplicates == DUP_UPDATE)
1818       {
1819         int res= 0;
1820         /*
1821           We don't check for other UNIQUE keys - the first row
1822           that matches, is updated. If update causes a conflict again,
1823           an error is returned
1824         */
1825 	DBUG_ASSERT(table->insert_values != NULL);
1826         store_record(table,insert_values);
1827         restore_record(table,record[1]);
1828         table->reset_default_fields();
1829 
1830         /*
1831           in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can
1832           change per row. Thus, we have to do reset_default_fields() per row.
1833           Twice (before insert and before update).
1834         */
1835         DBUG_ASSERT(info->update_fields->elements ==
1836                     info->update_values->elements);
1837         if (fill_record_n_invoke_before_triggers(thd, table,
1838                                                  *info->update_fields,
1839                                                  *info->update_values,
1840                                                  info->ignore,
1841                                                  TRG_EVENT_UPDATE))
1842           goto before_trg_err;
1843 
1844         bool different_records= (!records_are_comparable(table) ||
1845                                  compare_record(table));
1846         /*
1847           Default fields must be updated before checking view updateability.
1848           This branch of INSERT is executed only when a UNIQUE key was violated
1849           with the ON DUPLICATE KEY UPDATE option. In this case the INSERT
1850           operation is transformed to an UPDATE, and the default fields must
1851           be updated as if this is an UPDATE.
1852         */
1853         if (different_records && table->default_field)
1854           table->evaluate_update_default_function();
1855 
1856         /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1857         res= info->table_list->view_check_option(table->in_use, info->ignore);
1858         if (res == VIEW_CHECK_SKIP)
1859           goto ok_or_after_trg_err;
1860         if (res == VIEW_CHECK_ERROR)
1861           goto before_trg_err;
1862 
1863         table->file->restore_auto_increment(prev_insert_id);
1864         info->touched++;
1865         if (different_records)
1866         {
1867           if (unlikely(error=table->file->ha_update_row(table->record[1],
1868                                                         table->record[0])) &&
1869               error != HA_ERR_RECORD_IS_THE_SAME)
1870           {
1871             if (info->ignore &&
1872                 !table->file->is_fatal_error(error, HA_CHECK_ALL))
1873             {
1874               if (!(thd->variables.old_behavior &
1875                     OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
1876                 table->file->print_error(error, MYF(ME_JUST_WARNING));
1877               goto ok_or_after_trg_err;
1878             }
1879             goto err;
1880           }
1881 
1882           if (error != HA_ERR_RECORD_IS_THE_SAME)
1883           {
1884             info->updated++;
1885             if (table->versioned())
1886             {
1887               if (table->versioned(VERS_TIMESTAMP))
1888               {
1889                 store_record(table, record[2]);
1890                 if ((error= vers_insert_history_row(table)))
1891                 {
1892                   info->last_errno= error;
1893                   table->file->print_error(error, MYF(0));
1894                   trg_error= 1;
1895                   restore_record(table, record[2]);
1896                   goto ok_or_after_trg_err;
1897                 }
1898                 restore_record(table, record[2]);
1899               }
1900               info->copied++;
1901             }
1902           }
1903           else
1904             error= 0;
1905           /*
1906             If ON DUP KEY UPDATE updates a row instead of inserting
1907             one, it's like a regular UPDATE statement: it should not
1908             affect the value of a next SELECT LAST_INSERT_ID() or
1909             mysql_insert_id().  Except if LAST_INSERT_ID(#) was in the
1910             INSERT query, which is handled separately by
1911             THD::arg_of_last_insert_id_function.
1912           */
1913           prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1914           insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1915           trg_error= (table->triggers &&
1916                       table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1917                                                         TRG_ACTION_AFTER, TRUE));
1918           info->copied++;
1919         }
1920 
1921         /*
1922           Only update next_insert_id if the AUTO_INCREMENT value was explicitly
1923           updated, so we don't update next_insert_id with the value from the
1924           row being updated. Otherwise reset next_insert_id to what it was
1925           before the duplicate key error, since that value is unused.
1926         */
1927         if (table->next_number_field_updated)
1928         {
1929           DBUG_ASSERT(table->next_number_field != NULL);
1930 
1931           table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
1932         }
1933         else if (prev_insert_id_for_cur_row)
1934         {
1935           table->file->restore_auto_increment(prev_insert_id_for_cur_row);
1936         }
1937         goto ok_or_after_trg_err;
1938       }
1939       else /* DUP_REPLACE */
1940       {
1941 	/*
1942 	  The manual defines the REPLACE semantics that it is either
1943 	  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1944 	  InnoDB do not function in the defined way if we allow MySQL
1945 	  to convert the latter operation internally to an UPDATE.
1946           We also should not perform this conversion if we have
1947           timestamp field with ON UPDATE which is different from DEFAULT.
1948           Another case when conversion should not be performed is when
1949           we have ON DELETE trigger on table so user may notice that
1950           we cheat here. Note that it is ok to do such conversion for
1951           tables which have ON UPDATE but have no ON DELETE triggers,
1952           we just should not expose this fact to users by invoking
1953           ON UPDATE triggers.
1954           For system versioning wa also use path through delete since we would
1955           save nothing through this cheating.
1956         */
1957         if (last_uniq_key(table,key_nr) &&
1958             !table->file->referenced_by_foreign_key() &&
1959             (!table->triggers || !table->triggers->has_delete_triggers()))
1960         {
1961           if (table->versioned(VERS_TRX_ID))
1962           {
1963             bitmap_set_bit(table->write_set, table->vers_start_field()->field_index);
1964             table->file->column_bitmaps_signal();
1965             table->vers_start_field()->store(0, false);
1966           }
1967           if (unlikely(error= table->file->ha_update_row(table->record[1],
1968                                                          table->record[0])) &&
1969               error != HA_ERR_RECORD_IS_THE_SAME)
1970             goto err;
1971           if (likely(!error))
1972           {
1973             info->deleted++;
1974             if (!table->file->has_transactions())
1975               thd->transaction.stmt.modified_non_trans_table= TRUE;
1976             if (table->versioned(VERS_TIMESTAMP))
1977             {
1978               store_record(table, record[2]);
1979               error= vers_insert_history_row(table);
1980               restore_record(table, record[2]);
1981               if (unlikely(error))
1982                 goto err;
1983             }
1984           }
1985           else
1986             error= 0;   // error was HA_ERR_RECORD_IS_THE_SAME
1987           /*
1988             Since we pretend that we have done insert we should call
1989             its after triggers.
1990           */
1991           goto after_trg_n_copied_inc;
1992         }
1993         else
1994         {
1995           if (table->triggers &&
1996               table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1997                                                 TRG_ACTION_BEFORE, TRUE))
1998             goto before_trg_err;
1999 
2000           if (!table->versioned(VERS_TIMESTAMP))
2001             error= table->file->ha_delete_row(table->record[1]);
2002           else
2003           {
2004             store_record(table, record[2]);
2005             restore_record(table, record[1]);
2006             table->vers_update_end();
2007             error= table->file->ha_update_row(table->record[1],
2008                                               table->record[0]);
2009             restore_record(table, record[2]);
2010           }
2011           if (unlikely(error))
2012             goto err;
2013           if (!table->versioned(VERS_TIMESTAMP))
2014             info->deleted++;
2015           else
2016             info->updated++;
2017           if (!table->file->has_transactions())
2018             thd->transaction.stmt.modified_non_trans_table= TRUE;
2019           if (table->triggers &&
2020               table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
2021                                                 TRG_ACTION_AFTER, TRUE))
2022           {
2023             trg_error= 1;
2024             goto ok_or_after_trg_err;
2025           }
2026           /* Let us attempt do write_row() once more */
2027         }
2028       }
2029     }
2030 
2031     /*
2032       If more than one iteration of the above while loop is done, from
2033       the second one the row being inserted will have an explicit
2034       value in the autoinc field, which was set at the first call of
2035       handler::update_auto_increment(). This value is saved to avoid
2036       thd->insert_id_for_cur_row becoming 0. Use this saved autoinc
2037       value.
2038      */
2039     if (table->file->insert_id_for_cur_row == 0)
2040       table->file->insert_id_for_cur_row= insert_id_for_cur_row;
2041 
2042     /*
2043       Restore column maps if they where replaced during an duplicate key
2044       problem.
2045     */
2046     if (table->read_set != save_read_set ||
2047         table->write_set != save_write_set)
2048       table->column_bitmaps_set(save_read_set, save_write_set);
2049   }
2050   else if (unlikely((error=table->file->ha_write_row(table->record[0]))))
2051   {
2052     DEBUG_SYNC(thd, "write_row_noreplace");
2053     if (!info->ignore ||
2054         table->file->is_fatal_error(error, HA_CHECK_ALL))
2055       goto err;
2056     if (!(thd->variables.old_behavior &
2057           OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
2058       table->file->print_error(error, MYF(ME_JUST_WARNING));
2059     table->file->restore_auto_increment(prev_insert_id);
2060     goto ok_or_after_trg_err;
2061   }
2062 
2063 after_trg_n_copied_inc:
2064   info->copied++;
2065   thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
2066   trg_error= (table->triggers &&
2067               table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
2068                                                 TRG_ACTION_AFTER, TRUE));
2069 
2070 ok_or_after_trg_err:
2071   if (key)
2072     my_safe_afree(key,table->s->max_unique_length);
2073   if (!table->file->has_transactions())
2074     thd->transaction.stmt.modified_non_trans_table= TRUE;
2075   DBUG_RETURN(trg_error);
2076 
2077 err:
2078   info->last_errno= error;
2079   table->file->print_error(error,MYF(0));
2080 
2081 before_trg_err:
2082   table->file->restore_auto_increment(prev_insert_id);
2083   if (key)
2084     my_safe_afree(key, table->s->max_unique_length);
2085   table->column_bitmaps_set(save_read_set, save_write_set);
2086   DBUG_RETURN(1);
2087 }
2088 
2089 
2090 /******************************************************************************
2091   Check that there aren't any null_fields
2092 ******************************************************************************/
2093 
2094 
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)2095 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, TABLE_LIST *table_list)
2096 {
2097   int err= 0;
2098   MY_BITMAP *write_set= entry->write_set;
2099 
2100   for (Field **field=entry->field ; *field ; field++)
2101   {
2102     if (!bitmap_is_set(write_set, (*field)->field_index) &&
2103         !(*field)->vers_sys_field() &&
2104         has_no_default_value(thd, *field, table_list) &&
2105         ((*field)->real_type() != MYSQL_TYPE_ENUM))
2106       err=1;
2107   }
2108   return thd->abort_on_warning ? err : 0;
2109 }
2110 
2111 /*****************************************************************************
2112   Handling of delayed inserts
2113   A thread is created for each table that one uses with the DELAYED attribute.
2114 *****************************************************************************/
2115 
2116 #ifndef EMBEDDED_LIBRARY
2117 
2118 class delayed_row :public ilink {
2119 public:
2120   char *record;
2121   enum_duplicates dup;
2122   my_time_t start_time;
2123   ulong start_time_sec_part;
2124   sql_mode_t sql_mode;
2125   bool auto_increment_field_not_null;
2126   bool ignore, log_query, query_start_sec_part_used;
2127   bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2128   ulonglong first_successful_insert_id_in_prev_stmt;
2129   ulonglong forced_insert_id;
2130   ulong auto_increment_increment;
2131   ulong auto_increment_offset;
2132   LEX_STRING query;
2133   Time_zone *time_zone;
2134   char *user, *host, *ip;
2135   query_id_t query_id;
2136   my_thread_id thread_id;
2137 
delayed_row(LEX_STRING const query_arg,enum_duplicates dup_arg,bool ignore_arg,bool log_query_arg)2138   delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
2139               bool ignore_arg, bool log_query_arg)
2140     : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
2141       forced_insert_id(0), query(query_arg), time_zone(0),
2142       user(0), host(0), ip(0)
2143     {}
~delayed_row()2144   ~delayed_row()
2145   {
2146     my_free(query.str);
2147     my_free(record);
2148   }
2149 };
2150 
2151 /**
2152   Delayed_insert - context of a thread responsible for delayed insert
2153   into one table. When processing delayed inserts, we create an own
2154   thread for every distinct table. Later on all delayed inserts directed
2155   into that table are handled by a dedicated thread.
2156 */
2157 
2158 class Delayed_insert :public ilink {
2159   uint locks_in_memory;
2160   thr_lock_type delayed_lock;
2161 public:
2162   THD thd;
2163   TABLE *table;
2164   mysql_mutex_t mutex;
2165   mysql_cond_t cond, cond_client;
2166   uint tables_in_use, stacked_inserts;
2167   volatile bool status;
2168   bool retry;
2169   /**
2170     When the handler thread starts, it clones a metadata lock ticket
2171     which protects against GRL and ticket for the table to be inserted.
2172     This is done to allow the deadlock detector to detect deadlocks
2173     resulting from these locks.
2174     Before this is done, the connection thread cannot safely exit
2175     without causing problems for clone_ticket().
2176     Once handler_thread_initialized has been set, it is safe for the
2177     connection thread to exit.
2178     Access to handler_thread_initialized is protected by di->mutex.
2179   */
2180   bool handler_thread_initialized;
2181   COPY_INFO info;
2182   I_List<delayed_row> rows;
2183   ulong group_count;
2184   TABLE_LIST table_list;			// Argument
2185   /**
2186     Request for IX metadata lock protecting against GRL which is
2187     passed from connection thread to the handler thread.
2188   */
2189   MDL_request grl_protection;
Delayed_insert(SELECT_LEX * current_select)2190   Delayed_insert(SELECT_LEX *current_select)
2191     :locks_in_memory(0), thd(next_thread_id()),
2192      table(0),tables_in_use(0), stacked_inserts(0),
2193      status(0), retry(0), handler_thread_initialized(FALSE), group_count(0)
2194   {
2195     DBUG_ENTER("Delayed_insert constructor");
2196     thd.security_ctx->user=(char*) delayed_user;
2197     thd.security_ctx->host=(char*) my_localhost;
2198     thd.security_ctx->ip= NULL;
2199     thd.query_id= 0;
2200     strmake_buf(thd.security_ctx->priv_user, thd.security_ctx->user);
2201     thd.current_tablenr=0;
2202     thd.set_command(COM_DELAYED_INSERT);
2203     thd.lex->current_select= current_select;
2204     thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
2205     /*
2206       Prevent changes to global.lock_wait_timeout from affecting
2207       delayed insert threads as any timeouts in delayed inserts
2208       are not communicated to the client.
2209     */
2210     thd.variables.lock_wait_timeout= LONG_TIMEOUT;
2211 
2212     bzero((char*) &thd.net, sizeof(thd.net));		// Safety
2213     bzero((char*) &table_list, sizeof(table_list));	// Safety
2214     thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
2215     thd.security_ctx->host_or_ip= "";
2216     bzero((char*) &info,sizeof(info));
2217     mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
2218     mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
2219     mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
2220     mysql_mutex_lock(&LOCK_thread_count);
2221     delayed_insert_threads++;
2222     delayed_lock= global_system_variables.low_priority_updates ?
2223                                           TL_WRITE_LOW_PRIORITY : TL_WRITE;
2224     mysql_mutex_unlock(&LOCK_thread_count);
2225     DBUG_VOID_RETURN;
2226   }
~Delayed_insert()2227   ~Delayed_insert()
2228   {
2229     /* The following is not really needed, but just for safety */
2230     delayed_row *row;
2231     while ((row=rows.get()))
2232       delete row;
2233     if (table)
2234     {
2235       close_thread_tables(&thd);
2236       thd.mdl_context.release_transactional_locks();
2237     }
2238     mysql_mutex_destroy(&mutex);
2239     mysql_cond_destroy(&cond);
2240     mysql_cond_destroy(&cond_client);
2241 
2242     /*
2243       We could use unlink_not_visible_threads() here, but as
2244       delayed_insert_threads also needs to be protected by
2245       the LOCK_thread_count mutex, we open code this.
2246     */
2247     mysql_mutex_lock(&LOCK_thread_count);
2248     thd.unlink();				// Must be unlinked under lock
2249     delayed_insert_threads--;
2250     mysql_mutex_unlock(&LOCK_thread_count);
2251 
2252     my_free(thd.query());
2253     thd.security_ctx->user= 0;
2254     thd.security_ctx->host= 0;
2255   }
2256 
2257   /* The following is for checking when we can delete ourselves */
lock()2258   inline void lock()
2259   {
2260     locks_in_memory++;				// Assume LOCK_delay_insert
2261   }
unlock()2262   void unlock()
2263   {
2264     mysql_mutex_lock(&LOCK_delayed_insert);
2265     if (!--locks_in_memory)
2266     {
2267       mysql_mutex_lock(&mutex);
2268       if (thd.killed && ! stacked_inserts && ! tables_in_use)
2269       {
2270         mysql_cond_signal(&cond);
2271 	status=1;
2272       }
2273       mysql_mutex_unlock(&mutex);
2274     }
2275     mysql_mutex_unlock(&LOCK_delayed_insert);
2276   }
lock_count()2277   inline uint lock_count() { return locks_in_memory; }
2278 
2279   TABLE* get_local_table(THD* client_thd);
2280   bool open_and_lock_table();
2281   bool handle_inserts(void);
2282 };
2283 
2284 
2285 I_List<Delayed_insert> delayed_threads;
2286 
2287 
2288 /**
2289   Return an instance of delayed insert thread that can handle
2290   inserts into a given table, if it exists. Otherwise return NULL.
2291 */
2292 
2293 static
find_handler(THD * thd,TABLE_LIST * table_list)2294 Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
2295 {
2296   THD_STAGE_INFO(thd, stage_waiting_for_delay_list);
2297   mysql_mutex_lock(&LOCK_delayed_insert);       // Protect master list
2298   I_List_iterator<Delayed_insert> it(delayed_threads);
2299   Delayed_insert *di;
2300   while ((di= it++))
2301   {
2302     if (!cmp(&table_list->db, &di->table_list.db) &&
2303 	!cmp(&table_list->table_name, &di->table_list.table_name))
2304     {
2305       di->lock();
2306       break;
2307     }
2308   }
2309   mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2310   return di;
2311 }
2312 
2313 
2314 /**
2315   Attempt to find or create a delayed insert thread to handle inserts
2316   into this table.
2317 
2318   @return In case of success, table_list->table points to a local copy
2319           of the delayed table or is set to NULL, which indicates a
2320           request for lock upgrade. In case of failure, value of
2321           table_list->table is undefined.
2322   @retval TRUE  - this thread ran out of resources OR
2323                 - a newly created delayed insert thread ran out of
2324                   resources OR
2325                 - the created thread failed to open and lock the table
2326                   (e.g. because it does not exist) OR
2327                 - the table opened in the created thread turned out to
2328                   be a view
2329   @retval FALSE - table successfully opened OR
2330                 - too many delayed insert threads OR
2331                 - the table has triggers and we have to fall back to
2332                   a normal INSERT
2333                 Two latter cases indicate a request for lock upgrade.
2334 
2335   XXX: why do we regard INSERT DELAYED into a view as an error and
2336   do not simply perform a lock upgrade?
2337 
2338   TODO: The approach with using two mutexes to work with the
2339   delayed thread list -- LOCK_delayed_insert and
2340   LOCK_delayed_create -- is redundant, and we only need one of
2341   them to protect the list.  The reason we have two locks is that
2342   we do not want to block look-ups in the list while we're waiting
2343   for the newly created thread to open the delayed table. However,
2344   this wait itself is redundant -- we always call get_local_table
2345   later on, and there wait again until the created thread acquires
2346   a table lock.
2347 
2348   As is redundant the concept of locks_in_memory, since we already
2349   have another counter with similar semantics - tables_in_use,
2350   both of them are devoted to counting the number of producers for
2351   a given consumer (delayed insert thread), only at different
2352   stages of producer-consumer relationship.
2353 
2354   The 'status' variable in Delayed_insert is redundant
2355   too, since there is already di->stacked_inserts.
2356 */
2357 
2358 static
delayed_get_table(THD * thd,MDL_request * grl_protection_request,TABLE_LIST * table_list)2359 bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
2360                        TABLE_LIST *table_list)
2361 {
2362   int error;
2363   Delayed_insert *di;
2364   DBUG_ENTER("delayed_get_table");
2365 
2366   /* Must be set in the parser */
2367   DBUG_ASSERT(table_list->db.str);
2368 
2369   /* Find the thread which handles this table. */
2370   if (!(di= find_handler(thd, table_list)))
2371   {
2372     /*
2373       No match. Create a new thread to handle the table, but
2374       no more than max_insert_delayed_threads.
2375     */
2376     if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
2377       DBUG_RETURN(0);
2378     THD_STAGE_INFO(thd, stage_creating_delayed_handler);
2379     mysql_mutex_lock(&LOCK_delayed_create);
2380     /*
2381       The first search above was done without LOCK_delayed_create.
2382       Another thread might have created the handler in between. Search again.
2383     */
2384     if (! (di= find_handler(thd, table_list)))
2385     {
2386       if (!(di= new Delayed_insert(thd->lex->current_select)))
2387         goto end_create;
2388 
2389       /*
2390         Annotating delayed inserts is not supported.
2391       */
2392       di->thd.variables.binlog_annotate_row_events= 0;
2393 
2394       di->thd.set_db(&table_list->db);
2395       di->thd.set_query(my_strndup(table_list->table_name.str,
2396                                    table_list->table_name.length,
2397                                    MYF(MY_WME | ME_FATALERROR)),
2398                         table_list->table_name.length, system_charset_info);
2399       if (di->thd.db.str == NULL || di->thd.query() == NULL)
2400       {
2401         /* The error is reported */
2402 	delete di;
2403         goto end_create;
2404       }
2405       di->table_list= *table_list;			// Needed to open table
2406       /* Replace volatile strings with local copies */
2407       di->table_list.alias.str=    di->table_list.table_name.str=    di->thd.query();
2408       di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length();
2409       di->table_list.db= di->thd.db;
2410       /* We need the tickets so that they can be cloned in handle_delayed_insert */
2411       di->grl_protection.init(MDL_key::GLOBAL, "", "",
2412                               MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
2413       di->grl_protection.ticket= grl_protection_request->ticket;
2414       init_mdl_requests(&di->table_list);
2415       di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
2416 
2417       di->lock();
2418       mysql_mutex_lock(&di->mutex);
2419       if ((error= mysql_thread_create(key_thread_delayed_insert,
2420                                       &di->thd.real_id, &connection_attrib,
2421                                       handle_delayed_insert, (void*) di)))
2422       {
2423 	DBUG_PRINT("error",
2424 		   ("Can't create thread to handle delayed insert (error %d)",
2425 		    error));
2426         mysql_mutex_unlock(&di->mutex);
2427 	di->unlock();
2428 	delete di;
2429 	my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error);
2430         goto end_create;
2431       }
2432 
2433       /*
2434         Wait until table is open unless the handler thread or the connection
2435         thread has been killed. Note that we in all cases must wait until the
2436         handler thread has been properly initialized before exiting. Otherwise
2437         we risk doing clone_ticket() on a ticket that is no longer valid.
2438       */
2439       THD_STAGE_INFO(thd, stage_waiting_for_handler_open);
2440       while (!di->handler_thread_initialized ||
2441              (!di->thd.killed && !di->table && !thd->killed))
2442       {
2443         mysql_cond_wait(&di->cond_client, &di->mutex);
2444       }
2445       mysql_mutex_unlock(&di->mutex);
2446       THD_STAGE_INFO(thd, stage_got_old_table);
2447       if (thd->killed)
2448       {
2449         di->unlock();
2450         goto end_create;
2451       }
2452       if (di->thd.killed)
2453       {
2454         if (di->thd.is_error() && ! di->retry)
2455         {
2456           /*
2457             Copy the error message. Note that we don't treat fatal
2458             errors in the delayed thread as fatal errors in the
2459             main thread. If delayed thread was killed, we don't
2460             want to send "Server shutdown in progress" in the
2461             INSERT THREAD.
2462           */
2463           my_message(di->thd.get_stmt_da()->sql_errno(),
2464                      di->thd.get_stmt_da()->message(),
2465                      MYF(0));
2466         }
2467         di->unlock();
2468         goto end_create;
2469       }
2470       mysql_mutex_lock(&LOCK_delayed_insert);
2471       delayed_threads.append(di);
2472       mysql_mutex_unlock(&LOCK_delayed_insert);
2473     }
2474     mysql_mutex_unlock(&LOCK_delayed_create);
2475   }
2476 
2477   mysql_mutex_lock(&di->mutex);
2478   table_list->table= di->get_local_table(thd);
2479   mysql_mutex_unlock(&di->mutex);
2480   if (table_list->table)
2481   {
2482     DBUG_ASSERT(! thd->is_error());
2483     thd->di= di;
2484   }
2485   /* Unlock the delayed insert object after its last access. */
2486   di->unlock();
2487   DBUG_RETURN((table_list->table == NULL));
2488 
2489 end_create:
2490   mysql_mutex_unlock(&LOCK_delayed_create);
2491   DBUG_RETURN(thd->is_error());
2492 }
2493 
2494 #define memdup_vcol(thd, vcol)                                            \
2495   if (vcol)                                                               \
2496   {                                                                       \
2497     (vcol)= (Virtual_column_info*)(thd)->memdup((vcol), sizeof(*(vcol))); \
2498     (vcol)->expr= NULL;                                                   \
2499   }
2500 
/**
  Create a private copy of the handler thread's TABLE for one client thread.

  As we can't let many client threads modify the same TABLE
  structure of the dedicated delayed insert thread, we create an
  own structure for each client thread. This includes a row
  buffer to save the column values and new fields that point to
  the new row buffer. The memory is allocated in the client
  thread (through client_thd->alloc() / client_thd->mem_root).

  Before copying, the function registers the caller as a user of the
  handler (tables_in_use++) and, if the handler holds no table lock yet,
  signals it and waits on cond_client until the lock is taken or either
  thread is killed.

  @param client_thd  The client (connection) thread that will use the copy.

  @pre This function is called from the client thread.  Delayed
       insert thread mutex must be acquired before invoking this
       function.

  @return Not-NULL table object on success. NULL on failure; in that case
          tables_in_use is decremented again and the handler is signalled.
          When the handler thread was killed an error is raised with
          my_message(); the other failure paths raise nothing in this
          function itself.
*/

TABLE *Delayed_insert::get_local_table(THD* client_thd)
{
  /*
    All locals are declared up front without initializers: the function
    uses 'goto error' from many places.
  */
  my_ptrdiff_t adjust_ptrs;
  Field **field,**org_field, *found_next_number_field;
  TABLE *copy;
  TABLE_SHARE *share;
  uchar *bitmap;
  char *copy_tmp;
  uint bitmaps_used;
  DBUG_ENTER("Delayed_insert::get_local_table");

  /* First request insert thread to get a lock */
  status=1;
  tables_in_use++;
  if (!thd.lock)				// Table is not locked
  {
    THD_STAGE_INFO(client_thd, stage_waiting_for_handler_lock);
    mysql_cond_signal(&cond);			// Tell handler to lock table
    /* Wait until the handler has the lock, or until either side is killed */
    while (!thd.killed && !thd.lock && ! client_thd->killed)
    {
      mysql_cond_wait(&cond_client, &mutex);
    }
    THD_STAGE_INFO(client_thd, stage_got_handler_lock);
    if (client_thd->killed)
      goto error;
    if (thd.killed)
    {
      /*
        Copy the error message. Note that we don't treat fatal
        errors in the delayed thread as fatal errors in the
        main thread. If delayed thread was killed, we don't
        want to send "Server shutdown in progress" in the
        INSERT THREAD.

        The thread could be killed with an error message if
        di->handle_inserts() or di->open_and_lock_table() fails.
        The thread could be killed without an error message if
        killed using THD::notify_shared_lock() or
        kill_delayed_threads_for_table().
      */
      if (!thd.is_error())
        my_message(ER_QUERY_INTERRUPTED, ER_THD(&thd, ER_QUERY_INTERRUPTED),
                   MYF(0));
      else
        my_message(thd.get_stmt_da()->sql_errno(),
                   thd.get_stmt_da()->message(), MYF(0));
      goto error;
    }
  }
  share= table->s;

  /*
    Allocate memory for the TABLE object, the field pointers array, and
    one record buffer of reclength size. Normally a table has three
    record buffers of rec_buff_length size, which includes alignment
    bytes. Since the table copy is used for creating one record only,
    the other record buffers and alignment are unnecessary.

    The same allocation also reserves room for four column bitmaps of
    share->column_bitmap_size bytes each. As laid out by the pointer
    assignments below, the order inside the block is:
      TABLE object | field pointer array | bitmaps | record

    NOTE(review): the array element size is written as sizeof(Field**)
    although the elements are Field*.
  */
  THD_STAGE_INFO(client_thd, stage_allocating_local_table);
  copy_tmp= (char*) client_thd->alloc(sizeof(*copy)+
                                      (share->fields+1)*sizeof(Field**)+
                                      share->reclength +
                                      share->column_bitmap_size*4);
  if (!copy_tmp)
    goto error;

  /* Copy the TABLE object. */
  copy= new (copy_tmp) TABLE;
  *copy= *table;

  /* We don't need to change the file handler here */
  /* Assign the pointers for the field pointers array and the record. */
  field= copy->field= (Field**) (copy + 1);
  bitmap= (uchar*) (field + share->fields + 1);
  copy->record[0]= (bitmap + share->column_bitmap_size*4);
  memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
  /* Separate pointer arrays for fields with defaults and for virtual fields */
  if (share->default_fields || share->default_expressions)
  {
    copy->default_field= (Field**)
      client_thd->alloc((share->default_fields +
                         share->default_expressions + 1)*
                        sizeof(Field*));
    if (!copy->default_field)
      goto error;
  }

  if (share->virtual_fields)
  {
    copy->vfield= (Field **) client_thd->alloc((share->virtual_fields+1)*
                                               sizeof(Field*));
    if (!copy->vfield)
      goto error;
  }
  copy->expr_arena= NULL;

  /* Ensure we don't use the table list of the original table */
  copy->pos_in_table_list= 0;

  /*
    Make a copy of all fields.
    The copied fields need to point into the copied record. This is done
    by copying the field objects with their old pointer values and then
    "move" the pointers by the distance between the original and copied
    records. That way we preserve the relative positions in the records.
  */
  adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
  found_next_number_field= table->found_next_number_field;
  for (org_field= table->field; *org_field; org_field++, field++)
  {
    if (!(*field= (*org_field)->make_new_field(client_thd->mem_root, copy, 1)))
      goto error;
    (*field)->unireg_check= (*org_field)->unireg_check;
    (*field)->invisible= (*org_field)->invisible;
    (*field)->orig_table= copy;			// Remove connection
    (*field)->move_field_offset(adjust_ptrs);	// Point at copy->record[0]
    /* Give the copied field its own Virtual_column_info objects (expr reset) */
    memdup_vcol(client_thd, (*field)->vcol_info);
    memdup_vcol(client_thd, (*field)->default_value);
    memdup_vcol(client_thd, (*field)->check_constraint);
    /* Re-point the auto-increment field marker at the copied field */
    if (*org_field == found_next_number_field)
      (*field)->table->found_next_number_field= *field;
  }
  *field=0;                                     // Terminate the pointer array

  if (share->virtual_fields || share->default_expressions ||
      share->default_fields)
  {
    bool error_reported= FALSE;
    /* Only the MY_BITMAP header is allocated here; see "Adjust bitmaps" */
    if (unlikely(!(copy->def_vcol_set=
                   (MY_BITMAP*) alloc_root(client_thd->mem_root,
                                           sizeof(MY_BITMAP)))))
      goto error;
    if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy,
                                 &error_reported,
                                 VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING)))
      goto error;
  }

  switch_defaults_to_nullable_trigger_fields(copy);

  /* Adjust in_use for pointing to client thread */
  copy->in_use= client_thd;

  /* Adjust lock_count. This table object is not part of a lock. */
  copy->lock_count= 0;

  /*
    Adjust bitmaps: carve the bitmap area reserved above into slots of
    share->column_bitmap_size bytes. bitmaps_used counts the slots handed
    out so far.
  */
  copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
  copy->def_write_set.bitmap= ((my_bitmap_map*)
                               (bitmap + share->column_bitmap_size));
  bitmaps_used= 2;
  if (share->virtual_fields)
  {
    my_bitmap_init(copy->def_vcol_set,
                   (my_bitmap_map*) (bitmap +
                                     bitmaps_used*share->column_bitmap_size),
                   share->fields, FALSE);
    bitmaps_used++;
    copy->vcol_set= copy->def_vcol_set;
  }
  if (share->default_fields || share->default_expressions)
  {
    /*
      has_value_set takes the next free slot. bitmaps_used is not
      incremented for it, so the bzero() below does not cover this slot.
      NOTE(review): presumably my_bitmap_init() clears the buffer it is
      given - confirm.
    */
    my_bitmap_init(&copy->has_value_set,
                   (my_bitmap_map*) (bitmap +
                                     bitmaps_used*share->column_bitmap_size),
                   share->fields, FALSE);
  }
  copy->tmp_set.bitmap= 0;                      // To catch errors
  bzero((char*) bitmap, share->column_bitmap_size * bitmaps_used);
  copy->read_set=  &copy->def_read_set;
  copy->write_set= &copy->def_write_set;

  DBUG_RETURN(copy);

  /* Got fatal error: undo the tables_in_use++ done at the top */
 error:
  tables_in_use--;
  mysql_cond_signal(&cond);                     // Inform thread about abort
  DBUG_RETURN(0);
}
2696 
2697 
2698 /* Put a question in queue */
2699 
2700 static
write_delayed(THD * thd,TABLE * table,enum_duplicates duplic,LEX_STRING query,bool ignore,bool log_on)2701 int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
2702                   LEX_STRING query, bool ignore, bool log_on)
2703 {
2704   delayed_row *row= 0;
2705   Delayed_insert *di=thd->di;
2706   const Discrete_interval *forced_auto_inc;
2707   size_t user_len, host_len, ip_length;
2708   DBUG_ENTER("write_delayed");
2709   DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
2710                        (ulong) query.length));
2711 
2712   THD_STAGE_INFO(thd, stage_waiting_for_handler_insert);
2713   mysql_mutex_lock(&di->mutex);
2714   while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
2715     mysql_cond_wait(&di->cond_client, &di->mutex);
2716   THD_STAGE_INFO(thd, stage_storing_row_into_queue);
2717 
2718   if (thd->killed)
2719     goto err;
2720 
2721   /*
2722     Take a copy of the query string, if there is any. The string will
2723     be free'ed when the row is destroyed. If there is no query string,
2724     we don't do anything special.
2725    */
2726 
2727   if (query.str)
2728   {
2729     char *str;
2730     if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2731       goto err;
2732     query.str= str;
2733   }
2734   row= new delayed_row(query, duplic, ignore, log_on);
2735   if (row == NULL)
2736   {
2737     my_free(query.str);
2738     goto err;
2739   }
2740 
2741   user_len= host_len= ip_length= 0;
2742   row->user= row->host= row->ip= NULL;
2743   if (thd->security_ctx)
2744   {
2745     if (thd->security_ctx->user)
2746       user_len= strlen(thd->security_ctx->user) + 1;
2747     if (thd->security_ctx->host)
2748       host_len= strlen(thd->security_ctx->host) + 1;
2749     if (thd->security_ctx->ip)
2750       ip_length= strlen(thd->security_ctx->ip) + 1;
2751   }
2752   /* This can't be THREAD_SPECIFIC as it's freed in delayed thread */
2753   if (!(row->record= (char*) my_malloc(table->s->reclength +
2754                                        user_len + host_len + ip_length,
2755                                        MYF(MY_WME))))
2756     goto err;
2757   memcpy(row->record, table->record[0], table->s->reclength);
2758 
2759   if (thd->security_ctx)
2760   {
2761     if (thd->security_ctx->user)
2762     {
2763       row->user= row->record + table->s->reclength;
2764       memcpy(row->user, thd->security_ctx->user, user_len);
2765     }
2766     if (thd->security_ctx->host)
2767     {
2768       row->host= row->record + table->s->reclength + user_len;
2769       memcpy(row->host, thd->security_ctx->host, host_len);
2770     }
2771     if (thd->security_ctx->ip)
2772     {
2773       row->ip= row->record + table->s->reclength + user_len + host_len;
2774       memcpy(row->ip, thd->security_ctx->ip, ip_length);
2775     }
2776   }
2777   row->query_id= thd->query_id;
2778   row->thread_id= thd->thread_id;
2779 
2780   row->start_time=                thd->start_time;
2781   row->start_time_sec_part=       thd->start_time_sec_part;
2782   row->query_start_sec_part_used= thd->query_start_sec_part_used;
2783   /*
2784     those are for the binlog: LAST_INSERT_ID() has been evaluated at this
2785     time, so record does not need it, but statement-based binlogging of the
2786     INSERT will need when the row is actually inserted.
2787     As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
2788   */
2789   row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2790     thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2791   row->first_successful_insert_id_in_prev_stmt=
2792     thd->first_successful_insert_id_in_prev_stmt;
2793 
2794   /* Add session variable timezone
2795      Time_zone object will not be freed even the thread is ended.
2796      So we can get time_zone object from thread which handling delayed statement.
2797      See the comment of my_tz_find() for detail.
2798   */
2799   if (thd->time_zone_used)
2800   {
2801     row->time_zone = thd->variables.time_zone;
2802   }
2803   else
2804   {
2805     row->time_zone = NULL;
2806   }
2807   /* Copy session variables. */
2808   row->auto_increment_increment= thd->variables.auto_increment_increment;
2809   row->auto_increment_offset=    thd->variables.auto_increment_offset;
2810   row->sql_mode=                 thd->variables.sql_mode;
2811   row->auto_increment_field_not_null= table->auto_increment_field_not_null;
2812 
2813   /* Copy the next forced auto increment value, if any. */
2814   if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
2815   {
2816     row->forced_insert_id= forced_auto_inc->minimum();
2817     DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
2818                            (ulong) row->forced_insert_id));
2819   }
2820 
2821   di->rows.push_back(row);
2822   di->stacked_inserts++;
2823   di->status=1;
2824   if (table->s->blob_fields)
2825     unlink_blobs(table);
2826   mysql_cond_signal(&di->cond);
2827 
2828   thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
2829   mysql_mutex_unlock(&di->mutex);
2830   DBUG_RETURN(0);
2831 
2832  err:
2833   delete row;
2834   mysql_mutex_unlock(&di->mutex);
2835   DBUG_RETURN(1);
2836 }
2837 
2838 /**
2839   Signal the delayed insert thread that this user connection
2840   is finished using it for this statement.
2841 */
2842 
end_delayed_insert(THD * thd)2843 static void end_delayed_insert(THD *thd)
2844 {
2845   DBUG_ENTER("end_delayed_insert");
2846   Delayed_insert *di=thd->di;
2847   mysql_mutex_lock(&di->mutex);
2848   DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
2849   if (!--di->tables_in_use || di->thd.killed)
2850   {						// Unlock table
2851     di->status=1;
2852     mysql_cond_signal(&di->cond);
2853   }
2854   mysql_mutex_unlock(&di->mutex);
2855   DBUG_VOID_RETURN;
2856 }
2857 
2858 
2859 /* We kill all delayed threads when doing flush-tables */
2860 
kill_delayed_threads(void)2861 void kill_delayed_threads(void)
2862 {
2863   DBUG_ENTER("kill_delayed_threads");
2864   mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
2865 
2866   I_List_iterator<Delayed_insert> it(delayed_threads);
2867   Delayed_insert *di;
2868   while ((di= it++))
2869   {
2870     mysql_mutex_lock(&di->thd.LOCK_thd_kill);
2871     if (di->thd.killed < KILL_CONNECTION)
2872       di->thd.set_killed_no_mutex(KILL_CONNECTION);
2873     if (di->thd.mysys_var)
2874     {
2875       mysql_mutex_lock(&di->thd.mysys_var->mutex);
2876       if (di->thd.mysys_var->current_cond)
2877       {
2878 	/*
2879 	  We need the following test because the main mutex may be locked
2880 	  in handle_delayed_insert()
2881 	*/
2882 	if (&di->mutex != di->thd.mysys_var->current_mutex)
2883           mysql_mutex_lock(di->thd.mysys_var->current_mutex);
2884         mysql_cond_broadcast(di->thd.mysys_var->current_cond);
2885 	if (&di->mutex != di->thd.mysys_var->current_mutex)
2886           mysql_mutex_unlock(di->thd.mysys_var->current_mutex);
2887       }
2888       mysql_mutex_unlock(&di->thd.mysys_var->mutex);
2889     }
2890     mysql_mutex_unlock(&di->thd.LOCK_thd_kill);
2891   }
2892   mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2893   DBUG_VOID_RETURN;
2894 }
2895 
2896 
/**
  A strategy for the prelocking algorithm which prevents the
  delayed insert thread from opening tables with engines which
  do not support delayed inserts.

  Particularly it allows to abort open_tables() as soon as we
  discover that we have opened a MERGE table, without acquiring
  metadata locks on underlying tables.

  All three hooks return a bool; handle_table() returns TRUE after
  raising an error, the other two always return FALSE.
*/

class Delayed_prelocking_strategy : public Prelocking_strategy
{
public:
  /* Not expected to be called: asserts in debug builds, returns FALSE. */
  virtual bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
                              Sroutine_hash_entry *rt, sp_head *sp,
                              bool *need_prelocking);
  /*
    Raises ER_DELAYED_NOT_SUPPORTED and returns TRUE when the opened
    table's handler lacks the HA_CAN_INSERT_DELAYED flag.
  */
  virtual bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
                            TABLE_LIST *table_list, bool *need_prelocking);
  /* Not expected to be called: asserts in debug builds, returns FALSE. */
  virtual bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
                           TABLE_LIST *table_list, bool *need_prelocking);
};
2918 
2919 
2920 bool Delayed_prelocking_strategy::
handle_table(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)2921 handle_table(THD *thd, Query_tables_list *prelocking_ctx,
2922              TABLE_LIST *table_list, bool *need_prelocking)
2923 {
2924   DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);
2925 
2926   if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
2927   {
2928     my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name.str);
2929     return TRUE;
2930   }
2931   return FALSE;
2932 }
2933 
2934 
/*
  Routine hook of the delayed insert prelocking strategy.

  This hook is not expected to be reached: it asserts unconditionally in
  debug builds and returns FALSE. None of the parameters is used.
*/
bool Delayed_prelocking_strategy::
handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
               Sroutine_hash_entry *rt, sp_head *sp,
               bool *need_prelocking)
{
  /* LEX used by the delayed insert thread has no routines. */
  DBUG_ASSERT(0);
  return FALSE;
}
2944 
2945 
/*
  View hook of the delayed insert prelocking strategy.

  This hook is not expected to be reached: it asserts unconditionally in
  debug builds and returns FALSE. None of the parameters is used.
*/
bool Delayed_prelocking_strategy::
handle_view(THD *thd, Query_tables_list *prelocking_ctx,
            TABLE_LIST *table_list, bool *need_prelocking)
{
  /* We don't open views in the delayed insert thread. */
  DBUG_ASSERT(0);
  return FALSE;
}
2954 
2955 
2956 /**
2957    Open and lock table for use by delayed thread and check that
2958    this table is suitable for delayed inserts.
2959 
2960    @retval FALSE - Success.
2961    @retval TRUE  - Failure.
2962 */
2963 
open_and_lock_table()2964 bool Delayed_insert::open_and_lock_table()
2965 {
2966   Delayed_prelocking_strategy prelocking_strategy;
2967 
2968   /*
2969     Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
2970     error for tables with engines which don't support delayed inserts.
2971 
2972     We can't do auto-repair in insert delayed thread, as it would hang
2973     when trying to an exclusive MDL_LOCK on the table during repair
2974     as the connection thread has a SHARED_WRITE lock.
2975   */
2976   if (!(table= open_n_lock_single_table(&thd, &table_list,
2977                                         TL_WRITE_DELAYED,
2978                                         MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
2979                                         MYSQL_OPEN_IGNORE_REPAIR,
2980                                         &prelocking_strategy)))
2981   {
2982     /* If table was crashed, then upper level should retry open+repair */
2983     retry= table_list.crashed;
2984     thd.fatal_error();                      // Abort waiting inserts
2985     return TRUE;
2986   }
2987 
2988   if (table->triggers || table->check_constraints)
2989   {
2990     /*
2991       Table has triggers or check constraints. This is not an error, but we do
2992       not support these with delayed insert. Terminate the delayed
2993       thread without an error and thus request lock upgrade.
2994     */
2995     return TRUE;
2996   }
2997   table->copy_blobs= 1;
2998   return FALSE;
2999 }
3000 
3001 
/*
  Thread entry point of a delayed insert handler thread.

  arg is the Delayed_insert object this thread serves. The function
   - takes di->mutex, initializes the thread and clones the metadata lock
     tickets prepared by the connection thread,
   - opens the table with di->open_and_lock_table(),
   - then loops: waits on di->cond (with a timeout of
     delayed_insert_timeout) for work, locks the table when clients are
     using it, calls di->handle_inserts() when rows are stacked and
     unlocks the table again when nobody uses it.
  Once the thread is killed and no client, queued row or lock is left, it
  leaves the loop, closes its tables, deletes di and terminates itself with
  pthread_exit().

  di->handler_thread_initialized is set to TRUE on every path, including
  the initialization failure paths.
*/

pthread_handler_t handle_delayed_insert(void *arg)
{
  Delayed_insert *di=(Delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that it's visible in 'show processlist' */
  thd->set_start_time();
  add_to_active_threads(thd);
  if (abort_loop)
    thd->set_killed(KILL_CONNECTION);
  else
    thd->reset_killed();

  mysql_thread_set_psi_id(thd->thread_id);

  /*
    Wait until the client runs into mysql_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
  mysql_mutex_lock(&di->mutex);
  if (my_thread_init())
  {
    /* Can't use my_error since store_globals has not yet been called */
    thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
    di->handler_thread_initialized= TRUE;
  }
  else
  {
    DBUG_ENTER("handle_delayed_insert");
    thd->thread_stack= (char*) &thd;
    if (init_thr_lock() || thd->store_globals())
    {
      /* Can't use my_error since store_globals has perhaps failed */
      thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
      di->handler_thread_initialized= TRUE;
      thd->fatal_error();
      goto err;
    }

    thd->lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()

    /*
      INSERT DELAYED has to go to row-based format because the time
      at which rows are inserted cannot be determined in mixed mode.
    */
    thd->set_current_stmt_binlog_format_row_if_mixed();

    /*
      Clone tickets representing protection against GRL and the lock on
      the target table for the insert and add them to the list of granted
      metadata locks held by the handler thread. This is safe since the
      handler thread is not holding nor waiting on any metadata locks.
    */
    if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
        thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
    {
      thd->release_transactional_locks();
      di->handler_thread_initialized= TRUE;
      goto err;
    }

    /*
      Now that the ticket has been cloned, it is safe for the connection
      thread to exit.
    */
    di->handler_thread_initialized= TRUE;
    di->table_list.mdl_request.ticket= NULL;

    if (di->open_and_lock_table())
      goto err;

    /*
      INSERT DELAYED generally expects thd->lex->current_select to be NULL,
      since this is not an attribute of the current thread. This can lead to
      problems if the thread that spawned the current one disconnects.
      current_select will then point to freed memory. But current_select is
      required to resolve the partition function. So, after fulfilling that
      requirement, we set the current_select to 0.
    */
    thd->lex->current_select= NULL;

    /* Tell client that the thread is initialized */
    mysql_cond_signal(&di->cond_client);

    /*
      Inform mdl that it needs to call mysql_lock_abort to abort locks
      for delayed insert.
    */
    thd->mdl_context.set_needs_thr_lock_abort(TRUE);

    di->table->mark_columns_needed_for_insert();
    /* Mark all columns for write as we don't know which columns we get from user */
    bitmap_set_all(di->table->write_set);

    /* Now wait until we get an insert or lock to handle */
    /* We will not abort as long as a client thread uses this thread */

    for (;;)
    {
      if (thd->killed)
      {
        uint lock_count;
        DBUG_PRINT("delayed", ("Insert delayed killed"));
        /*
          Remove this from delay insert list so that no one can request a
          table from this.
          di->mutex is released first, so that di->mutex and
          LOCK_delayed_insert are never held together here.
        */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&LOCK_delayed_insert);
        di->unlink();
        lock_count=di->lock_count();
        mysql_mutex_unlock(&LOCK_delayed_insert);
        mysql_mutex_lock(&di->mutex);
        /* Exit only when no client, queued row or table lock is left */
        if (!lock_count && !di->tables_in_use && !di->stacked_inserts &&
            !thd->lock)
          break;					// Time to die
      }

      /* Shouldn't wait if killed or an insert is waiting. */
      DBUG_PRINT("delayed",
                 ("thd->killed: %d  di->status: %d  di->stacked_inserts: %d",
                  thd->killed, di->status, di->stacked_inserts));
      if (!thd->killed && !di->status && !di->stacked_inserts)
      {
        struct timespec abstime;
        set_timespec(abstime, delayed_insert_timeout);

        /*
          Information for pthread_kill: publish the mutex/condition pair
          this thread is about to wait on (read by kill_delayed_threads()).
        */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
        di->thd.mysys_var->current_mutex= &di->mutex;
        di->thd.mysys_var->current_cond= &di->cond;
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
        THD_STAGE_INFO(&(di->thd), stage_waiting_for_insert);

        DBUG_PRINT("info",("Waiting for someone to insert rows"));
        while (!thd->killed && !di->status)
        {
          int error;
          mysql_audit_release(thd);
          error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
#ifdef EXTRA_DEBUG
          if (error && error != EINTR && error != ETIMEDOUT)
          {
            fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
            DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
                                 error));
          }
#endif
          /* Idle for the whole timeout: the handler kills itself */
          if (error == ETIMEDOUT || error == ETIME)
            thd->set_killed(KILL_CONNECTION);
        }
        /* We can't lock di->mutex and mysys_var->mutex at the same time */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
        di->thd.mysys_var->current_mutex= 0;
        di->thd.mysys_var->current_cond= 0;
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
      }
      DBUG_PRINT("delayed",
                 ("thd->killed: %d  di->tables_in_use: %d  thd->lock: %d",
                  thd->killed, di->tables_in_use, thd->lock != 0));

      if (di->tables_in_use && ! thd->lock && !thd->killed)
      {
        /*
          Request for new delayed insert.
          Lock the table, but avoid to be blocked by a global read lock.
          If we got here while a global read lock exists, then one or more
          inserts started before the lock was requested. These are allowed
          to complete their work before the server returns control to the
          client which requested the global read lock. The delayed insert
          handler will close the table and finish when the outstanding
          inserts are done.
        */
        if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
        {
          /* Fatal error */
          thd->set_killed(KILL_CONNECTION);
        }
        /* Wake up clients waiting for the lock (or for the failure) */
        mysql_cond_broadcast(&di->cond_client);
      }
      if (di->stacked_inserts)
      {
        delayed_row *row;
        I_List_iterator<delayed_row> it(di->rows);
        my_thread_id cur_thd= di->thd.thread_id;

        /*
          Walk the queued rows and notify the audit interface each time
          the originating thread id changes from one row to the next.
        */
        while ((row= it++))
        {
          if (cur_thd != row->thread_id)
          {
            mysql_audit_external_lock_ex(&di->thd, row->thread_id,
                row->user, row->host, row->ip, row->query_id,
                di->table->s, F_WRLCK);
            cur_thd= row->thread_id;
          }
        }
        if (di->handle_inserts())
        {
          /* Some fatal error */
          thd->set_killed(KILL_CONNECTION);
        }
      }
      di->status=0;
      if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
      {
        /*
          No one is doing an insert delayed.
          Unlock table so that other threads can use it
        */
        MYSQL_LOCK *lock=thd->lock;
        thd->lock=0;
        mysql_mutex_unlock(&di->mutex);
        /*
          We need to release next_insert_id before unlocking. This is
          enforced by handler::ha_external_lock().
        */
        di->table->file->ha_release_auto_increment();
        mysql_unlock_tables(thd, lock);
        trans_commit_stmt(thd);
        di->group_count=0;
        mysql_audit_release(thd);
        mysql_mutex_lock(&di->mutex);
      }
      if (di->tables_in_use)
        mysql_cond_broadcast(&di->cond_client); // If waiting clients
    }

  err:
    DBUG_LEAVE;
  }

  /* Cleanup; reached with di->mutex held on every path above */
  {
    DBUG_ENTER("handle_delayed_insert-cleanup");
    di->table=0;
    mysql_mutex_unlock(&di->mutex);

    /*
      Protect against mdl_locks trying to access open tables
      We use KILL_CONNECTION_HARD here to ensure that
      THD::notify_shared_lock() doesn't try to access open tables after
      this.
    */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    thd->mdl_context.set_needs_thr_lock_abort(0);
    mysql_mutex_unlock(&thd->LOCK_thd_data);
    thd->set_killed(KILL_CONNECTION_HARD);	        // If error

    close_thread_tables(thd);			// Free the table
    thd->release_transactional_locks();
    mysql_cond_broadcast(&di->cond_client);       // Safety

    mysql_mutex_lock(&LOCK_delayed_create);    // Because of delayed_get_table
    mysql_mutex_lock(&LOCK_delayed_insert);
    /*
      di should be unlinked from the thread handler list and have no active
      clients
    */
    delete di;
    mysql_mutex_unlock(&LOCK_delayed_insert);
    mysql_mutex_unlock(&LOCK_delayed_create);

    DBUG_LEAVE;
  }
  my_thread_end();
  pthread_exit(0);

  return 0;
}
3283 
3284 
3285 /* Remove all pointers to data for blob fields so that original table doesn't try to free them */
3286 
unlink_blobs(TABLE * table)3287 static void unlink_blobs(TABLE *table)
3288 {
3289   for (Field **ptr=table->field ; *ptr ; ptr++)
3290   {
3291     if ((*ptr)->flags & BLOB_FLAG)
3292       ((Field_blob *) (*ptr))->clear_temporary();
3293   }
3294 }
3295 
3296 /* Free blobs stored in current row */
3297 
free_delayed_insert_blobs(TABLE * table)3298 static void free_delayed_insert_blobs(TABLE *table)
3299 {
3300   for (Field **ptr=table->field ; *ptr ; ptr++)
3301   {
3302     if ((*ptr)->flags & BLOB_FLAG)
3303       ((Field_blob *) *ptr)->free();
3304   }
3305 }
3306 
3307 
3308 /* set value field for blobs to point to data in record */
3309 
set_delayed_insert_blobs(TABLE * table)3310 static void set_delayed_insert_blobs(TABLE *table)
3311 {
3312   for (Field **ptr=table->field ; *ptr ; ptr++)
3313   {
3314     if ((*ptr)->flags & BLOB_FLAG)
3315     {
3316       Field_blob *blob= ((Field_blob *) *ptr);
3317       uchar *data= blob->get_ptr();
3318       if (data)
3319         blob->set_value(data);     // Set value.ptr() to point to data
3320     }
3321   }
3322 }
3323 
3324 
/*
  Write the rows queued in 'rows' to the table on behalf of the delayed
  insert handler.

  The function releases 'mutex' on entry, so it must be called with
  'mutex' held; this lets clients keep queueing rows while rows are
  written. It returns with 'mutex' held again, both on success and on
  error.

  For every queued row the record image and the session state saved in
  the row (start time, insert-id state, auto_increment settings, sql_mode
  and duplicate handling) are copied to the handler thread before
  write_record() is called. A failure to write a single row is only
  counted (info.error_count, delayed_insert_errors); it does not stop the
  loop.

  RETURN
    0   All queued rows were processed
    1   Fatal error (lock upgrade/reschedule, handler or binlog flush
        failure). The rows still queued are dropped and counted in
        delayed_insert_errors.
*/
bool Delayed_insert::handle_inserts(void)
{
  int error;
  ulong max_rows;
  bool has_trans = TRUE;
  bool using_ignore= 0, using_opt_replace= 0,
       using_bin_log= mysql_bin_log.is_open();
  delayed_row *row;
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  mysql_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
  table->use_all_columns();

  THD_STAGE_INFO(&thd, stage_upgrading_lock);
  if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
                                   thd.variables.lock_wait_timeout))
  {
    /*
      This can happen if thread is killed either by a shutdown
      or if another thread is removing the current table definition
      from the table cache.
    */
    my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATALERROR | ME_NOREFRESH),
             table->s->table_name.str);
    goto err;
  }

  THD_STAGE_INFO(&thd, stage_insert);
  max_rows= delayed_insert_limit;
  if (thd.killed || table->s->tdc->flushed)
  {
    thd.set_killed(KILL_SYSTEM_THREAD);
    max_rows= ULONG_MAX;                     // Do as much as possible
  }

  if (table->file->ha_rnd_init_with_error(0))
    goto err;

  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);
  mysql_mutex_lock(&mutex);

  /*
    Main loop: 'mutex' is held while a row is taken from the queue and
    released while the row is written.
  */
  while ((row=rows.get()))
  {
    int tmp_error;
    stacked_inserts--;
    mysql_mutex_unlock(&mutex);
    memcpy(table->record[0],row->record,table->s->reclength);
    if (table->s->blob_fields)
      set_delayed_insert_blobs(table);

    /* Use the timestamps saved with the row when it was queued */
    thd.start_time=row->start_time;
    thd.start_time_sec_part=row->start_time_sec_part;
    thd.query_start_sec_part_used=row->query_start_sec_part_used;
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
    DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
                           row->query.str : "[NULL]",
                           (ulong) row->query.length));
    if (log_query)
    {
      /*
        Guaranteed that the INSERT DELAYED STMT will not be here
        in SBR when mysql binlog is enabled.
      */
      DBUG_ASSERT(!(mysql_bin_log.is_open() &&
                  !thd.is_current_stmt_binlog_format_row()));

      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
    thd.first_successful_insert_id_in_prev_stmt=
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
    table->auto_increment_field_not_null= row->auto_increment_field_not_null;

    /* Copy the session variables. */
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
    thd.variables.sql_mode=                 row->sql_mode;

    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }

    /*
      Set up duplicate handling for this row. The handler hints enabled
      here are switched off again right after the row has been written.
    */
    info.ignore= row->ignore;
    info.handle_duplicates= row->dup;
    if (info.ignore ||
	info.handle_duplicates != DUP_ERROR)
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
    thd.clear_error(); // reset error for binlog

    tmp_error= 0;
    if (unlikely(table->vfield))
    {
      /*
        Virtual fields were not calculated by the caller as the temporary
        TABLE object used had vcol_set empty. Better to calculate them
        here to make the caller faster.
      */
      tmp_error= table->update_virtual_fields(table->file,
                                              VCOL_UPDATE_FOR_WRITE);
    }

    if (unlikely(tmp_error) || unlikely(write_record(&thd, table, &info)))
    {
      info.error_count++;				// Ignore errors
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
      row->log_query = 0;
    }

    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }

    if (table->s->blob_fields)
      free_delayed_insert_blobs(table);
    thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    mysql_mutex_lock(&mutex);

    /*
      Reset the table->auto_increment_field_not_null as it is valid for
      only one row.
    */
    table->auto_increment_field_not_null= FALSE;

    delete row;
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries have been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
	(!(row->log_query & using_bin_log)))
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)	// Let these wait a while
      {
	if (tables_in_use)
          mysql_cond_broadcast(&cond_client);   // If waiting clients
	THD_STAGE_INFO(&thd, stage_reschedule);
        mysql_mutex_unlock(&mutex);
	if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
	{
	  /* This should never happen */
	  table->file->print_error(error,MYF(0));
	  sql_print_error("%s", thd.get_stmt_da()->message());
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
	  goto err;
	}
	query_cache_invalidate3(&thd, table, 1);
	if (thr_reschedule_write_lock(*thd.lock->locks,
                                thd.variables.lock_wait_timeout))
	{
          /* This is not known to happen. */
          my_error(ER_DELAYED_CANT_CHANGE_LOCK,
                   MYF(ME_FATALERROR | ME_NOREFRESH),
                   table->s->table_name.str);
          goto err;
	}
	if (!using_bin_log)
	  table->file->extra(HA_EXTRA_WRITE_CACHE);
        mysql_mutex_lock(&mutex);
	THD_STAGE_INFO(&thd, stage_insert);
      }
      if (tables_in_use)
        mysql_cond_broadcast(&cond_client);     // If waiting clients
    }
  }

  table->file->ha_rnd_end();

  if (WSREP((&thd)))
    thd_proc_info(&thd, "Insert done");
  else
    thd_proc_info(&thd, 0);
  mysql_mutex_unlock(&mutex);

  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
  */
  has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
              table->file->has_transactions();
  if (thd.is_current_stmt_binlog_format_row() &&
      thd.binlog_flush_pending_rows_event(TRUE, has_trans))
    goto err;

  if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
  {						// This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s", thd.get_stmt_da()->message());
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
    goto err;
  }
  query_cache_invalidate3(&thd, table, 1);
  /* Return to the caller with 'mutex' held, as on entry */
  mysql_mutex_lock(&mutex);
  DBUG_RETURN(0);

  /*
    Error exit. Every 'goto err' above is taken while 'mutex' is not
    held, so it is taken here before the queue is emptied.
  */
 err:
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
  /* Remove all not used rows */
  mysql_mutex_lock(&mutex);
  while ((row=rows.get()))
  {
    if (table->s->blob_fields)
    {
      /* Load the row image so that its blob values can be freed */
      memcpy(table->record[0],row->record,table->s->reclength);
      set_delayed_insert_blobs(table);
      free_delayed_insert_blobs(table);
    }
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
#ifndef DBUG_OFF
    max_rows++;
#endif
  }
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  DBUG_RETURN(1);
}
3602 #endif /* EMBEDDED_LIBRARY */
3603 
3604 /***************************************************************************
3605   Store records in INSERT ... SELECT *
3606 ***************************************************************************/
3607 
3608 
3609 /*
3610   make insert specific preparation and checks after opening tables
3611 
3612   SYNOPSIS
3613     mysql_insert_select_prepare()
3614     thd         thread handler
3615 
3616   RETURN
3617     FALSE OK
3618     TRUE  Error
3619 */
3620 
mysql_insert_select_prepare(THD * thd)3621 bool mysql_insert_select_prepare(THD *thd)
3622 {
3623   LEX *lex= thd->lex;
3624   SELECT_LEX *select_lex= &lex->select_lex;
3625   DBUG_ENTER("mysql_insert_select_prepare");
3626 
3627 
3628   /*
3629     SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
3630     clause if table is VIEW
3631   */
3632 
3633   if (mysql_prepare_insert(thd, lex->query_tables,
3634                            lex->query_tables->table, lex->field_list, 0,
3635                            lex->update_list, lex->value_list, lex->duplicates,
3636                            &select_lex->where, TRUE))
3637     DBUG_RETURN(TRUE);
3638 
3639   DBUG_ASSERT(select_lex->leaf_tables.elements != 0);
3640   List_iterator<TABLE_LIST> ti(select_lex->leaf_tables);
3641   TABLE_LIST *table;
3642   uint insert_tables;
3643 
3644   if (select_lex->first_cond_optimization)
3645   {
3646     /* Back up leaf_tables list. */
3647     Query_arena *arena= thd->stmt_arena, backup;
3648     arena= thd->activate_stmt_arena_if_needed(&backup);  // For easier test
3649 
3650     insert_tables= select_lex->insert_tables;
3651     while ((table= ti++) && insert_tables--)
3652     {
3653       select_lex->leaf_tables_exec.push_back(table);
3654       table->tablenr_exec= table->table->tablenr;
3655       table->map_exec= table->table->map;
3656       table->maybe_null_exec= table->table->maybe_null;
3657     }
3658     if (arena)
3659       thd->restore_active_arena(arena, &backup);
3660   }
3661   ti.rewind();
3662   /*
3663     exclude first table from leaf tables list, because it belong to
3664     INSERT
3665   */
3666   /* skip all leaf tables belonged to view where we are insert */
3667   insert_tables= select_lex->insert_tables;
3668   while ((table= ti++) && insert_tables--)
3669     ti.remove();
3670 
3671   DBUG_RETURN(FALSE);
3672 }
3673 
3674 
select_insert(THD * thd_arg,TABLE_LIST * table_list_par,TABLE * table_par,List<Item> * fields_par,List<Item> * update_fields,List<Item> * update_values,enum_duplicates duplic,bool ignore_check_option_errors)3675 select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
3676                              TABLE *table_par,
3677                              List<Item> *fields_par,
3678                              List<Item> *update_fields,
3679                              List<Item> *update_values,
3680                              enum_duplicates duplic,
3681                              bool ignore_check_option_errors):
3682   select_result_interceptor(thd_arg),
3683   table_list(table_list_par), table(table_par), fields(fields_par),
3684   autoinc_value_of_last_inserted_row(0),
3685   insert_into_view(table_list_par && table_list_par->view != 0)
3686 {
3687   bzero((char*) &info,sizeof(info));
3688   info.handle_duplicates= duplic;
3689   info.ignore= ignore_check_option_errors;
3690   info.update_fields= update_fields;
3691   info.update_values= update_values;
3692   info.view= (table_list_par->view ? table_list_par : 0);
3693   info.table_list= table_list_par;
3694 }
3695 
3696 
/*
  Prepare the target table of INSERT ... SELECT for receiving rows.

  Checks the insert field list against the SELECT list 'values', resolves
  the ON DUPLICATE KEY UPDATE lists if there are any, decides between
  buffering the SELECT result and starting a bulk insert, and sets up the
  handler for the requested duplicate handling.

  @param values   items of the SELECT list that will be stored
  @param u        unit of the SELECT; remembered in 'unit'

  @return 0 on success, non-zero on error
*/
int
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
{
  LEX *lex= thd->lex;
  int res;
  table_map map= 0;
  SELECT_LEX *lex_current_select_save= lex->current_select;
  DBUG_ENTER("select_insert::prepare");

  unit= u;

  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;

  res= (setup_fields(thd, Ref_ptr_array(),
                     values, MARK_COLUMNS_READ, 0, NULL, 0) ||
        check_insert_fields(thd, table_list, *fields, values,
                            !insert_into_view, 1, &map));

  if (!res && fields->elements)
  {
    /*
      abort_on_warning is changed only for the duration of this check and
      restored afterwards.
    */
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
    res= check_that_all_fields_are_given_values(thd, table_list->table, table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
  {
    Name_resolution_context *context= &lex->select_lex.context;
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

    /* Perform name resolution only in the first table - 'table_list'. */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

    lex->select_lex.no_wrap_view_item= TRUE;
    res= res ||
      check_update_fields(thd, context->table_list,
                          *info.update_fields, *info.update_values,
                          /*
                            In INSERT SELECT ON DUPLICATE KEY UPDATE col=x
                            'x' can legally refer to a non-inserted table.
                            'x' is not even resolved yet.
                           */
                          true,
                          &map);
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
      When we are not using GROUP BY and there are no ungrouped aggregate functions
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table destructively, so check it first (views?)
    */
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */
      table_list->next_name_resolution_table=
        ctx_state.get_first_name_resolution_table();

    res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
                             MARK_COLUMNS_READ, 0, NULL, 0);
    if (!res)
    {
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;

      while ((item= li++))
      {
        item->transform(thd, &Item::update_value_transformer,
                        (uchar*)lex->current_select);
      }
    }

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);
  }

  lex->current_select= lex_current_select_save;
  if (res)
    DBUG_RETURN(1);
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
  if (unique_table(thd, table_list, table_list->next_global, 0))
  {
    /* Using same table for INSERT and SELECT */
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
  }
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
           thd->locked_tables_mode <= LTM_LOCK_TABLES)
  {
    /*
      We must not yet prepare the result table if it is the same as one of the
      source tables (INSERT SELECT). The preparation may disable
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
    */
    table->file->ha_start_bulk_insert((ha_rows) 0);
  }
  restore_record(table,s->default_values);		// Get empty record
  table->reset_default_fields();
  table->next_number_field=table->found_next_number_field;

#ifdef HAVE_REPLICATION
  if (thd->rgi_slave &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
    DBUG_RETURN(1);
#endif

  thd->cuted_fields=0;
  /* Tell the handler how duplicate keys are going to be treated */
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
  {
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS &&
        table->file->ha_rnd_init_with_error(0))
      DBUG_RETURN(1);
  }
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  /* Not restored in this function; ~select_insert() resets it to 0 */
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
  res= (table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));

  if (!res)
  {
     table->prepare_triggers_for_insert_stmt_or_event();
     table->mark_columns_needed_for_insert();
  }

  DBUG_RETURN(res);
}
3862 
3863 
3864 /*
3865   Finish the preparation of the result table.
3866 
3867   SYNOPSIS
3868     select_insert::prepare2()
3869     void
3870 
3871   DESCRIPTION
3872     If the result table is the same as one of the source tables (INSERT SELECT),
3873     the result table is not finally prepared at the join prepair phase.
3874     Do the final preparation now.
3875 
3876   RETURN
3877     0   OK
3878 */
3879 
prepare2(JOIN *)3880 int select_insert::prepare2(JOIN *)
3881 {
3882   DBUG_ENTER("select_insert::prepare2");
3883   if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
3884       thd->locked_tables_mode <= LTM_LOCK_TABLES &&
3885       !thd->lex->describe)
3886     table->file->ha_start_bulk_insert((ha_rows) 0);
3887   if (table->validate_default_values_of_unset_fields(thd))
3888     DBUG_RETURN(1);
3889   DBUG_RETURN(0);
3890 }
3891 
3892 
/*
  Not expected to be called for this class: the body consists of an
  unconditional debug assertion only.
*/
void select_insert::cleanup()
{
  /* select_insert/select_create are never re-used in prepared statement */
  DBUG_ASSERT(0);
}
3898 
~select_insert()3899 select_insert::~select_insert()
3900 {
3901   DBUG_ENTER("~select_insert");
3902   if (table && table->is_created())
3903   {
3904     table->next_number_field=0;
3905     table->auto_increment_field_not_null= FALSE;
3906     table->file->ha_reset();
3907   }
3908   thd->count_cuted_fields= CHECK_FIELD_IGNORE;
3909   thd->abort_on_warning= 0;
3910   DBUG_VOID_RETURN;
3911 }
3912 
3913 
send_data(List<Item> & values)3914 int select_insert::send_data(List<Item> &values)
3915 {
3916   DBUG_ENTER("select_insert::send_data");
3917   bool error=0;
3918 
3919   if (unit->offset_limit_cnt)
3920   {						// using limit offset,count
3921     unit->offset_limit_cnt--;
3922     DBUG_RETURN(0);
3923   }
3924   if (unlikely(thd->killed == ABORT_QUERY))
3925     DBUG_RETURN(0);
3926 
3927   thd->count_cuted_fields= CHECK_FIELD_WARN;	// Calculate cuted fields
3928   store_values(values);
3929   if (table->default_field &&
3930       unlikely(table->update_default_fields(info.ignore)))
3931     DBUG_RETURN(1);
3932   thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
3933   if (unlikely(thd->is_error()))
3934   {
3935     table->auto_increment_field_not_null= FALSE;
3936     DBUG_RETURN(1);
3937   }
3938 
3939   table->vers_write= table->versioned();
3940   if (table_list)                               // Not CREATE ... SELECT
3941   {
3942     switch (table_list->view_check_option(thd, info.ignore)) {
3943     case VIEW_CHECK_SKIP:
3944       DBUG_RETURN(0);
3945     case VIEW_CHECK_ERROR:
3946       DBUG_RETURN(1);
3947     }
3948   }
3949 
3950   error= write_record(thd, table, &info);
3951   table->vers_write= table->versioned();
3952   table->auto_increment_field_not_null= FALSE;
3953 
3954   if (likely(!error))
3955   {
3956     if (table->triggers || info.handle_duplicates == DUP_UPDATE)
3957     {
3958       /*
3959         Restore fields of the record since it is possible that they were
3960         changed by ON DUPLICATE KEY UPDATE clause.
3961 
3962         If triggers exist then whey can modify some fields which were not
3963         originally touched by INSERT ... SELECT, so we have to restore
3964         their original values for the next row.
3965       */
3966       restore_record(table, s->default_values);
3967     }
3968     if (table->next_number_field)
3969     {
3970       /*
3971         If no value has been autogenerated so far, we need to remember the
3972         value we just saw, we may need to send it to client in the end.
3973       */
3974       if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
3975         autoinc_value_of_last_inserted_row=
3976           table->next_number_field->val_int();
3977       /*
3978         Clear auto-increment field for the next record, if triggers are used
3979         we will clear it twice, but this should be cheap.
3980       */
3981       table->next_number_field->reset();
3982     }
3983   }
3984   DBUG_RETURN(error);
3985 }
3986 
3987 
store_values(List<Item> & values)3988 void select_insert::store_values(List<Item> &values)
3989 {
3990   DBUG_ENTER("select_insert::store_values");
3991 
3992   if (fields->elements)
3993     fill_record_n_invoke_before_triggers(thd, table, *fields, values, 1,
3994                                          TRG_EVENT_INSERT);
3995   else
3996     fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
3997                                          values, 1, TRG_EVENT_INSERT);
3998 
3999   DBUG_VOID_RETURN;
4000 }
4001 
/*
  Finish the insert part of INSERT ... SELECT after the last row.

  Ends the bulk insert, switches off the duplicate handling hints given
  to the handler, invalidates the query cache if any row was changed,
  writes the statement to the binary log and releases the auto-increment
  values.

  @return false on success; true if the binlog write failed or if an
          error was recorded (the handler error is then reported with
          print_error())
*/
bool select_insert::prepare_eof()
{
  int error;
  bool const trans_table= table->file->has_transactions();
  bool changed;
  bool binary_logged= 0;
  killed_state killed_status= thd->killed;

  DBUG_ENTER("select_insert::prepare_eof");
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));

  /*
    End the bulk insert unless we are in a locked tables mode above
    LTM_LOCK_TABLES.
    NOTE(review): IF_WSREP presumably expands to its first argument only
    in WSREP builds, making a MUST_ABORT/CERT_FAILURE conflict state
    yield error= -1 - confirm against the macro definition.
  */
  error= (IF_WSREP((thd->wsrep_conflict_state == MUST_ABORT ||
                    thd->wsrep_conflict_state == CERT_FAILURE) ? -1 :, )
          (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
           table->file->ha_end_bulk_insert() : 0));

  /* An error already set in the diagnostics area counts as failure too */
  if (likely(!error) && unlikely(thd->is_error()))
    error= thd->get_stmt_da()->sql_errno();

  if (info.ignore || info.handle_duplicates != DUP_ERROR)
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
        table->file->ha_rnd_end();
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);

  if (likely((changed= (info.copied || info.deleted || info.updated))))
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
    query_cache_invalidate3(thd, table, 1);
  }

  /* Propagate the statement level flags to the whole transaction */
  if (thd->transaction.stmt.modified_non_trans_table)
    thd->transaction.all.modified_non_trans_table= TRUE;
  thd->transaction.all.m_unsafe_rollback_flags|=
    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);

  DBUG_ASSERT(trans_table || !changed ||
              thd->transaction.stmt.modified_non_trans_table);

  /*
    Write to binlog before committing transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
  if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
      (likely(!error) || thd->transaction.stmt.modified_non_trans_table))
  {
    int errcode= 0;
    int res;
    if (likely(!error))
      thd->clear_error();
    else
      errcode= query_error_code(thd, killed_status == NOT_KILLED);
    res= thd->binlog_query(THD::ROW_QUERY_TYPE,
                           thd->query(), thd->query_length(),
                           trans_table, FALSE, FALSE, errcode);
    if (res > 0)
    {
      /* Binlog write failed: release auto-increment and report error */
      table->file->ha_release_auto_increment();
      DBUG_RETURN(true);
    }
    binary_logged= res == 0 || !table->s->tmp_table;
  }
  table->s->table_creation_was_logged|= binary_logged;
  table->file->ha_release_auto_increment();

  if (unlikely(error))
  {
    table->file->print_error(error,MYF(0));
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}
4081 
send_ok_packet()4082 bool select_insert::send_ok_packet() {
4083   char  message[160];                           /* status message */
4084   ulonglong row_count;                          /* rows affected */
4085   ulonglong id;                                 /* last insert-id */
4086 
4087   DBUG_ENTER("select_insert::send_ok_packet");
4088 
4089   if (info.ignore)
4090     my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4091                 (ulong) info.records, (ulong) (info.records - info.copied),
4092                 (long) thd->get_stmt_da()->current_statement_warn_count());
4093   else
4094     my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4095                 (ulong) info.records, (ulong) (info.deleted + info.updated),
4096                 (long) thd->get_stmt_da()->current_statement_warn_count());
4097 
4098   row_count= info.copied + info.deleted +
4099     ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
4100      info.touched : info.updated);
4101 
4102   id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
4103     thd->first_successful_insert_id_in_cur_stmt :
4104     (thd->arg_of_last_insert_id_function ?
4105      thd->first_successful_insert_id_in_prev_stmt :
4106      (info.copied ? autoinc_value_of_last_inserted_row : 0));
4107 
4108   ::my_ok(thd, row_count, id, message);
4109 
4110   DBUG_RETURN(false);
4111 }
4112 
send_eof()4113 bool select_insert::send_eof()
4114 {
4115   bool res;
4116   DBUG_ENTER("select_insert::send_eof");
4117   res= (prepare_eof() || (!suppress_my_ok && send_ok_packet()));
4118   DBUG_RETURN(res);
4119 }
4120 
abort_result_set()4121 void select_insert::abort_result_set()
4122 {
4123   bool binary_logged= 0;
4124   DBUG_ENTER("select_insert::abort_result_set");
4125   /*
4126     If the creation of the table failed (due to a syntax error, for
4127     example), no table will have been opened and therefore 'table'
4128     will be NULL. In that case, we still need to execute the rollback
4129     and the end of the function.
4130 
4131     If it fail due to inability to insert in multi-table view for example,
4132     table will be assigned with view table structure, but that table will
4133     not be opened really (it is dummy to check fields types & Co).
4134    */
4135   if (table && table->file->get_table())
4136   {
4137     bool changed, transactional_table;
4138     /*
4139       If we are not in prelocked mode, we end the bulk insert started
4140       before.
4141     */
4142     if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4143       table->file->ha_end_bulk_insert();
4144 
4145     if (table->file->inited)
4146       table->file->ha_rnd_end();
4147     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4148     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4149 
4150     /*
4151       If at least one row has been inserted/modified and will stay in
4152       the table (the table doesn't have transactions) we must write to
4153       the binlog (and the error code will make the slave stop).
4154 
4155       For many errors (example: we got a duplicate key error while
4156       inserting into a MyISAM table), no row will be added to the table,
4157       so passing the error to the slave will not help since there will
4158       be an error code mismatch (the inserts will succeed on the slave
4159       with no error).
4160 
4161       If table creation failed, the number of rows modified will also be
4162       zero, so no check for that is made.
4163     */
4164     changed= (info.copied || info.deleted || info.updated);
4165     transactional_table= table->file->has_transactions();
4166     if (thd->transaction.stmt.modified_non_trans_table ||
4167         thd->log_current_statement)
4168     {
4169         if (!can_rollback_data())
4170           thd->transaction.all.modified_non_trans_table= TRUE;
4171 
4172         if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4173         {
4174           int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4175           int res;
4176           /* error of writing binary log is ignored */
4177           res= thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
4178                                  thd->query_length(),
4179                                  transactional_table, FALSE, FALSE, errcode);
4180           binary_logged= res == 0 || !table->s->tmp_table;
4181         }
4182 	if (changed)
4183 	  query_cache_invalidate3(thd, table, 1);
4184     }
4185     DBUG_ASSERT(transactional_table || !changed ||
4186 		thd->transaction.stmt.modified_non_trans_table);
4187 
4188     table->s->table_creation_was_logged|= binary_logged;
4189     table->file->ha_release_auto_increment();
4190   }
4191 
4192   DBUG_VOID_RETURN;
4193 }
4194 
4195 
4196 /***************************************************************************
4197   CREATE TABLE (SELECT) ...
4198 ***************************************************************************/
4199 
create_field_for_create_select(TABLE * table)4200 Field *Item::create_field_for_create_select(TABLE *table)
4201 {
4202   Field *def_field, *tmp_field;
4203   return ::create_tmp_field(table->in_use, table, this, type(),
4204                             (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0);
4205 }
4206 
4207 
4208 /**
4209   Create table from lists of fields and items (or just return TABLE
4210   object for pre-opened existing table).
4211 
4212   @param thd           [in]     Thread object
4213   @param create_info   [in]     Create information (like MAX_ROWS, ENGINE or
4214                                 temporary table flag)
4215   @param create_table  [in]     Pointer to TABLE_LIST object providing database
4216                                 and name for table to be created or to be open
4217   @param alter_info    [in/out] Initial list of columns and indexes for the
4218                                 table to be created
4219   @param items         [in]     List of items which should be used to produce
4220                                 rest of fields for the table (corresponding
4221                                 fields will be added to the end of
4222                                 alter_info->create_list)
4223   @param lock          [out]    Pointer to the MYSQL_LOCK object for table
4224                                 created will be returned in this parameter.
4225                                 Since this table is not included in THD::lock
4226                                 caller is responsible for explicitly unlocking
4227                                 this table.
4228   @param hooks         [in]     Hooks to be invoked before and after obtaining
4229                                 table lock on the table being created.
4230 
4231   @note
4232     This function assumes that either table exists and was pre-opened and
4233     locked at open_and_lock_tables() stage (and in this case we just emit
4234     error or warning and return pre-opened TABLE object) or an exclusive
4235     metadata lock was acquired on table so we can safely create, open and
4236     lock table in it (we don't acquire metadata lock if this create is
4237     for temporary table).
4238 
4239   @note
4240     Since this function contains some logic specific to CREATE TABLE ...
4241     SELECT it should be changed before it can be used in other contexts.
4242 
4243   @retval non-zero  Pointer to TABLE object for table created or opened
4244   @retval 0         Error
4245 */
4246 
create_table_from_items(THD * thd,List<Item> * items,MYSQL_LOCK ** lock,TABLEOP_HOOKS * hooks)4247 TABLE *select_create::create_table_from_items(THD *thd,
4248                                       List<Item> *items,
4249                                       MYSQL_LOCK **lock,
4250                                       TABLEOP_HOOKS *hooks)
4251 {
4252   TABLE tmp_table;		// Used during 'Create_field()'
4253   TABLE_SHARE share;
4254   TABLE *table= 0;
4255   uint select_field_count= items->elements;
4256   /* Add selected items to field list */
4257   List_iterator_fast<Item> it(*items);
4258   Item *item;
4259   bool save_table_creation_was_logged;
4260   DBUG_ENTER("select_create::create_table_from_items");
4261 
4262   tmp_table.s= &share;
4263   init_tmp_table_share(thd, &share, "", 0, "", "");
4264 
4265   tmp_table.s->db_create_options=0;
4266   tmp_table.null_row= 0;
4267   tmp_table.maybe_null= 0;
4268   tmp_table.in_use= thd;
4269 
4270   if (!opt_explicit_defaults_for_timestamp)
4271     promote_first_timestamp_column(&alter_info->create_list);
4272 
4273   if (create_info->vers_fix_system_fields(thd, alter_info, *create_table))
4274     DBUG_RETURN(NULL);
4275 
4276   while ((item=it++))
4277   {
4278     Field *tmp_field= item->create_field_for_create_select(&tmp_table);
4279 
4280     if (!tmp_field)
4281       DBUG_RETURN(NULL);
4282 
4283     Field *table_field;
4284 
4285     switch (item->type())
4286     {
4287     /*
4288       We have to take into account both the real table's fields and
4289       pseudo-fields used in trigger's body. These fields are used
4290       to copy defaults values later inside constructor of
4291       the class Create_field.
4292     */
4293     case Item::FIELD_ITEM:
4294     case Item::TRIGGER_FIELD_ITEM:
4295       table_field= ((Item_field *) item)->field;
4296       break;
4297     default:
4298       table_field= NULL;
4299     }
4300 
4301     Create_field *cr_field= new (thd->mem_root)
4302                                   Create_field(thd, tmp_field, table_field);
4303 
4304     if (!cr_field)
4305       DBUG_RETURN(NULL);
4306 
4307     if (item->maybe_null)
4308       cr_field->flags &= ~NOT_NULL_FLAG;
4309     alter_info->create_list.push_back(cr_field, thd->mem_root);
4310   }
4311 
4312   if (create_info->vers_check_system_fields(thd, alter_info,
4313                                             create_table->table_name,
4314                                             create_table->db,
4315                                             select_field_count))
4316     DBUG_RETURN(NULL);
4317 
4318   DEBUG_SYNC(thd,"create_table_select_before_create");
4319 
4320   /* Check if LOCK TABLES + CREATE OR REPLACE of existing normal table*/
4321   if (thd->locked_tables_mode && create_table->table &&
4322       !create_info->tmp_table())
4323   {
4324     /* Remember information about the locked table */
4325     create_info->pos_in_locked_tables=
4326       create_table->table->pos_in_locked_tables;
4327     create_info->mdl_ticket= create_table->table->mdl_ticket;
4328   }
4329 
4330   /*
4331     Create and lock table.
4332 
4333     Note that we either creating (or opening existing) temporary table or
4334     creating base table on which name we have exclusive lock. So code below
4335     should not cause deadlocks or races.
4336 
4337     We don't log the statement, it will be logged later.
4338 
4339     If this is a HEAP table, the automatic DELETE FROM which is written to the
4340     binlog when a HEAP table is opened for the first time since startup, must
4341     not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
4342     don't want to delete from it) 2) it would be written before the CREATE
4343     TABLE, which is a wrong order. So we keep binary logging disabled when we
4344     open_table().
4345   */
4346 
4347   if (!mysql_create_table_no_lock(thd, &create_table->db,
4348                                   &create_table->table_name,
4349                                   create_info, alter_info, NULL,
4350                                   select_field_count, create_table))
4351   {
4352     DEBUG_SYNC(thd,"create_table_select_before_open");
4353 
4354     /*
4355       If we had a temporary table or a table used with LOCK TABLES,
4356       it was closed by mysql_create()
4357     */
4358     create_table->table= 0;
4359 
4360     if (!create_info->tmp_table())
4361     {
4362       Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
4363       TABLE_LIST::enum_open_strategy save_open_strategy;
4364 
4365       /* Force the newly created table to be opened */
4366       save_open_strategy= create_table->open_strategy;
4367       create_table->open_strategy= TABLE_LIST::OPEN_NORMAL;
4368       /*
4369         Here we open the destination table, on which we already have
4370         an exclusive metadata lock.
4371       */
4372       if (open_table(thd, create_table, &ot_ctx))
4373       {
4374         quick_rm_table(thd, create_info->db_type, &create_table->db,
4375                        table_case_name(create_info, &create_table->table_name),
4376                        0);
4377       }
4378       /* Restore */
4379       create_table->open_strategy= save_open_strategy;
4380     }
4381     else
4382     {
4383       /*
4384         The pointer to the newly created temporary table has been stored in
4385         table->create_info.
4386       */
4387       create_table->table= create_info->table;
4388       if (!create_table->table)
4389       {
4390         /*
4391           This shouldn't happen as creation of temporary table should make
4392           it preparable for open. Anyway we can't drop temporary table if
4393           we are unable to find it.
4394         */
4395         DBUG_ASSERT(0);
4396       }
4397     }
4398   }
4399   else
4400     create_table->table= 0;                     // Create failed
4401 
4402   if (unlikely(!(table= create_table->table)))
4403   {
4404     if (likely(!thd->is_error()))             // CREATE ... IF NOT EXISTS
4405       my_ok(thd);                             //   succeed, but did nothing
4406     DBUG_RETURN(NULL);
4407   }
4408 
4409   DEBUG_SYNC(thd,"create_table_select_before_lock");
4410 
4411   table->reginfo.lock_type=TL_WRITE;
4412   hooks->prelock(&table, 1);                    // Call prelock hooks
4413 
4414   /*
4415     Ensure that decide_logging_format(), called by mysql_lock_tables(), works
4416     with temporary tables that will be logged later if needed.
4417   */
4418   save_table_creation_was_logged= table->s->table_creation_was_logged;
4419   table->s->table_creation_was_logged= 1;
4420 
4421   /*
4422     mysql_lock_tables() below should never fail with request to reopen table
4423     since it won't wait for the table lock (we have exclusive metadata lock on
4424     the table) and thus can't get aborted.
4425   */
4426   if (unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
4427                hooks->postlock(&table, 1)))
4428   {
4429     /* purecov: begin tested */
4430     /*
4431       This can happen in innodb when you get a deadlock when using same table
4432       in insert and select or when you run out of memory.
4433       It can also happen if there was a conflict in
4434       THD::decide_logging_format()
4435     */
4436     if (!thd->is_error())
4437       my_error(ER_CANT_LOCK, MYF(0), my_errno);
4438     if (*lock)
4439     {
4440       mysql_unlock_tables(thd, *lock);
4441       *lock= 0;
4442     }
4443     drop_open_table(thd, table, &create_table->db, &create_table->table_name);
4444     DBUG_RETURN(0);
4445     /* purecov: end */
4446   }
4447   table->s->table_creation_was_logged= save_table_creation_was_logged;
4448   DBUG_RETURN(table);
4449 }
4450 
4451 
4452 int
prepare(List<Item> & _values,SELECT_LEX_UNIT * u)4453 select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
4454 {
4455   List<Item> values(_values, thd->mem_root);
4456   MYSQL_LOCK *extra_lock= NULL;
4457   DBUG_ENTER("select_create::prepare");
4458 
4459   TABLEOP_HOOKS *hook_ptr= NULL;
4460   /*
4461     For row-based replication, the CREATE-SELECT statement is written
4462     in two pieces: the first one contain the CREATE TABLE statement
4463     necessary to create the table and the second part contain the rows
4464     that should go into the table.
4465 
4466     For non-temporary tables, the start of the CREATE-SELECT
4467     implicitly commits the previous transaction, and all events
4468     forming the statement will be stored the transaction cache. At end
4469     of the statement, the entire statement is committed as a
4470     transaction, and all events are written to the binary log.
4471 
4472     On the master, the table is locked for the duration of the
4473     statement, but since the CREATE part is replicated as a simple
4474     statement, there is no way to lock the table for accesses on the
4475     slave.  Hence, we have to hold on to the CREATE part of the
4476     statement until the statement has finished.
4477    */
4478   class MY_HOOKS : public TABLEOP_HOOKS {
4479   public:
4480     MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
4481              TABLE_LIST *select_tables_arg)
4482       : ptr(x),
4483         create_table(create_table_arg),
4484         select_tables(select_tables_arg)
4485       {
4486       }
4487 
4488   private:
4489     virtual int do_postlock(TABLE **tables, uint count)
4490     {
4491       int error;
4492       THD *thd= const_cast<THD*>(ptr->get_thd());
4493       TABLE_LIST *save_next_global= create_table->next_global;
4494 
4495       create_table->next_global= select_tables;
4496 
4497       error= thd->decide_logging_format(create_table);
4498 
4499       create_table->next_global= save_next_global;
4500 
4501       if (unlikely(error))
4502         return error;
4503 
4504       TABLE const *const table = *tables;
4505       if (thd->is_current_stmt_binlog_format_row()  &&
4506           !table->s->tmp_table)
4507       {
4508         int error;
4509         if (unlikely((error= ptr->binlog_show_create_table(tables, count))))
4510           return error;
4511       }
4512       return 0;
4513     }
4514     select_create *ptr;
4515     TABLE_LIST *create_table;
4516     TABLE_LIST *select_tables;
4517   };
4518 
4519   MY_HOOKS hooks(this, create_table, select_tables);
4520   hook_ptr= &hooks;
4521 
4522   unit= u;
4523 
4524   /*
4525     Start a statement transaction before the create if we are using
4526     row-based replication for the statement.  If we are creating a
4527     temporary table, we need to start a statement transaction.
4528   */
4529   if (!thd->lex->tmp_table() &&
4530       thd->is_current_stmt_binlog_format_row() &&
4531       mysql_bin_log.is_open())
4532   {
4533     thd->binlog_start_trans_and_stmt();
4534   }
4535 
4536   DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");
4537 
4538   if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr)))
4539   {
4540     if (create_info->or_replace())
4541     {
4542       /* Original table was deleted. We have to log it */
4543       log_drop_table(thd, &create_table->db, &create_table->table_name,
4544                      thd->lex->tmp_table());
4545     }
4546 
4547     /* abort() deletes table */
4548     DBUG_RETURN(-1);
4549   }
4550 
4551   if (create_info->tmp_table())
4552   {
4553     /*
4554       When the temporary table was created & opened in create_table_impl(),
4555       the table's TABLE_SHARE (and thus TABLE) object was also linked to THD
4556       temporary tables lists. So, we must temporarily remove it from the
4557       list to keep them inaccessible from inner statements.
4558       e.g. CREATE TEMPORARY TABLE `t1` AS SELECT * FROM `t1`;
4559     */
4560     saved_tmp_table_share= thd->save_tmp_table_share(create_table->table);
4561   }
4562 
4563   if (extra_lock)
4564   {
4565     DBUG_ASSERT(m_plock == NULL);
4566 
4567     if (create_info->tmp_table())
4568       m_plock= &m_lock;
4569     else
4570       m_plock= &thd->extra_lock;
4571 
4572     *m_plock= extra_lock;
4573   }
4574 
4575   if (table->s->fields < values.elements)
4576   {
4577     my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
4578     DBUG_RETURN(-1);
4579   }
4580 
4581   /* First field to copy */
4582   field= table->field+table->s->fields;
4583 
4584   /* Mark all fields that are given values */
4585   for (uint n= values.elements; n; )
4586   {
4587     if ((*--field)->invisible >= INVISIBLE_SYSTEM)
4588       continue;
4589     n--;
4590     bitmap_set_bit(table->write_set, (*field)->field_index);
4591   }
4592 
4593   table->next_number_field=table->found_next_number_field;
4594 
4595   restore_record(table,s->default_values);      // Get empty record
4596   thd->cuted_fields=0;
4597   if (info.ignore || info.handle_duplicates != DUP_ERROR)
4598   {
4599     table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
4600     if (table->file->ha_table_flags() & HA_DUPLICATE_POS &&
4601         table->file->ha_rnd_init_with_error(0))
4602       DBUG_RETURN(1);
4603   }
4604   if (info.handle_duplicates == DUP_REPLACE &&
4605       (!table->triggers || !table->triggers->has_delete_triggers()))
4606     table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
4607   if (info.handle_duplicates == DUP_UPDATE)
4608     table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
4609   if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4610     table->file->ha_start_bulk_insert((ha_rows) 0);
4611   thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
4612   if (check_that_all_fields_are_given_values(thd, table, table_list))
4613     DBUG_RETURN(1);
4614   table->mark_columns_needed_for_insert();
4615   table->file->extra(HA_EXTRA_WRITE_CACHE);
4616   // Mark table as used
4617   table->query_id= thd->query_id;
4618   DBUG_RETURN(0);
4619 }
4620 
4621 int
binlog_show_create_table(TABLE ** tables,uint count)4622 select_create::binlog_show_create_table(TABLE **tables, uint count)
4623 {
4624   /*
4625     Note 1: In RBR mode, we generate a CREATE TABLE statement for the
4626     created table by calling show_create_table().  In the event of an error,
4627     nothing should be written to the binary log, even if the table is
4628     non-transactional; therefore we pretend that the generated CREATE TABLE
4629     statement is for a transactional table.  The event will then be put in the
4630     transaction cache, and any subsequent events (e.g., table-map events and
4631     binrow events) will also be put there.  We can then use
4632     ha_autocommit_or_rollback() to either throw away the entire kaboodle of
4633     events, or write them to the binary log.
4634 
4635     We write the CREATE TABLE statement here and not in prepare()
4636     since there potentially are sub-selects or accesses to information
4637     schema that will do a close_thread_tables(), destroying the
4638     statement transaction cache.
4639   */
4640   DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
4641   DBUG_ASSERT(tables && *tables && count > 0);
4642 
4643   StringBuffer<2048> query(system_charset_info);
4644   int result;
4645   TABLE_LIST tmp_table_list;
4646 
4647   tmp_table_list.reset();
4648   tmp_table_list.table = *tables;
4649 
4650   result= show_create_table(thd, &tmp_table_list, &query,
4651                             create_info, WITH_DB_NAME);
4652   DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
4653 
4654   if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4655   {
4656     int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4657     result= thd->binlog_query(THD::STMT_QUERY_TYPE,
4658                               query.ptr(), query.length(),
4659                               /* is_trans */ TRUE,
4660                               /* direct */ FALSE,
4661                               /* suppress_use */ FALSE,
4662                               errcode) > 0;
4663   }
4664 
4665   ha_fake_trx_id(thd);
4666 
4667   return result;
4668 }
4669 
store_values(List<Item> & values)4670 void select_create::store_values(List<Item> &values)
4671 {
4672   fill_record_n_invoke_before_triggers(thd, table, field, values, 1,
4673                                        TRG_EVENT_INSERT);
4674 }
4675 
4676 
send_eof()4677 bool select_create::send_eof()
4678 {
4679   DBUG_ENTER("select_create::send_eof");
4680 
4681   /*
4682     The routine that writes the statement in the binary log
4683     is in select_insert::prepare_eof(). For that reason, we
4684     mark the flag at this point.
4685   */
4686   if (table->s->tmp_table)
4687     thd->transaction.stmt.mark_created_temp_table();
4688 
4689   if (prepare_eof())
4690   {
4691     abort_result_set();
4692     DBUG_RETURN(true);
4693   }
4694 
4695   if (table->s->tmp_table)
4696   {
4697     /*
4698       Now is good time to add the new table to THD temporary tables list.
4699       But, before that we need to check if same table got created by the sub-
4700       statement.
4701     */
4702     if (thd->find_tmp_table_share(table->s->table_cache_key.str,
4703                                   table->s->table_cache_key.length))
4704     {
4705       my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table->alias.c_ptr());
4706       abort_result_set();
4707       DBUG_RETURN(true);
4708     }
4709     else
4710     {
4711       DBUG_ASSERT(saved_tmp_table_share);
4712       thd->restore_tmp_table_share(saved_tmp_table_share);
4713     }
4714   }
4715 
4716   /*
4717     Do an implicit commit at end of statement for non-temporary
4718     tables.  This can fail, but we should unlock the table
4719     nevertheless.
4720   */
4721   if (!table->s->tmp_table)
4722   {
4723 #ifdef WITH_WSREP
4724     if (WSREP_ON)
4725     {
4726       /*
4727          append table level exclusive key for CTAS
4728       */
4729       wsrep_key_arr_t key_arr= {0, 0};
4730       wsrep_prepare_keys_for_isolation(thd,
4731                                        create_table->db.str,
4732                                        create_table->table_name.str,
4733                                        table_list,
4734                                        &key_arr);
4735       int rcode = wsrep->append_key(
4736                                 wsrep,
4737                                 &thd->wsrep_ws_handle,
4738                                 key_arr.keys, //&wkey,
4739                                 key_arr.keys_len,
4740                                 WSREP_KEY_EXCLUSIVE,
4741                                 false);
4742       wsrep_keys_free(&key_arr);
4743       if (rcode) {
4744         DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
4745         WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
4746                     (wsrep_thd_query(thd)) ?
4747                     wsrep_thd_query(thd) : "void", rcode);
4748         abort_result_set();
4749         DBUG_RETURN(true);
4750       }
4751       /* If commit fails, we should be able to reset the OK status. */
4752       thd->get_stmt_da()->set_overwrite_status(TRUE);
4753     }
4754 #endif /* WITH_WSREP */
4755     trans_commit_stmt(thd);
4756     if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
4757       trans_commit_implicit(thd);
4758 #ifdef WITH_WSREP
4759     if (WSREP_ON)
4760     {
4761       thd->get_stmt_da()->set_overwrite_status(FALSE);
4762       mysql_mutex_lock(&thd->LOCK_thd_data);
4763       if (thd->wsrep_conflict_state != NO_CONFLICT)
4764       {
4765         WSREP_DEBUG("select_create commit failed, thd: %lld  err: %d %s",
4766                     (longlong) thd->thread_id, thd->wsrep_conflict_state,
4767                     thd->query());
4768         mysql_mutex_unlock(&thd->LOCK_thd_data);
4769         abort_result_set();
4770         DBUG_RETURN(true);
4771       }
4772       mysql_mutex_unlock(&thd->LOCK_thd_data);
4773     }
4774 #endif /* WITH_WSREP */
4775   }
4776 
4777   /*
4778     exit_done must only be set after last potential call to
4779     abort_result_set().
4780   */
4781   exit_done= 1;                                 // Avoid double calls
4782 
4783   send_ok_packet();
4784 
4785   if (m_plock)
4786   {
4787     MYSQL_LOCK *lock= *m_plock;
4788     *m_plock= NULL;
4789     m_plock= NULL;
4790 
4791     if (create_info->pos_in_locked_tables)
4792     {
4793       /*
4794         If we are under lock tables, we have created a table that was
4795         originally locked. We should add back the lock to ensure that
4796         all tables in the thd->open_list are locked!
4797       */
4798       table->mdl_ticket= create_info->mdl_ticket;
4799 
4800       /* The following should never fail, except if out of memory */
4801       if (!thd->locked_tables_list.restore_lock(thd,
4802                                                 create_info->
4803                                                 pos_in_locked_tables,
4804                                                 table, lock))
4805         DBUG_RETURN(false);                     // ok
4806       /* Fail. Continue without locking the table */
4807     }
4808     mysql_unlock_tables(thd, lock);
4809   }
4810   DBUG_RETURN(false);
4811 }
4812 
4813 
abort_result_set()4814 void select_create::abort_result_set()
4815 {
4816   ulonglong save_option_bits;
4817   DBUG_ENTER("select_create::abort_result_set");
4818 
4819   /* Avoid double calls, could happen in case of out of memory on cleanup */
4820   if (exit_done)
4821     DBUG_VOID_RETURN;
4822   exit_done= 1;
4823 
4824   /*
4825     In select_insert::abort_result_set() we roll back the statement, including
4826     truncating the transaction cache of the binary log. To do this, we
4827     pretend that the statement is transactional, even though it might
4828     be the case that it was not.
4829 
4830     We roll back the statement prior to deleting the table and prior
4831     to releasing the lock on the table, since there might be potential
4832     for failure if the rollback is executed after the drop or after
4833     unlocking the table.
4834 
4835     We also roll back the statement regardless of whether the creation
4836     of the table succeeded or not, since we need to reset the binary
4837     log state.
4838 
4839     However if there was an original table that was deleted, as part of
4840     create or replace table, then we must log the statement.
4841   */
4842 
4843   save_option_bits= thd->variables.option_bits;
4844   thd->variables.option_bits&= ~OPTION_BIN_LOG;
4845   select_insert::abort_result_set();
4846   thd->transaction.stmt.modified_non_trans_table= FALSE;
4847   thd->variables.option_bits= save_option_bits;
4848 
4849   /* possible error of writing binary log is ignored deliberately */
4850   (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
4851 
4852   if (table)
4853   {
4854     bool tmp_table= table->s->tmp_table;
4855     bool table_creation_was_logged= (!tmp_table ||
4856                                      table->s->table_creation_was_logged);
4857     if (tmp_table)
4858     {
4859       DBUG_ASSERT(saved_tmp_table_share);
4860       thd->restore_tmp_table_share(saved_tmp_table_share);
4861     }
4862 
4863     if (table->file->inited &&
4864         (info.ignore || info.handle_duplicates != DUP_ERROR) &&
4865         (table->file->ha_table_flags() & HA_DUPLICATE_POS))
4866       table->file->ha_rnd_end();
4867     table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4868     table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4869     table->auto_increment_field_not_null= FALSE;
4870 
4871     if (m_plock)
4872     {
4873       mysql_unlock_tables(thd, *m_plock);
4874       *m_plock= NULL;
4875       m_plock= NULL;
4876     }
4877 
4878     drop_open_table(thd, table, &create_table->db, &create_table->table_name);
4879     table=0;                                    // Safety
4880     if (thd->log_current_statement && mysql_bin_log.is_open())
4881     {
4882       /* Remove logging of drop, create + insert rows */
4883       binlog_reset_cache(thd);
4884       /* Original table was deleted. We have to log it */
4885       if (table_creation_was_logged)
4886         log_drop_table(thd, &create_table->db, &create_table->table_name,
4887                        tmp_table);
4888     }
4889   }
4890 
4891   if (create_info->table_was_deleted)
4892   {
4893     /* Unlock locked table that was dropped by CREATE. */
4894     (void) trans_rollback_stmt(thd);
4895     thd->locked_tables_list.unlock_locked_table(thd, create_info->mdl_ticket);
4896   }
4897 
4898   DBUG_VOID_RETURN;
4899 }
4900