1 /*
2 Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3 Copyright (c) 2010, 2021, MariaDB Corporation
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
17 */
18
19 /* Insert of records */
20
/*
  INSERT DELAYED

  Insert delayed is distinguished from a normal insert by lock_type ==
  TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
  "delayed" table (delayed_get_table()), but falls back to
  open_and_lock_tables() on error and then proceeds as a normal insert.

  Opening a "delayed" table means finding a delayed insert thread that
  already has the table open. If this fails, a new thread is created and
  the statement waits for it to open and lock the table.

  If accessing the thread succeeded, the table of the thread is copied
  for local use in Delayed_insert::get_local_table(). A copy is required
  because the normal insert logic works on a target table, but the other
  thread's table object must not be used. The insert logic uses the
  record buffer to create a record, and the delayed insert thread uses
  the record buffer to pass the record to the table handler, so these
  must be different objects. Also, the copied table is not included in
  the lock, so that the statement can proceed even if the real table
  cannot be accessed at this moment.

  Copying a table object is not a trivial operation. Besides the TABLE
  object there are the field pointer array, the field objects and the
  record buffer. After copying the field objects, their pointers into
  the record must be "moved" to point to the new record buffer.

  After this setup the normal insert logic is used, except that for
  delayed inserts write_delayed() is called instead of write_record().
  It inserts the rows into a queue and signals the delayed insert thread
  instead of writing directly to the table.

  The delayed insert thread wakes up on the signal. It locks the table,
  inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It normally lives until a FLUSH TABLES or SHUTDOWN.

*/
58
59 #include "mariadb.h" /* NO_EMBEDDED_ACCESS_CHECKS */
60 #include "sql_priv.h"
61 #include "sql_insert.h"
62 #include "sql_update.h" // compare_record
63 #include "sql_base.h" // close_thread_tables
64 #include "sql_cache.h" // query_cache_*
65 #include "key.h" // key_copy
66 #include "lock.h" // mysql_unlock_tables
67 #include "sp_head.h"
68 #include "sql_view.h" // check_key_in_view, insert_view_fields
69 #include "sql_table.h" // mysql_create_table_no_lock
70 #include "sql_trigger.h"
71 #include "sql_select.h"
72 #include "sql_show.h"
73 #include "slave.h"
74 #include "sql_parse.h" // end_active_trans
75 #include "rpl_mi.h"
76 #include "transaction.h"
77 #include "sql_audit.h"
78 #include "sql_derived.h" // mysql_handle_derived
79 #include "sql_prepare.h"
80 #include <my_bit.h>
81
82 #include "debug_sync.h"
83
84 #ifdef WITH_WSREP
85 #include "wsrep_trans_observer.h" /* wsrep_start_transction() */
86 #endif /* WITH_WSREP */
87
88 #ifndef EMBEDDED_LIBRARY
89 static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
90 TABLE_LIST *table_list);
91 static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
92 LEX_STRING query, bool ignore, bool log_on);
93 static void end_delayed_insert(THD *thd);
94 pthread_handler_t handle_delayed_insert(void *arg);
95 static void unlink_blobs(TABLE *table);
96 #endif
97 static bool check_view_insertability(THD *thd, TABLE_LIST *view);
98 static int binlog_show_create_table(THD *thd, TABLE *table,
99 Table_specification_st *create_info);
100
101 /*
102 Check that insert/update fields are from the same single table of a view.
103
104 @param fields The insert/update fields to be checked.
105 @param values The insert/update values to be checked, NULL if
106 checking is not wanted.
107 @param view The view for insert.
108 @param map [in/out] The insert table map.
109
110 This function is called in 2 cases:
111 1. to check insert fields. In this case *map will be set to 0.
112 Insert fields are checked to be all from the same single underlying
113 table of the given view. Otherwise the error is thrown. Found table
114 map is returned in the map parameter.
115 2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
116 In this case *map contains table_map found on the previous call of
117 the function to check insert fields. Update fields are checked to be
118 from the same table as the insert fields.
119
120 @returns false if success.
121 */
122
check_view_single_update(List<Item> & fields,List<Item> * values,TABLE_LIST * view,table_map * map,bool insert)123 static bool check_view_single_update(List<Item> &fields, List<Item> *values,
124 TABLE_LIST *view, table_map *map,
125 bool insert)
126 {
127 /* it is join view => we need to find the table for update */
128 List_iterator_fast<Item> it(fields);
129 Item *item;
130 TABLE_LIST *tbl= 0; // reset for call to check_single_table()
131 table_map tables= 0;
132
133 while ((item= it++))
134 tables|= item->used_tables();
135
136 /*
137 Check that table is only one
138 (we can not rely on check_single_table because it skips some
139 types of tables)
140 */
141 if (my_count_bits(tables) > 1)
142 goto error;
143
144 if (values)
145 {
146 it.init(*values);
147 while ((item= it++))
148 tables|= item->view_used_tables(view);
149 }
150
151 /* Convert to real table bits */
152 tables&= ~PSEUDO_TABLE_BITS;
153
154 /* Check found map against provided map */
155 if (*map)
156 {
157 if (tables != *map)
158 goto error;
159 return FALSE;
160 }
161
162 if (view->check_single_table(&tbl, tables, view) || tbl == 0)
163 goto error;
164
165 /* view->table should have been set in mysql_derived_merge_for_insert */
166 DBUG_ASSERT(view->table);
167
168 /*
169 Use buffer for the insert values that was allocated for the merged view.
170 */
171 tbl->table->insert_values= view->table->insert_values;
172 view->table= tbl->table;
173 if (!tbl->single_table_updatable())
174 {
175 if (insert)
176 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), view->alias.str, "INSERT");
177 else
178 my_error(ER_NON_UPDATABLE_TABLE, MYF(0), view->alias.str, "UPDATE");
179 return TRUE;
180 }
181 *map= tables;
182
183 return FALSE;
184
185 error:
186 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
187 view->view_db.str, view->view_name.str);
188 return TRUE;
189 }
190
191
192 /*
193 Check if insert fields are correct.
194
195 @param thd The current thread.
196 @param table_list The table we are inserting into (may be view)
197 @param fields The insert fields.
198 @param values The insert values.
199 @param check_unique If duplicate values should be rejected.
200 @param fields_and_values_from_different_maps If 'values' are allowed to
201 refer to other tables than those of 'fields'
202 @param map See check_view_single_update
203
204 @returns 0 if success, -1 if error
205 */
206
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<Item> & values,bool check_unique,bool fields_and_values_from_different_maps,table_map * map)207 static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
208 List<Item> &fields, List<Item> &values,
209 bool check_unique,
210 bool fields_and_values_from_different_maps,
211 table_map *map)
212 {
213 TABLE *table= table_list->table;
214 DBUG_ENTER("check_insert_fields");
215
216 if (!table_list->single_table_updatable())
217 {
218 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
219 DBUG_RETURN(-1);
220 }
221
222 if (fields.elements == 0 && values.elements != 0)
223 {
224 if (!table)
225 {
226 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
227 table_list->view_db.str, table_list->view_name.str);
228 DBUG_RETURN(-1);
229 }
230 if (values.elements != table->s->visible_fields)
231 {
232 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
233 DBUG_RETURN(-1);
234 }
235 #ifndef NO_EMBEDDED_ACCESS_CHECKS
236 Field_iterator_table_ref field_it;
237 field_it.set(table_list);
238 if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
239 DBUG_RETURN(-1);
240 #endif
241 /*
242 No fields are provided so all fields must be provided in the values.
243 Thus we set all bits in the write set.
244 */
245 bitmap_set_all(table->write_set);
246 }
247 else
248 { // Part field list
249 SELECT_LEX *select_lex= thd->lex->first_select_lex();
250 Name_resolution_context *context= &select_lex->context;
251 Name_resolution_context_state ctx_state;
252 int res;
253
254 if (fields.elements != values.elements)
255 {
256 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
257 DBUG_RETURN(-1);
258 }
259
260 thd->dup_field= 0;
261 select_lex->no_wrap_view_item= TRUE;
262
263 /* Save the state of the current name resolution context. */
264 ctx_state.save_state(context, table_list);
265
266 /*
267 Perform name resolution only in the first table - 'table_list',
268 which is the table that is inserted into.
269 */
270 table_list->next_local= 0;
271 context->resolve_in_table_list_only(table_list);
272 /* 'Unfix' fields to allow correct marking by the setup_fields function. */
273 if (table_list->is_view())
274 unfix_fields(fields);
275
276 res= setup_fields(thd, Ref_ptr_array(),
277 fields, MARK_COLUMNS_WRITE, 0, NULL, 0);
278
279 /* Restore the current context. */
280 ctx_state.restore_state(context, table_list);
281 thd->lex->first_select_lex()->no_wrap_view_item= FALSE;
282
283 if (res)
284 DBUG_RETURN(-1);
285
286 if (table_list->is_view() && table_list->is_merged_derived())
287 {
288 if (check_view_single_update(fields,
289 fields_and_values_from_different_maps ?
290 (List<Item>*) 0 : &values,
291 table_list, map, true))
292 DBUG_RETURN(-1);
293 table= table_list->table;
294 }
295
296 if (check_unique && thd->dup_field)
297 {
298 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0),
299 thd->dup_field->field_name.str);
300 DBUG_RETURN(-1);
301 }
302 }
303 // For the values we need select_priv
304 #ifndef NO_EMBEDDED_ACCESS_CHECKS
305 table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
306 #endif
307
308 if (check_key_in_view(thd, table_list) ||
309 (table_list->view &&
310 check_view_insertability(thd, table_list)))
311 {
312 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
313 DBUG_RETURN(-1);
314 }
315
316 DBUG_RETURN(0);
317 }
318
has_no_default_value(THD * thd,Field * field,TABLE_LIST * table_list)319 static bool has_no_default_value(THD *thd, Field *field, TABLE_LIST *table_list)
320 {
321 if ((field->flags & NO_DEFAULT_VALUE_FLAG) && field->real_type() != MYSQL_TYPE_ENUM)
322 {
323 bool view= false;
324 if (table_list)
325 {
326 table_list= table_list->top_table();
327 view= table_list->view != NULL;
328 }
329 if (view)
330 {
331 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_VIEW_FIELD,
332 ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
333 table_list->view_db.str, table_list->view_name.str);
334 }
335 else
336 {
337 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_FIELD,
338 ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
339 field->field_name.str);
340 }
341 return thd->really_abort_on_warning();
342 }
343 return false;
344 }
345
346
347 /**
348 Check if update fields are correct.
349
350 @param thd The current thread.
351 @param insert_table_list The table we are inserting into (may be view)
352 @param update_fields The update fields.
353 @param update_values The update values.
354 @param fields_and_values_from_different_maps If 'update_values' are allowed to
355 refer to other tables than those of 'update_fields'
356 @param map See check_view_single_update
357
358 @note
359 If the update fields include an autoinc field, set the
360 table->next_number_field_updated flag.
361
362 @returns 0 if success, -1 if error
363 */
364
check_update_fields(THD * thd,TABLE_LIST * insert_table_list,List<Item> & update_fields,List<Item> & update_values,bool fields_and_values_from_different_maps,table_map * map)365 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
366 List<Item> &update_fields,
367 List<Item> &update_values,
368 bool fields_and_values_from_different_maps,
369 table_map *map)
370 {
371 TABLE *table= insert_table_list->table;
372 my_bool UNINIT_VAR(autoinc_mark);
373
374 table->next_number_field_updated= FALSE;
375
376 if (table->found_next_number_field)
377 {
378 /*
379 Unmark the auto_increment field so that we can check if this is modified
380 by update_fields
381 */
382 autoinc_mark= bitmap_test_and_clear(table->write_set,
383 table->found_next_number_field->
384 field_index);
385 }
386
387 /* Check the fields we are going to modify */
388 if (setup_fields(thd, Ref_ptr_array(),
389 update_fields, MARK_COLUMNS_WRITE, 0, NULL, 0))
390 return -1;
391
392 if (insert_table_list->is_view() &&
393 insert_table_list->is_merged_derived() &&
394 check_view_single_update(update_fields,
395 fields_and_values_from_different_maps ?
396 (List<Item>*) 0 : &update_values,
397 insert_table_list, map, false))
398 return -1;
399
400 if (table->default_field)
401 table->mark_default_fields_for_write(FALSE);
402
403 if (table->found_next_number_field)
404 {
405 if (bitmap_is_set(table->write_set,
406 table->found_next_number_field->field_index))
407 table->next_number_field_updated= TRUE;
408
409 if (autoinc_mark)
410 bitmap_set_bit(table->write_set,
411 table->found_next_number_field->field_index);
412 }
413
414 return 0;
415 }
416
417 /**
418 Upgrade table-level lock of INSERT statement to TL_WRITE if
419 a more concurrent lock is infeasible for some reason. This is
420 necessary for engines without internal locking support (MyISAM).
421 An engine with internal locking implementation might later
422 downgrade the lock in handler::store_lock() method.
423 */
424
425 static
upgrade_lock_type(THD * thd,thr_lock_type * lock_type,enum_duplicates duplic)426 void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
427 enum_duplicates duplic)
428 {
429 if (duplic == DUP_UPDATE ||
430 (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
431 {
432 *lock_type= TL_WRITE_DEFAULT;
433 return;
434 }
435
436 if (*lock_type == TL_WRITE_DELAYED)
437 {
438 /*
439 We do not use delayed threads if:
440 - we're running in the safe mode or skip-new mode -- the
441 feature is disabled in these modes
442 - we're executing this statement on a replication slave --
443 we need to ensure serial execution of queries on the
444 slave
445 - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
446 insert cannot be concurrent
447 - this statement is directly or indirectly invoked from
448 a stored function or trigger (under pre-locking) - to
449 avoid deadlocks, since INSERT DELAYED involves a lock
450 upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
451 attempt while keeping other table level locks.
452 - this statement itself may require pre-locking.
453 We should upgrade the lock even though in most cases
454 delayed functionality may work. Unfortunately, we can't
455 easily identify whether the subject table is not used in
456 the statement indirectly via a stored function or trigger:
457 if it is used, that will lead to a deadlock between the
458 client connection and the delayed thread.
459 */
460 if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
461 thd->variables.max_insert_delayed_threads == 0 ||
462 thd->locked_tables_mode > LTM_LOCK_TABLES ||
463 thd->lex->uses_stored_routines() /*||
464 thd->lex->describe*/)
465 {
466 *lock_type= TL_WRITE;
467 return;
468 }
469 if (thd->slave_thread)
470 {
471 /* Try concurrent insert */
472 *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
473 TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
474 return;
475 }
476
477 bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
478 if (WSREP_BINLOG_FORMAT(global_system_variables.binlog_format) == BINLOG_FORMAT_STMT &&
479 log_on && mysql_bin_log.is_open())
480 {
481 /*
482 Statement-based binary logging does not work in this case, because:
483 a) two concurrent statements may have their rows intermixed in the
484 queue, leading to autoincrement replication problems on slave (because
485 the values generated used for one statement don't depend only on the
486 value generated for the first row of this statement, so are not
487 replicable)
488 b) if first row of the statement has an error the full statement is
489 not binlogged, while next rows of the statement may be inserted.
490 c) if first row succeeds, statement is binlogged immediately with a
491 zero error code (i.e. "no error"), if then second row fails, query
492 will fail on slave too and slave will stop (wrongly believing that the
493 master got no error).
494 So we fallback to non-delayed INSERT.
495 Note that to be fully correct, we should test the "binlog format which
496 the delayed thread is going to use for this row". But in the common case
497 where the global binlog format is not changed and the session binlog
498 format may be changed, that is equal to the global binlog format.
499 We test it without mutex for speed reasons (condition rarely true), and
500 in the common case (global not changed) it is as good as without mutex;
501 if global value is changed, anyway there is uncertainty as the delayed
502 thread may be old and use the before-the-change value.
503 */
504 *lock_type= TL_WRITE;
505 }
506 }
507 }
508
509
510 /**
511 Find or create a delayed insert thread for the first table in
512 the table list, then open and lock the remaining tables.
513 If a table can not be used with insert delayed, upgrade the lock
514 and open and lock all tables using the standard mechanism.
515
516 @param thd thread context
517 @param table_list list of "descriptors" for tables referenced
518 directly in statement SQL text.
519 The first element in the list corresponds to
520 the destination table for inserts, remaining
521 tables, if any, are usually tables referenced
522 by sub-queries in the right part of the
523 INSERT.
524
525 @return Status of the operation. In case of success 'table'
526 member of every table_list element points to an instance of
527 class TABLE.
528
529 @sa open_and_lock_tables for more information about MySQL table
530 level locking
531 */
532
533 static
open_and_lock_for_insert_delayed(THD * thd,TABLE_LIST * table_list)534 bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
535 {
536 MDL_request protection_request;
537 DBUG_ENTER("open_and_lock_for_insert_delayed");
538
539 #ifndef EMBEDDED_LIBRARY
540 /* INSERT DELAYED is not allowed in a read only transaction. */
541 if (thd->tx_read_only)
542 {
543 my_error(ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION, MYF(0));
544 DBUG_RETURN(true);
545 }
546
547 /*
548 In order for the deadlock detector to be able to find any deadlocks
549 caused by the handler thread waiting for GRL or this table, we acquire
550 protection against GRL (global IX metadata lock) and metadata lock on
551 table to being inserted into inside the connection thread.
552 If this goes ok, the tickets are cloned and added to the list of granted
553 locks held by the handler thread.
554 */
555 if (thd->has_read_only_protection())
556 DBUG_RETURN(TRUE);
557
558 MDL_REQUEST_INIT(&protection_request, MDL_key::BACKUP, "", "",
559 MDL_BACKUP_DML, MDL_STATEMENT);
560
561 if (thd->mdl_context.acquire_lock(&protection_request,
562 thd->variables.lock_wait_timeout))
563 DBUG_RETURN(TRUE);
564
565 if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
566 thd->variables.lock_wait_timeout))
567 /*
568 If a lock can't be acquired, it makes no sense to try normal insert.
569 Therefore we just abort the statement.
570 */
571 DBUG_RETURN(TRUE);
572
573 bool error= FALSE;
574 if (delayed_get_table(thd, &protection_request, table_list))
575 error= TRUE;
576 else if (table_list->table)
577 {
578 /*
579 Open tables used for sub-selects or in stored functions, will also
580 cache these functions.
581 */
582 if (open_and_lock_tables(thd, table_list->next_global, TRUE, 0))
583 {
584 end_delayed_insert(thd);
585 error= TRUE;
586 }
587 else
588 {
589 /*
590 First table was not processed by open_and_lock_tables(),
591 we need to set updatability flag "by hand".
592 */
593 if (!table_list->derived && !table_list->view)
594 table_list->updatable= 1; // usual table
595 }
596 }
597
598 /*
599 We can't release protection against GRL and metadata lock on the table
600 being inserted into here. These locks might be required, for example,
601 because this INSERT DELAYED calls functions which may try to update
602 this or another tables (updating the same table is of course illegal,
603 but such an attempt can be discovered only later during statement
604 execution).
605 */
606
607 /*
608 Reset the ticket in case we end up having to use normal insert and
609 therefore will reopen the table and reacquire the metadata lock.
610 */
611 table_list->mdl_request.ticket= NULL;
612
613 if (error || table_list->table)
614 DBUG_RETURN(error);
615 #endif
616 /*
617 * This is embedded library and we don't have auxiliary
618 threads OR
619 * a lock upgrade was requested inside delayed_get_table
620 because
621 - there are too many delayed insert threads OR
622 - the table has triggers.
623 Use a normal insert.
624 */
625 table_list->lock_type= TL_WRITE;
626 DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
627 }
628
629
630 /**
631 Create a new query string for removing DELAYED keyword for
632 multi INSERT DEALAYED statement.
633
634 @param[in] thd Thread handler
635 @param[in] buf Query string
636
637 @return
638 0 ok
639 1 error
640 */
641 static int
create_insert_stmt_from_insert_delayed(THD * thd,String * buf)642 create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
643 {
644 /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
645 if (buf->append(thd->query()) ||
646 buf->replace(thd->lex->keyword_delayed_begin_offset,
647 thd->lex->keyword_delayed_end_offset -
648 thd->lex->keyword_delayed_begin_offset, NULL, 0))
649 return 1;
650 return 0;
651 }
652
653
save_insert_query_plan(THD * thd,TABLE_LIST * table_list)654 static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
655 {
656 Explain_insert* explain= new (thd->mem_root) Explain_insert(thd->mem_root);
657 explain->table_name.append(table_list->table->alias);
658
659 thd->lex->explain->add_insert_plan(explain);
660
661 /* Save subquery children */
662 for (SELECT_LEX_UNIT *unit= thd->lex->first_select_lex()->first_inner_unit();
663 unit;
664 unit= unit->next_unit())
665 {
666 if (unit->explainable())
667 explain->add_child(unit->first_select()->select_number);
668 }
669 }
670
671
field_to_fill()672 Field **TABLE::field_to_fill()
673 {
674 return triggers && triggers->nullable_fields() ? triggers->nullable_fields() : field;
675 }
676
677
/**
  INSERT statement implementation

  SYNOPSIS
  mysql_insert()
    result    NULL if the insert is not outputting results
              via 'RETURNING' clause.

  @note Like implementations of other DDL/DML in MySQL, this function
  relies on the caller to close the thread tables. This is done at the
  end of dispatch_command().
*/
mysql_insert(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<List_item> & values_list,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,bool ignore,select_result * result)690 bool mysql_insert(THD *thd, TABLE_LIST *table_list,
691 List<Item> &fields, List<List_item> &values_list,
692 List<Item> &update_fields, List<Item> &update_values,
693 enum_duplicates duplic, bool ignore, select_result *result)
694 {
695 bool retval= true;
696 int error, res;
697 bool transactional_table, joins_freed= FALSE;
698 bool changed;
699 const bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED);
700 bool using_bulk_insert= 0;
701 uint value_count;
702 ulong counter = 1;
703 /* counter of iteration in bulk PS operation*/
704 ulonglong iteration= 0;
705 ulonglong id;
706 COPY_INFO info;
707 TABLE *table= 0;
708 List_iterator_fast<List_item> its(values_list);
709 List_item *values;
710 Name_resolution_context *context;
711 Name_resolution_context_state ctx_state;
712 SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
713 unsigned char *readbuff= NULL;
714
715 #ifndef EMBEDDED_LIBRARY
716 char *query= thd->query();
717 /*
718 log_on is about delayed inserts only.
719 By default, both logs are enabled (this won't cause problems if the server
720 runs without --log-bin).
721 */
722 bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
723 #endif
724 thr_lock_type lock_type;
725 Item *unused_conds= 0;
726 DBUG_ENTER("mysql_insert");
727
728 bzero((char*) &info,sizeof(info));
729 create_explain_query(thd->lex, thd->mem_root);
730 /*
731 Upgrade lock type if the requested lock is incompatible with
732 the current connection mode or table operation.
733 */
734 upgrade_lock_type(thd, &table_list->lock_type, duplic);
735
736 /*
737 We can't write-delayed into a table locked with LOCK TABLES:
738 this will lead to a deadlock, since the delayed thread will
739 never be able to get a lock on the table.
740 */
741 if (table_list->lock_type == TL_WRITE_DELAYED && thd->locked_tables_mode &&
742 find_locked_table(thd->open_tables, table_list->db.str,
743 table_list->table_name.str))
744 {
745 my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
746 table_list->table_name.str);
747 DBUG_RETURN(TRUE);
748 }
749
750 if (table_list->lock_type == TL_WRITE_DELAYED)
751 {
752 if (open_and_lock_for_insert_delayed(thd, table_list))
753 DBUG_RETURN(TRUE);
754 }
755 else
756 {
757 if (open_and_lock_tables(thd, table_list, TRUE, 0))
758 DBUG_RETURN(TRUE);
759 }
760
761 THD_STAGE_INFO(thd, stage_init_update);
762 lock_type= table_list->lock_type;
763 thd->lex->used_tables=0;
764 values= its++;
765 if (bulk_parameters_set(thd))
766 DBUG_RETURN(TRUE);
767 value_count= values->elements;
768
769 if ((res= mysql_prepare_insert(thd, table_list, fields, values,
770 update_fields, update_values, duplic,
771 &unused_conds, FALSE)))
772 {
773 retval= thd->is_error();
774 if (res < 0)
775 {
776 /*
777 Insert should be ignored but we have to log the query in statement
778 format in the binary log
779 */
780 if (thd->binlog_current_query_unfiltered())
781 retval= 1;
782 }
783 goto abort;
784 }
785 /* mysql_prepare_insert sets table_list->table if it was not set */
786 table= table_list->table;
787
788 /* Prepares LEX::returing_list if it is not empty */
789 if (returning)
790 {
791 result->prepare(returning->item_list, NULL);
792 if (thd->is_bulk_op())
793 {
794 /*
795 It is RETURNING which needs network buffer to write result set and
796 it is array binfing which need network buffer to read parameters.
797 So we allocate yet another network buffer.
798 The old buffer will be freed at the end of operation.
799 */
800 DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
801 readbuff= thd->net.buff; // old buffer
802 if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
803 {
804 readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
805 goto abort;
806 }
807 }
808 }
809
810 context= &thd->lex->first_select_lex()->context;
811 /*
812 These three asserts test the hypothesis that the resetting of the name
813 resolution context below is not necessary at all since the list of local
814 tables for INSERT always consists of one table.
815 */
816 DBUG_ASSERT(!table_list->next_local);
817 DBUG_ASSERT(!context->table_list->next_local);
818 DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
819
820 /* Save the state of the current name resolution context. */
821 ctx_state.save_state(context, table_list);
822
823 /*
824 Perform name resolution only in the first table - 'table_list',
825 which is the table that is inserted into.
826 */
827 table_list->next_local= 0;
828 context->resolve_in_table_list_only(table_list);
829 switch_to_nullable_trigger_fields(*values, table);
830
831 while ((values= its++))
832 {
833 counter++;
834 if (values->elements != value_count)
835 {
836 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
837 goto abort;
838 }
839 if (setup_fields(thd, Ref_ptr_array(),
840 *values, MARK_COLUMNS_READ, 0, NULL, 0))
841 goto abort;
842 switch_to_nullable_trigger_fields(*values, table);
843 }
844 its.rewind ();
845
846 /* Restore the current context. */
847 ctx_state.restore_state(context, table_list);
848
849 if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false))
850 {
851 goto abort;
852 }
853 save_insert_query_plan(thd, table_list);
854 if (thd->lex->describe)
855 {
856 retval= thd->lex->explain->send_explain(thd);
857 goto abort;
858 }
859
860 /*
861 Fill in the given fields and dump it to the table file
862 */
863 info.ignore= ignore;
864 info.handle_duplicates=duplic;
865 info.update_fields= &update_fields;
866 info.update_values= &update_values;
867 info.view= (table_list->view ? table_list : 0);
868 info.table_list= table_list;
869
870 /*
871 Count warnings for all inserts.
872 For single line insert, generate an error if try to set a NOT NULL field
873 to NULL.
874 */
875 thd->count_cuted_fields= ((values_list.elements == 1 &&
876 !ignore) ?
877 CHECK_FIELD_ERROR_FOR_NULL :
878 CHECK_FIELD_WARN);
879 thd->cuted_fields = 0L;
880 table->next_number_field=table->found_next_number_field;
881
882 #ifdef HAVE_REPLICATION
883 if (thd->rgi_slave &&
884 (info.handle_duplicates == DUP_UPDATE) &&
885 (table->next_number_field != NULL) &&
886 rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
887 goto abort;
888 #endif
889
890 error=0;
891 if (duplic == DUP_REPLACE &&
892 (!table->triggers || !table->triggers->has_delete_triggers()))
893 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
894 if (duplic == DUP_UPDATE)
895 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
896 /*
897 let's *try* to start bulk inserts. It won't necessary
898 start them as values_list.elements should be greater than
899 some - handler dependent - threshold.
900 We should not start bulk inserts if this statement uses
901 functions or invokes triggers since they may access
902 to the same table and therefore should not see its
903 inconsistent state created by this optimization.
904 So we call start_bulk_insert to perform nesessary checks on
905 values_list.elements, and - if nothing else - to initialize
906 the code to make the call of end_bulk_insert() below safe.
907 */
908 #ifndef EMBEDDED_LIBRARY
909 if (lock_type != TL_WRITE_DELAYED)
910 #endif /* EMBEDDED_LIBRARY */
911 {
912 bool create_lookup_handler= duplic != DUP_ERROR;
913 if (duplic != DUP_ERROR || ignore)
914 {
915 create_lookup_handler= true;
916 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
917 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
918 {
919 if (table->file->ha_rnd_init_with_error(0))
920 goto abort;
921 }
922 }
923 table->file->prepare_for_insert(create_lookup_handler);
924 /**
925 This is a simple check for the case when the table has a trigger
926 that reads from it, or when the statement invokes a stored function
927 that reads from the table being inserted to.
928 Engines can't handle a bulk insert in parallel with a read form the
929 same table in the same connection.
930 */
931 if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
932 values_list.elements > 1)
933 {
934 using_bulk_insert= 1;
935 table->file->ha_start_bulk_insert(values_list.elements);
936 }
937 else
938 table->file->ha_reset_copy_info();
939 }
940
941 thd->abort_on_warning= !ignore && thd->is_strict_mode();
942
943 table->reset_default_fields();
944 table->prepare_triggers_for_insert_stmt_or_event();
945 table->mark_columns_needed_for_insert();
946
947 if (fields.elements || !value_count || table_list->view != 0)
948 {
949 if (table->triggers &&
950 table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
951 {
952 /* BEFORE INSERT triggers exist, the check will be done later, per row */
953 }
954 else if (check_that_all_fields_are_given_values(thd, table, table_list))
955 {
956 error= 1;
957 goto values_loop_end;
958 }
959 }
960
961 if (table_list->prepare_where(thd, 0, TRUE) ||
962 table_list->prepare_check_option(thd))
963 error= 1;
964
965 switch_to_nullable_trigger_fields(fields, table);
966 switch_to_nullable_trigger_fields(update_fields, table);
967 switch_to_nullable_trigger_fields(update_values, table);
968
969 if (fields.elements || !value_count)
970 {
971 /*
972 There are possibly some default values:
973 INSERT INTO t1 (fields) VALUES ...
974 INSERT INTO t1 VALUES ()
975 */
976 if (table->validate_default_values_of_unset_fields(thd))
977 {
978 error= 1;
979 goto values_loop_end;
980 }
981 }
982 /*
983 If statement returns result set, we need to send the result set metadata
984 to the client so that it knows that it has to expect an EOF or ERROR.
985 At this point we have all the required information to send the result set
986 metadata.
987 */
988 if (returning &&
989 result->send_result_set_metadata(returning->item_list,
990 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
991 goto values_loop_end;
992
993 THD_STAGE_INFO(thd, stage_update);
994 thd->decide_logging_format_low(table);
995 do
996 {
997 DBUG_PRINT("info", ("iteration %llu", iteration));
998 if (iteration && bulk_parameters_set(thd))
999 {
1000 error= 1;
1001 goto values_loop_end;
1002 }
1003
1004 while ((values= its++))
1005 {
1006 if (fields.elements || !value_count)
1007 {
1008 /*
1009 There are possibly some default values:
1010 INSERT INTO t1 (fields) VALUES ...
1011 INSERT INTO t1 VALUES ()
1012 */
1013 restore_record(table,s->default_values); // Get empty record
1014 table->reset_default_fields();
1015 if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields,
1016 *values, 0,
1017 TRG_EVENT_INSERT)))
1018 {
1019 if (values_list.elements != 1 && ! thd->is_error())
1020 {
1021 info.records++;
1022 continue;
1023 }
1024 /*
1025 TODO: set thd->abort_on_warning if values_list.elements == 1
1026 and check that all items return warning in case of problem with
1027 storing field.
1028 */
1029 error=1;
1030 break;
1031 }
1032 }
1033 else
1034 {
1035 /*
1036 No field list, all fields are set explicitly:
1037 INSERT INTO t1 VALUES (values)
1038 */
1039 if (thd->lex->used_tables || // Column used in values()
1040 table->s->visible_fields != table->s->fields)
1041 restore_record(table,s->default_values); // Get empty record
1042 else
1043 {
1044 TABLE_SHARE *share= table->s;
1045
1046 /*
1047 Fix delete marker. No need to restore rest of record since it will
1048 be overwritten by fill_record() anyway (and fill_record() does not
1049 use default values in this case).
1050 */
1051 table->record[0][0]= share->default_values[0];
1052
1053 /* Fix undefined null_bits. */
1054 if (share->null_bytes > 1 && share->last_null_bit_pos)
1055 {
1056 table->record[0][share->null_bytes - 1]=
1057 share->default_values[share->null_bytes - 1];
1058 }
1059 }
1060 table->reset_default_fields();
1061 if (unlikely(fill_record_n_invoke_before_triggers(thd, table,
1062 table->
1063 field_to_fill(),
1064 *values, 0,
1065 TRG_EVENT_INSERT)))
1066 {
1067 if (values_list.elements != 1 && ! thd->is_error())
1068 {
1069 info.records++;
1070 continue;
1071 }
1072 error=1;
1073 break;
1074 }
1075 }
1076
1077 /*
1078 with triggers a field can get a value *conditionally*, so we have to
1079 repeat has_no_default_value() check for every row
1080 */
1081 if (table->triggers &&
1082 table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
1083 {
1084 for (Field **f=table->field ; *f ; f++)
1085 {
1086 if (unlikely(!(*f)->has_explicit_value() &&
1087 has_no_default_value(thd, *f, table_list)))
1088 {
1089 error= 1;
1090 goto values_loop_end;
1091 }
1092 }
1093 }
1094
1095 if ((res= table_list->view_check_option(thd,
1096 (values_list.elements == 1 ?
1097 0 :
1098 ignore))) ==
1099 VIEW_CHECK_SKIP)
1100 continue;
1101 else if (res == VIEW_CHECK_ERROR)
1102 {
1103 error= 1;
1104 break;
1105 }
1106
1107 #ifndef EMBEDDED_LIBRARY
1108 if (lock_type == TL_WRITE_DELAYED)
1109 {
1110 LEX_STRING const st_query = { query, thd->query_length() };
1111 DEBUG_SYNC(thd, "before_write_delayed");
1112 error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
1113 DEBUG_SYNC(thd, "after_write_delayed");
1114 query=0;
1115 }
1116 else
1117 #endif
1118 error= write_record(thd, table, &info, result);
1119 if (unlikely(error))
1120 break;
1121 thd->get_stmt_da()->inc_current_row_for_warning();
1122 }
1123 its.rewind();
1124 iteration++;
1125 } while (bulk_parameters_iterations(thd));
1126
1127 values_loop_end:
1128 free_underlaid_joins(thd, thd->lex->first_select_lex());
1129 joins_freed= TRUE;
1130
1131 /*
1132 Now all rows are inserted. Time to update logs and sends response to
1133 user
1134 */
1135 #ifndef EMBEDDED_LIBRARY
1136 if (unlikely(lock_type == TL_WRITE_DELAYED))
1137 {
1138 if (likely(!error))
1139 {
1140 info.copied=values_list.elements;
1141 end_delayed_insert(thd);
1142 }
1143 }
1144 else
1145 #endif
1146 {
1147 /*
1148 Do not do this release if this is a delayed insert, it would steal
1149 auto_inc values from the delayed_insert thread as they share TABLE.
1150 */
1151 table->file->ha_release_auto_increment();
1152 if (using_bulk_insert)
1153 {
1154 if (unlikely(table->file->ha_end_bulk_insert()) &&
1155 !error)
1156 {
1157 table->file->print_error(my_errno,MYF(0));
1158 error=1;
1159 }
1160 }
1161 /* Get better status from handler if handler supports it */
1162 if (table->file->copy_info.records)
1163 {
1164 DBUG_ASSERT(info.copied >= table->file->copy_info.copied);
1165 info.touched= table->file->copy_info.touched;
1166 info.copied= table->file->copy_info.copied;
1167 info.deleted= table->file->copy_info.deleted;
1168 info.updated= table->file->copy_info.updated;
1169 }
1170 if (duplic != DUP_ERROR || ignore)
1171 {
1172 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1173 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1174 table->file->ha_rnd_end();
1175 }
1176
1177 transactional_table= table->file->has_transactions_and_rollback();
1178
1179 if (likely(changed= (info.copied || info.deleted || info.updated)))
1180 {
1181 /*
1182 Invalidate the table in the query cache if something changed.
1183 For the transactional algorithm to work the invalidation must be
1184 before binlog writing and ha_autocommit_or_rollback
1185 */
1186 query_cache_invalidate3(thd, table_list, 1);
1187 }
1188
1189 if (thd->transaction->stmt.modified_non_trans_table)
1190 thd->transaction->all.modified_non_trans_table= TRUE;
1191 thd->transaction->all.m_unsafe_rollback_flags|=
1192 (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
1193
1194 if (error <= 0 ||
1195 thd->transaction->stmt.modified_non_trans_table ||
1196 was_insert_delayed)
1197 {
1198 if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
1199 {
1200 int errcode= 0;
1201 if (error <= 0)
1202 {
1203 /*
1204 [Guilhem wrote] Temporary errors may have filled
1205 thd->net.last_error/errno. For example if there has
1206 been a disk full error when writing the row, and it was
1207 MyISAM, then thd->net.last_error/errno will be set to
1208 "disk full"... and the mysql_file_pwrite() will wait until free
1209 space appears, and so when it finishes then the
1210 write_row() was entirely successful
1211 */
1212 /* todo: consider removing */
1213 thd->clear_error();
1214 }
1215 else
1216 errcode= query_error_code(thd, thd->killed == NOT_KILLED);
1217
1218 ScopedStatementReplication scoped_stmt_rpl(
1219 table->versioned(VERS_TRX_ID) ? thd : NULL);
1220 /* bug#22725:
1221
1222 A query which per-row-loop can not be interrupted with
1223 KILLED, like INSERT, and that does not invoke stored
1224 routines can be binlogged with neglecting the KILLED error.
1225
1226 If there was no error (error == zero) until after the end of
1227 inserting loop the KILLED flag that appeared later can be
1228 disregarded since previously possible invocation of stored
1229 routines did not result in any error due to the KILLED. In
1230 such case the flag is ignored for constructing binlog event.
1231 */
1232 DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
1233 if (was_insert_delayed && table_list->lock_type == TL_WRITE)
1234 {
1235 /* Binlog INSERT DELAYED as INSERT without DELAYED. */
1236 String log_query;
1237 if (create_insert_stmt_from_insert_delayed(thd, &log_query))
1238 {
1239 sql_print_error("Event Error: An error occurred while creating query string"
1240 "for INSERT DELAYED stmt, before writing it into binary log.");
1241
1242 error= 1;
1243 }
1244 else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1245 log_query.c_ptr(), log_query.length(),
1246 transactional_table, FALSE, FALSE,
1247 errcode) > 0)
1248 error= 1;
1249 }
1250 else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1251 thd->query(), thd->query_length(),
1252 transactional_table, FALSE, FALSE,
1253 errcode) > 0)
1254 error= 1;
1255 }
1256 }
1257 DBUG_ASSERT(transactional_table || !changed ||
1258 thd->transaction->stmt.modified_non_trans_table);
1259 }
1260 THD_STAGE_INFO(thd, stage_end);
1261 /*
1262 We'll report to the client this id:
1263 - if the table contains an autoincrement column and we successfully
1264 inserted an autogenerated value, the autogenerated value.
1265 - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
1266 called, X.
1267 - if the table contains an autoincrement column, and some rows were
1268 inserted, the id of the last "inserted" row (if IGNORE, that value may not
1269 have been really inserted but ignored).
1270 */
1271 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1272 thd->first_successful_insert_id_in_cur_stmt :
1273 (thd->arg_of_last_insert_id_function ?
1274 thd->first_successful_insert_id_in_prev_stmt :
1275 ((table->next_number_field && info.copied) ?
1276 table->next_number_field->val_int() : 0));
1277 table->next_number_field=0;
1278 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
1279 table->auto_increment_field_not_null= FALSE;
1280 if (duplic == DUP_REPLACE &&
1281 (!table->triggers || !table->triggers->has_delete_triggers()))
1282 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1283
1284 if (unlikely(error))
1285 goto abort;
1286 if (thd->lex->analyze_stmt)
1287 {
1288 retval= 0;
1289 goto abort;
1290 }
1291 DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu",
1292 (ulonglong) info.touched, (ulonglong) info.copied,
1293 (ulonglong) info.updated, (ulonglong) info.deleted));
1294
1295 if ((iteration * values_list.elements) == 1 &&
1296 (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
1297 {
1298 /*
1299 Client expects an EOF/OK packet if result set metadata was sent. If
1300 LEX::has_returning and the statement returns result set
1301 we send EOF which is the indicator of the end of the row stream.
      Otherwise we send an OK packet i.e when the statement returns only the
1303 status information
1304 */
1305 if (returning)
1306 result->send_eof();
1307 else
1308 my_ok(thd, info.copied + info.deleted +
1309 ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1310 info.touched : info.updated), id);
1311 }
1312 else
1313 {
1314 char buff[160];
1315 ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1316 info.touched : info.updated);
1317
1318 if (ignore)
1319 sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1320 (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
1321 (ulong) (info.records - info.copied),
1322 (long) thd->get_stmt_da()->current_statement_warn_count());
1323 else
1324 sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1325 (ulong) (info.deleted + updated),
1326 (long) thd->get_stmt_da()->current_statement_warn_count());
1327 if (returning)
1328 result->send_eof();
1329 else
1330 ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
1331 }
1332 thd->abort_on_warning= 0;
1333 if (thd->lex->current_select->first_cond_optimization)
1334 {
1335 thd->lex->current_select->save_leaf_tables(thd);
1336 thd->lex->current_select->first_cond_optimization= 0;
1337 }
1338 if (readbuff)
1339 my_free(readbuff);
1340 DBUG_RETURN(FALSE);
1341
1342 abort:
1343 #ifndef EMBEDDED_LIBRARY
1344 if (lock_type == TL_WRITE_DELAYED)
1345 {
1346 end_delayed_insert(thd);
1347 /*
1348 In case of an error (e.g. data truncation), the data type specific data
1349 in fields (e.g. Field_blob::value) was not taken over
1350 by the delayed writer thread. All fields in table_list->table
1351 will be freed by free_root() soon. We need to free the specific
1352 data before free_root() to avoid a memory leak.
1353 */
1354 for (Field **ptr= table_list->table->field ; *ptr ; ptr++)
1355 (*ptr)->free();
1356 }
1357 #endif
1358 if (table != NULL)
1359 table->file->ha_release_auto_increment();
1360
1361 if (!joins_freed)
1362 free_underlaid_joins(thd, thd->lex->first_select_lex());
1363 thd->abort_on_warning= 0;
1364 if (readbuff)
1365 my_free(readbuff);
1366 DBUG_RETURN(retval);
1367 }
1368
1369
1370 /*
1371 Additional check for insertability for VIEW
1372
1373 SYNOPSIS
1374 check_view_insertability()
1375 thd - thread handler
1376 view - reference on VIEW
1377
1378 IMPLEMENTATION
1379 A view is insertable if the folloings are true:
1380 - All columns in the view are columns from a table
1381 - All not used columns in table have a default values
1382 - All field in view are unique (not referring to the same column)
1383
1384 RETURN
1385 FALSE - OK
1386 view->contain_auto_increment is 1 if and only if the view contains an
1387 auto_increment field
1388
1389 TRUE - can't be used for insert
1390 */
1391
check_view_insertability(THD * thd,TABLE_LIST * view)1392 static bool check_view_insertability(THD * thd, TABLE_LIST *view)
1393 {
1394 uint num= view->view->first_select_lex()->item_list.elements;
1395 TABLE *table= view->table;
1396 Field_translator *trans_start= view->field_translation,
1397 *trans_end= trans_start + num;
1398 Field_translator *trans;
1399 uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1400 uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1401 MY_BITMAP used_fields;
1402 enum_column_usage saved_column_usage= thd->column_usage;
1403 DBUG_ENTER("check_key_in_view");
1404
1405 if (!used_fields_buff)
1406 DBUG_RETURN(TRUE); // EOM
1407
1408 DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
1409
1410 (void) my_bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
1411 bitmap_clear_all(&used_fields);
1412
1413 view->contain_auto_increment= 0;
1414 /*
1415 we must not set query_id for fields as they're not
1416 really used in this context
1417 */
1418 thd->column_usage= COLUMNS_WRITE;
1419 /* check simplicity and prepare unique test of view */
1420 for (trans= trans_start; trans != trans_end; trans++)
1421 {
1422 if (trans->item->fix_fields_if_needed(thd, &trans->item))
1423 {
1424 thd->column_usage= saved_column_usage;
1425 DBUG_RETURN(TRUE);
1426 }
1427 Item_field *field;
1428 /* simple SELECT list entry (field without expression) */
1429 if (!(field= trans->item->field_for_view_update()))
1430 {
1431 thd->column_usage= saved_column_usage;
1432 DBUG_RETURN(TRUE);
1433 }
1434 if (field->field->unireg_check == Field::NEXT_NUMBER)
1435 view->contain_auto_increment= 1;
1436 /* prepare unique test */
1437 /*
1438 remove collation (or other transparent for update function) if we have
1439 it
1440 */
1441 trans->item= field;
1442 }
1443 thd->column_usage= saved_column_usage;
1444 /* unique test */
1445 for (trans= trans_start; trans != trans_end; trans++)
1446 {
1447 /* Thanks to test above, we know that all columns are of type Item_field */
1448 Item_field *field= (Item_field *)trans->item;
1449 /* check fields belong to table in which we are inserting */
1450 if (field->field->table == table &&
1451 bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1452 DBUG_RETURN(TRUE);
1453 }
1454
1455 DBUG_RETURN(FALSE);
1456 }
1457
1458
1459 /**
1460 TODO remove when MDEV-17395 will be closed
1461
1462 Checks if REPLACE or ON DUPLICATE UPDATE was executed on table containing
1463 WITHOUT OVERLAPS key.
1464
1465 @return
1466 0 if no error
1467 ER_NOT_SUPPORTED_YET if the above condidion was met
1468 */
check_duplic_insert_without_overlaps(THD * thd,TABLE * table,enum_duplicates duplic)1469 int check_duplic_insert_without_overlaps(THD *thd, TABLE *table,
1470 enum_duplicates duplic)
1471 {
1472 if (duplic == DUP_REPLACE || duplic == DUP_UPDATE)
1473 {
1474 for (uint k = 0; k < table->s->keys; k++)
1475 {
1476 if (table->key_info[k].without_overlaps)
1477 {
1478 my_error(ER_NOT_SUPPORTED_YET, MYF(0), "WITHOUT OVERLAPS");
1479 return ER_NOT_SUPPORTED_YET;
1480 }
1481 }
1482 }
1483 return 0;
1484 }
1485
1486 /*
1487 Check if table can be updated
1488
1489 SYNOPSIS
1490 mysql_prepare_insert_check_table()
1491 thd Thread handle
1492 table_list Table list
1493 fields List of fields to be updated
1494 where Pointer to where clause
1495 select_insert Check is making for SELECT ... INSERT
1496
1497 RETURN
1498 FALSE ok
1499 TRUE ERROR
1500 */
1501
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1502 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1503 List<Item> &fields,
1504 bool select_insert)
1505 {
1506 bool insert_into_view= (table_list->view != 0);
1507 DBUG_ENTER("mysql_prepare_insert_check_table");
1508
1509 if (!table_list->single_table_updatable())
1510 {
1511 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
1512 DBUG_RETURN(TRUE);
1513 }
1514 /*
1515 first table in list is the one we'll INSERT into, requires INSERT_ACL.
1516 all others require SELECT_ACL only. the ACL requirement below is for
1517 new leaves only anyway (view-constituents), so check for SELECT rather
1518 than INSERT.
1519 */
1520
1521 if (setup_tables_and_check_access(thd,
1522 &thd->lex->first_select_lex()->context,
1523 &thd->lex->first_select_lex()->
1524 top_join_list,
1525 table_list,
1526 thd->lex->first_select_lex()->leaf_tables,
1527 select_insert, INSERT_ACL, SELECT_ACL,
1528 TRUE))
1529 DBUG_RETURN(TRUE);
1530
1531 if (insert_into_view && !fields.elements)
1532 {
1533 thd->lex->empty_field_list_on_rset= 1;
1534 if (!thd->lex->first_select_lex()->leaf_tables.head()->table ||
1535 table_list->is_multitable())
1536 {
1537 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1538 table_list->view_db.str, table_list->view_name.str);
1539 DBUG_RETURN(TRUE);
1540 }
1541 DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
1542 }
1543
1544 DBUG_RETURN(FALSE);
1545 }
1546
1547
1548 /*
1549 Get extra info for tables we insert into
1550
1551 @param table table(TABLE object) we insert into,
1552 might be NULL in case of view
1553 @param table(TABLE_LIST object) or view we insert into
1554 */
1555
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1556 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1557 {
1558 if (table)
1559 {
1560 if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1561 table->prepare_for_position();
1562 return;
1563 }
1564
1565 DBUG_ASSERT(tables->view);
1566 List_iterator<TABLE_LIST> it(*tables->view_tables);
1567 TABLE_LIST *tbl;
1568 while ((tbl= it++))
1569 prepare_for_positional_update(tbl->table, tbl);
1570
1571 return;
1572 }
1573
1574
1575 /*
1576 Prepare items in INSERT statement
1577
1578 SYNOPSIS
1579 mysql_prepare_insert()
1580 thd Thread handler
1581 table_list Global/local table list
1582 where Where clause (for insert ... select)
1583 select_insert TRUE if INSERT ... SELECT statement
1584
1585 TODO (in far future)
1586 In cases of:
1587 INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1588 ON DUPLICATE KEY ...
1589 we should be able to refer to sum1 in the ON DUPLICATE KEY part
1590
1591 WARNING
1592 You MUST set table->insert_values to 0 after calling this function
1593 before releasing the table object.
1594
1595 RETURN VALUE
1596 0 OK
1597 >0 error
1598 <0 insert should be ignored
1599 */
1600
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List_item * values,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,COND ** where,bool select_insert)1601 int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1602 List<Item> &fields, List_item *values,
1603 List<Item> &update_fields, List<Item> &update_values,
1604 enum_duplicates duplic, COND **where,
1605 bool select_insert)
1606 {
1607 SELECT_LEX *select_lex= thd->lex->first_select_lex();
1608 Name_resolution_context *context= &select_lex->context;
1609 Name_resolution_context_state ctx_state;
1610 bool insert_into_view= (table_list->view != 0);
1611 bool res= 0;
1612 table_map map= 0;
1613 TABLE *table;
1614 DBUG_ENTER("mysql_prepare_insert");
1615 DBUG_PRINT("enter", ("table_list: %p view: %d",
1616 table_list, (int) insert_into_view));
1617 /* INSERT should have a SELECT or VALUES clause */
1618 DBUG_ASSERT (!select_insert || !values);
1619
1620 if (mysql_handle_derived(thd->lex, DT_INIT))
1621 DBUG_RETURN(1);
1622 if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT))
1623 DBUG_RETURN(1);
1624 if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE))
1625 DBUG_RETURN(1);
1626
1627 if (duplic == DUP_UPDATE)
1628 {
1629 /* it should be allocated before Item::fix_fields() */
1630 if (table_list->set_insert_values(thd->mem_root))
1631 DBUG_RETURN(1);
1632 }
1633
1634 table= table_list->table;
1635
1636 if (table->file->check_if_updates_are_ignored("INSERT"))
1637 DBUG_RETURN(-1);
1638
1639 if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1640 DBUG_RETURN(1);
1641
1642 /* Prepare the fields in the statement. */
1643 if (values)
1644 {
1645 /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
1646 DBUG_ASSERT (!select_lex->group_list.elements);
1647
1648 /* Save the state of the current name resolution context. */
1649 ctx_state.save_state(context, table_list);
1650
1651 /*
1652 Perform name resolution only in the first table - 'table_list',
1653 which is the table that is inserted into.
1654 */
1655 table_list->next_local= 0;
1656 context->resolve_in_table_list_only(table_list);
1657
1658 res= setup_returning_fields(thd, table_list) ||
1659 setup_fields(thd, Ref_ptr_array(),
1660 *values, MARK_COLUMNS_READ, 0, NULL, 0) ||
1661 check_insert_fields(thd, context->table_list, fields, *values,
1662 !insert_into_view, 0, &map);
1663
1664 if (!res)
1665 res= setup_fields(thd, Ref_ptr_array(),
1666 update_values, MARK_COLUMNS_READ, 0, NULL, 0);
1667
1668 if (!res && duplic == DUP_UPDATE)
1669 {
1670 select_lex->no_wrap_view_item= TRUE;
1671 res= check_update_fields(thd, context->table_list, update_fields,
1672 update_values, false, &map);
1673 select_lex->no_wrap_view_item= FALSE;
1674 }
1675
1676 /* Restore the current context. */
1677 ctx_state.restore_state(context, table_list);
1678 }
1679
1680 if (res)
1681 DBUG_RETURN(res);
1682
1683 if (check_duplic_insert_without_overlaps(thd, table, duplic) != 0)
1684 DBUG_RETURN(true);
1685
1686 if (table->versioned(VERS_TIMESTAMP) && duplic == DUP_REPLACE)
1687 {
1688 // Additional memory may be required to create historical items.
1689 if (table_list->set_insert_values(thd->mem_root))
1690 DBUG_RETURN(1);
1691 }
1692
1693 if (!select_insert)
1694 {
1695 Item *fake_conds= 0;
1696 TABLE_LIST *duplicate;
1697 if ((duplicate= unique_table(thd, table_list, table_list->next_global,
1698 CHECK_DUP_ALLOW_DIFFERENT_ALIAS)))
1699 {
1700 update_non_unique_table_error(table_list, "INSERT", duplicate);
1701 DBUG_RETURN(1);
1702 }
1703 select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
1704 }
1705 /*
1706 Only call prepare_for_posistion() if we are not performing a DELAYED
1707 operation. It will instead be executed by delayed insert thread.
1708 */
1709 if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1710 prepare_for_positional_update(table, table_list);
1711 DBUG_RETURN(0);
1712 }
1713
1714
1715 /* Check if there is more uniq keys after field */
1716
last_uniq_key(TABLE * table,uint keynr)1717 static int last_uniq_key(TABLE *table,uint keynr)
1718 {
1719 /*
1720 When an underlying storage engine informs that the unique key
1721 conflicts are not reported in the ascending order by setting
1722 the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1723 information to determine the last key conflict.
1724
1725 The information about the last key conflict will be used to
1726 do a replace of the new row on the conflicting row, rather
1727 than doing a delete (of old row) + insert (of new row).
1728
1729 Hence check for this flag and disable replacing the last row
1730 by returning 0 always. Returning 0 will result in doing
1731 a delete + insert always.
1732 */
1733 if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1734 return 0;
1735
1736 while (++keynr < table->s->keys)
1737 if (table->key_info[keynr].flags & HA_NOSAME)
1738 return 0;
1739 return 1;
1740 }
1741
1742
1743 /*
1744 Inserts one historical row to a table.
1745
1746 Copies content of the row from table->record[1] to table->record[0],
1747 sets Sys_end to now() and calls ha_write_row() .
1748 */
1749
vers_insert_history_row(TABLE * table)1750 int vers_insert_history_row(TABLE *table)
1751 {
1752 DBUG_ASSERT(table->versioned(VERS_TIMESTAMP));
1753 if (!table->vers_write)
1754 return 0;
1755 restore_record(table,record[1]);
1756
1757 // Set Sys_end to now()
1758 table->vers_update_end();
1759
1760 Field *row_start= table->vers_start_field();
1761 Field *row_end= table->vers_end_field();
1762 if (row_start->cmp(row_start->ptr, row_end->ptr) >= 0)
1763 return 0;
1764
1765 if (table->vfield &&
1766 table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_READ))
1767 return HA_ERR_GENERIC;
1768
1769 return table->file->ha_write_row(table->record[0]);
1770 }
1771
1772 /*
1773 Write a record to table with optional deleting of conflicting records,
1774 invoke proper triggers if needed.
1775
1776 SYNOPSIS
1777 write_record()
1778 thd - thread context
1779 table - table to which record should be written
1780 info - COPY_INFO structure describing handling of duplicates
1781 and which is used for counting number of records inserted
1782 and deleted.
1783 sink - result sink for the RETURNING clause
1784
1785 NOTE
    Once this record is written to the table, the after insert trigger
    will be invoked. If an old record is updated instead of a new one being
    inserted, both on update triggers will fire instead. Similarly, both on
    delete triggers will be invoked if conflicting records are deleted.
1790
1791 Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which
1792 is updated didn't have transactions.
1793
1794 RETURN VALUE
1795 0 - success
1796 non-0 - error
1797 */
1798
1799
write_record(THD * thd,TABLE * table,COPY_INFO * info,select_result * sink)1800 int write_record(THD *thd, TABLE *table, COPY_INFO *info, select_result *sink)
1801 {
1802 int error, trg_error= 0;
1803 char *key=0;
1804 MY_BITMAP *save_read_set, *save_write_set;
1805 ulonglong prev_insert_id= table->file->next_insert_id;
1806 ulonglong insert_id_for_cur_row= 0;
1807 ulonglong prev_insert_id_for_cur_row= 0;
1808 DBUG_ENTER("write_record");
1809
1810 info->records++;
1811 save_read_set= table->read_set;
1812 save_write_set= table->write_set;
1813
1814 if (info->handle_duplicates == DUP_REPLACE ||
1815 info->handle_duplicates == DUP_UPDATE)
1816 {
1817 while (unlikely(error=table->file->ha_write_row(table->record[0])))
1818 {
1819 uint key_nr;
1820 /*
1821 If we do more than one iteration of this loop, from the second one the
1822 row will have an explicit value in the autoinc field, which was set at
1823 the first call of handler::update_auto_increment(). So we must save
1824 the autogenerated value to avoid thd->insert_id_for_cur_row to become
1825 0.
1826 */
1827 if (table->file->insert_id_for_cur_row > 0)
1828 insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1829 else
1830 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1831 bool is_duplicate_key_error;
1832 if (table->file->is_fatal_error(error, HA_CHECK_ALL))
1833 goto err;
1834 is_duplicate_key_error=
1835 table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP);
1836 if (!is_duplicate_key_error)
1837 {
1838 /*
1839 We come here when we had an ignorable error which is not a duplicate
1840 key error. In this we ignore error if ignore flag is set, otherwise
1841 report error as usual. We will not do any duplicate key processing.
1842 */
1843 if (info->ignore)
1844 {
1845 table->file->print_error(error, MYF(ME_WARNING));
1846 goto after_trg_or_ignored_err; /* Ignoring a not fatal error */
1847 }
1848 goto err;
1849 }
1850 if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0))
1851 {
1852 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1853 goto err;
1854 }
1855 DEBUG_SYNC(thd, "write_row_replace");
1856
1857 /* Read all columns for the row we are going to replace */
1858 table->use_all_columns();
1859 /*
1860 Don't allow REPLACE to replace a row when a auto_increment column
1861 was used. This ensures that we don't get a problem when the
1862 whole range of the key has been used.
1863 */
1864 if (info->handle_duplicates == DUP_REPLACE && table->next_number_field &&
1865 key_nr == table->s->next_number_index && insert_id_for_cur_row > 0)
1866 goto err;
1867 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1868 {
1869 DBUG_ASSERT(table->file->inited == handler::RND);
1870 if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1871 goto err;
1872 }
1873 else
1874 {
1875 if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1876 {
1877 error=my_errno;
1878 goto err;
1879 }
1880
1881 if (!key)
1882 {
1883 if (!(key=(char*) my_safe_alloca(table->s->max_unique_length)))
1884 {
1885 error=ENOMEM;
1886 goto err;
1887 }
1888 }
1889 key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1890 key_part_map keypart_map= (1 << table->key_info[key_nr].user_defined_key_parts) - 1;
1891 if ((error= (table->file->ha_index_read_idx_map(table->record[1],
1892 key_nr, (uchar*) key,
1893 keypart_map,
1894 HA_READ_KEY_EXACT))))
1895 goto err;
1896 }
1897 if (table->vfield)
1898 {
1899 /*
          We have not yet called update_virtual_fields(VCOL_UPDATE_FOR_READ)
1901 in handler methods for the just read row in record[1].
1902 */
1903 table->move_fields(table->field, table->record[1], table->record[0]);
1904 int verr = table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE);
1905 table->move_fields(table->field, table->record[0], table->record[1]);
1906 if (verr)
1907 goto err;
1908 }
1909 if (info->handle_duplicates == DUP_UPDATE)
1910 {
1911 int res= 0;
1912 /*
1913 We don't check for other UNIQUE keys - the first row
1914 that matches, is updated. If update causes a conflict again,
1915 an error is returned
1916 */
1917 DBUG_ASSERT(table->insert_values != NULL);
1918 store_record(table,insert_values);
1919 restore_record(table,record[1]);
1920 table->reset_default_fields();
1921
1922 /*
1923 in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can
1924 change per row. Thus, we have to do reset_default_fields() per row.
1925 Twice (before insert and before update).
1926 */
1927 DBUG_ASSERT(info->update_fields->elements ==
1928 info->update_values->elements);
1929 if (fill_record_n_invoke_before_triggers(thd, table,
1930 *info->update_fields,
1931 *info->update_values,
1932 info->ignore,
1933 TRG_EVENT_UPDATE))
1934 goto before_trg_err;
1935
1936 bool different_records= (!records_are_comparable(table) ||
1937 compare_record(table));
1938 /*
1939 Default fields must be updated before checking view updateability.
1940 This branch of INSERT is executed only when a UNIQUE key was violated
1941 with the ON DUPLICATE KEY UPDATE option. In this case the INSERT
1942 operation is transformed to an UPDATE, and the default fields must
1943 be updated as if this is an UPDATE.
1944 */
1945 if (different_records && table->default_field)
1946 table->evaluate_update_default_function();
1947
1948 /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1949 res= info->table_list->view_check_option(table->in_use, info->ignore);
1950 if (res == VIEW_CHECK_SKIP)
1951 goto after_trg_or_ignored_err;
1952 if (res == VIEW_CHECK_ERROR)
1953 goto before_trg_err;
1954
1955 table->file->restore_auto_increment(prev_insert_id);
1956 info->touched++;
1957 if (different_records)
1958 {
1959 if (unlikely(error=table->file->ha_update_row(table->record[1],
1960 table->record[0])) &&
1961 error != HA_ERR_RECORD_IS_THE_SAME)
1962 {
1963 if (info->ignore &&
1964 !table->file->is_fatal_error(error, HA_CHECK_ALL))
1965 {
1966 if (!(thd->variables.old_behavior &
1967 OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
1968 table->file->print_error(error, MYF(ME_WARNING));
1969 goto after_trg_or_ignored_err;
1970 }
1971 goto err;
1972 }
1973
1974 if (error != HA_ERR_RECORD_IS_THE_SAME)
1975 {
1976 info->updated++;
1977 if (table->versioned())
1978 {
1979 if (table->versioned(VERS_TIMESTAMP))
1980 {
1981 store_record(table, record[2]);
1982 if ((error= vers_insert_history_row(table)))
1983 {
1984 info->last_errno= error;
1985 table->file->print_error(error, MYF(0));
1986 trg_error= 1;
1987 restore_record(table, record[2]);
1988 goto after_trg_or_ignored_err;
1989 }
1990 restore_record(table, record[2]);
1991 }
1992 info->copied++;
1993 }
1994 }
1995 else
1996 error= 0;
1997 /*
1998 If ON DUP KEY UPDATE updates a row instead of inserting
1999 one, it's like a regular UPDATE statement: it should not
2000 affect the value of a next SELECT LAST_INSERT_ID() or
2001 mysql_insert_id(). Except if LAST_INSERT_ID(#) was in the
2002 INSERT query, which is handled separately by
2003 THD::arg_of_last_insert_id_function.
2004 */
2005 prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row;
2006 insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
2007 trg_error= (table->triggers &&
2008 table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
2009 TRG_ACTION_AFTER, TRUE));
2010 info->copied++;
2011 }
2012
2013 /*
2014 Only update next_insert_id if the AUTO_INCREMENT value was explicitly
2015 updated, so we don't update next_insert_id with the value from the
2016 row being updated. Otherwise reset next_insert_id to what it was
2017 before the duplicate key error, since that value is unused.
2018 */
2019 if (table->next_number_field_updated)
2020 {
2021 DBUG_ASSERT(table->next_number_field != NULL);
2022
2023 table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
2024 }
2025 else if (prev_insert_id_for_cur_row)
2026 {
2027 table->file->restore_auto_increment(prev_insert_id_for_cur_row);
2028 }
2029 goto ok;
2030 }
2031 else /* DUP_REPLACE */
2032 {
2033 /*
2034 The manual defines the REPLACE semantics that it is either
2035 an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
2036 InnoDB do not function in the defined way if we allow MySQL
2037 to convert the latter operation internally to an UPDATE.
2038 We also should not perform this conversion if we have
2039 timestamp field with ON UPDATE which is different from DEFAULT.
2040 Another case when conversion should not be performed is when
2041 we have ON DELETE trigger on table so user may notice that
2042 we cheat here. Note that it is ok to do such conversion for
2043 tables which have ON UPDATE but have no ON DELETE triggers,
2044 we just should not expose this fact to users by invoking
2045 ON UPDATE triggers.
2046 */
2047 if (last_uniq_key(table,key_nr) &&
2048 !table->file->referenced_by_foreign_key() &&
2049 (!table->triggers || !table->triggers->has_delete_triggers()))
2050 {
2051 if (table->versioned(VERS_TRX_ID))
2052 {
2053 bitmap_set_bit(table->write_set, table->vers_start_field()->field_index);
2054 table->file->column_bitmaps_signal();
2055 table->vers_start_field()->store(0, false);
2056 }
2057 if (unlikely(error= table->file->ha_update_row(table->record[1],
2058 table->record[0])) &&
2059 error != HA_ERR_RECORD_IS_THE_SAME)
2060 goto err;
2061 if (likely(!error))
2062 {
2063 info->deleted++;
2064 if (!table->file->has_transactions())
2065 thd->transaction->stmt.modified_non_trans_table= TRUE;
2066 if (table->versioned(VERS_TIMESTAMP))
2067 {
2068 store_record(table, record[2]);
2069 error= vers_insert_history_row(table);
2070 restore_record(table, record[2]);
2071 if (unlikely(error))
2072 goto err;
2073 }
2074 }
2075 else
2076 error= 0; // error was HA_ERR_RECORD_IS_THE_SAME
2077 /*
2078 Since we pretend that we have done insert we should call
2079 its after triggers.
2080 */
2081 goto after_trg_n_copied_inc;
2082 }
2083 else
2084 {
2085 if (table->triggers &&
2086 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
2087 TRG_ACTION_BEFORE, TRUE))
2088 goto before_trg_err;
2089
2090 if (!table->versioned(VERS_TIMESTAMP))
2091 error= table->file->ha_delete_row(table->record[1]);
2092 else
2093 {
2094 store_record(table, record[2]);
2095 restore_record(table, record[1]);
2096 table->vers_update_end();
2097 error= table->file->ha_update_row(table->record[1],
2098 table->record[0]);
2099 restore_record(table, record[2]);
2100 }
2101 if (unlikely(error))
2102 goto err;
2103 if (!table->versioned(VERS_TIMESTAMP))
2104 info->deleted++;
2105 else
2106 info->updated++;
2107 if (!table->file->has_transactions_and_rollback())
2108 thd->transaction->stmt.modified_non_trans_table= TRUE;
2109 if (table->triggers &&
2110 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
2111 TRG_ACTION_AFTER, TRUE))
2112 {
2113 trg_error= 1;
2114 goto after_trg_or_ignored_err;
2115 }
2116 /* Let us attempt do write_row() once more */
2117 }
2118 }
2119 }
2120
2121 /*
2122 If more than one iteration of the above while loop is done, from
2123 the second one the row being inserted will have an explicit
2124 value in the autoinc field, which was set at the first call of
2125 handler::update_auto_increment(). This value is saved to avoid
2126 thd->insert_id_for_cur_row becoming 0. Use this saved autoinc
2127 value.
2128 */
2129 if (table->file->insert_id_for_cur_row == 0)
2130 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
2131
2132 /*
2133 Restore column maps if they where replaced during an duplicate key
2134 problem.
2135 */
2136 if (table->read_set != save_read_set ||
2137 table->write_set != save_write_set)
2138 table->column_bitmaps_set(save_read_set, save_write_set);
2139 }
2140 else if (unlikely((error=table->file->ha_write_row(table->record[0]))))
2141 {
2142 DEBUG_SYNC(thd, "write_row_noreplace");
2143 if (!info->ignore ||
2144 table->file->is_fatal_error(error, HA_CHECK_ALL))
2145 goto err;
2146 if (!(thd->variables.old_behavior &
2147 OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
2148 table->file->print_error(error, MYF(ME_WARNING));
2149 table->file->restore_auto_increment(prev_insert_id);
2150 goto after_trg_or_ignored_err;
2151 }
2152
2153 after_trg_n_copied_inc:
2154 info->copied++;
2155 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
2156 trg_error= (table->triggers &&
2157 table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
2158 TRG_ACTION_AFTER, TRUE));
2159
2160 ok:
2161 /*
2162 We send the row after writing it to the table so that the
2163 correct values are sent to the client. Otherwise it won't show
2164 autoinc values (generated inside the handler::ha_write()) and
2165 values updated in ON DUPLICATE KEY UPDATE.
2166 */
2167 if (sink && sink->send_data(thd->lex->returning()->item_list) < 0)
2168 trg_error= 1;
2169
2170 after_trg_or_ignored_err:
2171 if (key)
2172 my_safe_afree(key,table->s->max_unique_length);
2173 if (!table->file->has_transactions_and_rollback())
2174 thd->transaction->stmt.modified_non_trans_table= TRUE;
2175 DBUG_RETURN(trg_error);
2176
2177 err:
2178 info->last_errno= error;
2179 table->file->print_error(error,MYF(0));
2180
2181 before_trg_err:
2182 table->file->restore_auto_increment(prev_insert_id);
2183 if (key)
2184 my_safe_afree(key, table->s->max_unique_length);
2185 table->column_bitmaps_set(save_read_set, save_write_set);
2186 DBUG_RETURN(1);
2187 }
2188
2189
2190 /******************************************************************************
2191 Check that there aren't any null_fields
2192 ******************************************************************************/
2193
2194
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)2195 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, TABLE_LIST *table_list)
2196 {
2197 int err= 0;
2198 MY_BITMAP *write_set= entry->write_set;
2199
2200 for (Field **field=entry->field ; *field ; field++)
2201 {
2202 if (!bitmap_is_set(write_set, (*field)->field_index) &&
2203 !(*field)->vers_sys_field() &&
2204 has_no_default_value(thd, *field, table_list) &&
2205 ((*field)->real_type() != MYSQL_TYPE_ENUM))
2206 err=1;
2207 }
2208 return thd->abort_on_warning ? err : 0;
2209 }
2210
2211 /*****************************************************************************
2212 Handling of delayed inserts
2213 A thread is created for each table that one uses with the DELAYED attribute.
2214 *****************************************************************************/
2215
2216 #ifndef EMBEDDED_LIBRARY
2217
2218 class delayed_row :public ilink {
2219 public:
2220 char *record;
2221 enum_duplicates dup;
2222 my_time_t start_time;
2223 ulong start_time_sec_part;
2224 sql_mode_t sql_mode;
2225 bool auto_increment_field_not_null;
2226 bool ignore, log_query, query_start_sec_part_used;
2227 bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2228 ulonglong first_successful_insert_id_in_prev_stmt;
2229 ulonglong forced_insert_id;
2230 ulong auto_increment_increment;
2231 ulong auto_increment_offset;
2232 LEX_STRING query;
2233 Time_zone *time_zone;
2234 char *user, *host, *ip;
2235 query_id_t query_id;
2236 my_thread_id thread_id;
2237
delayed_row(LEX_STRING const query_arg,enum_duplicates dup_arg,bool ignore_arg,bool log_query_arg)2238 delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
2239 bool ignore_arg, bool log_query_arg)
2240 : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
2241 forced_insert_id(0), query(query_arg), time_zone(0),
2242 user(0), host(0), ip(0)
2243 {}
~delayed_row()2244 ~delayed_row()
2245 {
2246 my_free(query.str);
2247 my_free(record);
2248 }
2249 };
2250
/**
  Delayed_insert - context of a thread responsible for delayed insert
  into one table. When processing delayed inserts, we create an own
  thread for every distinct table. Later on all delayed inserts directed
  into that table are handled by a dedicated thread.

  The object embeds the THD of the handler thread, the queue of rows
  waiting to be written and the synchronisation primitives used between
  the client (connection) threads and the handler thread.

  The destructor must be called with LOCK_delayed_insert held.
*/

class Delayed_insert :public ilink {
  /* Reference count maintained by lock() / unlock() */
  uint locks_in_memory;
  /*
    Chosen once in the constructor: TL_WRITE_LOW_PRIORITY if the global
    low_priority_updates variable is set, TL_WRITE otherwise.
  */
  thr_lock_type delayed_lock;
public:
  THD thd;                                      // THD of the handler thread
  TABLE *table;                                 // NULL until the table is open
  mysql_mutex_t mutex;
  /*
    'cond' is signalled to wake up the handler thread, client threads
    wait on 'cond_client'. Both are used together with 'mutex'.
  */
  mysql_cond_t cond, cond_client;
  /*
    tables_in_use is incremented by get_local_table() (and decremented again
    on its error path); stacked_inserts is compared against
    delayed_queue_size by write_delayed() before a row is queued.
  */
  uint tables_in_use, stacked_inserts;
  /* Set to 1 by get_local_table() and by unlock() when the handler is told to finish */
  volatile bool status;
  bool retry;
  /**
    When the handler thread starts, it clones a metadata lock ticket
    which protects against GRL and ticket for the table to be inserted.
    This is done to allow the deadlock detector to detect deadlocks
    resulting from these locks.
    Before this is done, the connection thread cannot safely exit
    without causing problems for clone_ticket().
    Once handler_thread_initialized has been set, it is safe for the
    connection thread to exit.
    Access to handler_thread_initialized is protected by di->mutex.
  */
  bool handler_thread_initialized;
  COPY_INFO info;
  I_List<delayed_row> rows;                     // Rows waiting to be written
  ulong group_count;
  TABLE_LIST table_list;			// Argument
  /**
    Request for IX metadata lock protecting against GRL which is
    passed from connection thread to the handler thread.
  */
  MDL_request grl_protection;

  /**
    Set up the embedded THD as a system thread for delayed inserts,
    initialise the mutex and the condition variables and count the new
    handler in delayed_insert_threads (under LOCK_delayed_insert).

    @param current_select  Stored as thd.lex->current_select
  */
  Delayed_insert(SELECT_LEX *current_select)
    :locks_in_memory(0), thd(next_thread_id()),
    table(0),tables_in_use(0), stacked_inserts(0),
    status(0), retry(0), handler_thread_initialized(FALSE), group_count(0)
  {
    DBUG_ENTER("Delayed_insert constructor");
    /* The handler runs under a fixed internal identity */
    thd.security_ctx->user=(char*) delayed_user;
    thd.security_ctx->host=(char*) my_localhost;
    thd.security_ctx->ip= NULL;
    thd.query_id= 0;
    strmake_buf(thd.security_ctx->priv_user, thd.security_ctx->user);
    thd.current_tablenr=0;
    thd.set_command(COM_DELAYED_INSERT);
    thd.lex->current_select= current_select;
    thd.lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()
    /*
      Prevent changes to global.lock_wait_timeout from affecting
      delayed insert threads as any timeouts in delayed inserts
      are not communicated to the client.
    */
    thd.variables.lock_wait_timeout= LONG_TIMEOUT;

    bzero((char*) &thd.net, sizeof(thd.net));		// Safety
    bzero((char*) &table_list, sizeof(table_list));	// Safety
    thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
    thd.security_ctx->host_or_ip= "";
    bzero((char*) &info,sizeof(info));
    mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
    mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
    mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
    /* The global handler counter is only changed under LOCK_delayed_insert */
    mysql_mutex_lock(&LOCK_delayed_insert);
    delayed_insert_threads++;
    mysql_mutex_unlock(&LOCK_delayed_insert);
    delayed_lock= global_system_variables.low_priority_updates ?
                                                 TL_WRITE_LOW_PRIORITY : TL_WRITE;
    DBUG_VOID_RETURN;
  }

  /**
    Discard rows that are still queued, close the table if one was opened,
    destroy the synchronisation objects and remove the handler from the
    global bookkeeping.

    @pre The caller holds LOCK_delayed_insert (asserted below); it protects
         the decrement of delayed_insert_threads.
  */
  ~Delayed_insert()
  {
    /* The following is not really needed, but just for safety */
    delayed_row *row;
    while ((row=rows.get()))
      delete row;
    if (table)
    {
      close_thread_tables(&thd);
      thd.mdl_context.release_transactional_locks(&thd);
    }
    mysql_mutex_destroy(&mutex);
    mysql_cond_destroy(&cond);
    mysql_cond_destroy(&cond_client);

    server_threads.erase(&thd);
    mysql_mutex_assert_owner(&LOCK_delayed_insert);
    delayed_insert_threads--;

    my_free(thd.query());
    /*
      user and host were set to constants in the constructor; reset the
      pointers.
      NOTE(review): presumably so that the THD destructor does not try to
      free them - confirm.
    */
    thd.security_ctx->user= 0;
    thd.security_ctx->host= 0;
  }

  /* The following is for checking when we can delete ourselves */

  /**
    Take a reference on this handler. Does no locking of its own; the
    caller is expected to hold LOCK_delayed_insert.
  */
  inline void lock()
  {
    locks_in_memory++;				// Assume LOCK_delayed_insert
  }

  /**
    Drop a reference taken with lock(). When the last reference goes away
    and the handler thread has been killed while no rows are stacked and no
    tables are in use, signal 'cond' and set 'status' so that the handler
    thread can finish.
  */
  void unlock()
  {
    mysql_mutex_lock(&LOCK_delayed_insert);
    if (!--locks_in_memory)
    {
      mysql_mutex_lock(&mutex);
      if (thd.killed && ! stacked_inserts && ! tables_in_use)
      {
        mysql_cond_signal(&cond);
        status=1;
      }
      mysql_mutex_unlock(&mutex);
    }
    mysql_mutex_unlock(&LOCK_delayed_insert);
  }

  /** Current number of references taken with lock(). */
  inline uint lock_count() { return locks_in_memory; }

  TABLE* get_local_table(THD* client_thd);
  bool open_and_lock_table();
  bool handle_inserts(void);
};
2377
2378
/*
  List of the delayed insert handlers that client threads can look up.
  find_handler() walks the list and delayed_get_table() appends to it;
  both do so while holding LOCK_delayed_insert.
*/
I_List<Delayed_insert> delayed_threads;
2380
2381
2382 /**
2383 Return an instance of delayed insert thread that can handle
2384 inserts into a given table, if it exists. Otherwise return NULL.
2385 */
2386
2387 static
find_handler(THD * thd,TABLE_LIST * table_list)2388 Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
2389 {
2390 THD_STAGE_INFO(thd, stage_waiting_for_delay_list);
2391 mysql_mutex_lock(&LOCK_delayed_insert); // Protect master list
2392 I_List_iterator<Delayed_insert> it(delayed_threads);
2393 Delayed_insert *di;
2394 while ((di= it++))
2395 {
2396 if (!cmp(&table_list->db, &di->table_list.db) &&
2397 !cmp(&table_list->table_name, &di->table_list.table_name))
2398 {
2399 di->lock();
2400 break;
2401 }
2402 }
2403 mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2404 return di;
2405 }
2406
2407
2408 /**
2409 Attempt to find or create a delayed insert thread to handle inserts
2410 into this table.
2411
2412 @return In case of success, table_list->table points to a local copy
2413 of the delayed table or is set to NULL, which indicates a
2414 request for lock upgrade. In case of failure, value of
2415 table_list->table is undefined.
2416 @retval TRUE - this thread ran out of resources OR
2417 - a newly created delayed insert thread ran out of
2418 resources OR
2419 - the created thread failed to open and lock the table
2420 (e.g. because it does not exist) OR
2421 - the table opened in the created thread turned out to
2422 be a view
2423 @retval FALSE - table successfully opened OR
2424 - too many delayed insert threads OR
2425 - the table has triggers and we have to fall back to
2426 a normal INSERT
2427 Two latter cases indicate a request for lock upgrade.
2428
2429 XXX: why do we regard INSERT DELAYED into a view as an error and
2430 do not simply perform a lock upgrade?
2431
2432 TODO: The approach with using two mutexes to work with the
2433 delayed thread list -- LOCK_delayed_insert and
2434 LOCK_delayed_create -- is redundant, and we only need one of
2435 them to protect the list. The reason we have two locks is that
2436 we do not want to block look-ups in the list while we're waiting
2437 for the newly created thread to open the delayed table. However,
2438 this wait itself is redundant -- we always call get_local_table
2439 later on, and there wait again until the created thread acquires
2440 a table lock.
2441
2442 As is redundant the concept of locks_in_memory, since we already
2443 have another counter with similar semantics - tables_in_use,
2444 both of them are devoted to counting the number of producers for
2445 a given consumer (delayed insert thread), only at different
2446 stages of producer-consumer relationship.
2447
2448 The 'status' variable in Delayed_insert is redundant
2449 too, since there is already di->stacked_inserts.
2450 */
2451
2452 static
delayed_get_table(THD * thd,MDL_request * grl_protection_request,TABLE_LIST * table_list)2453 bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
2454 TABLE_LIST *table_list)
2455 {
2456 int error;
2457 Delayed_insert *di;
2458 DBUG_ENTER("delayed_get_table");
2459
2460 /* Must be set in the parser */
2461 DBUG_ASSERT(table_list->db.str);
2462
2463 /* Find the thread which handles this table. */
2464 if (!(di= find_handler(thd, table_list)))
2465 {
2466 /*
2467 No match. Create a new thread to handle the table, but
2468 no more than max_insert_delayed_threads.
2469 */
2470 if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
2471 DBUG_RETURN(0);
2472 THD_STAGE_INFO(thd, stage_creating_delayed_handler);
2473 mysql_mutex_lock(&LOCK_delayed_create);
2474 /*
2475 The first search above was done without LOCK_delayed_create.
2476 Another thread might have created the handler in between. Search again.
2477 */
2478 if (! (di= find_handler(thd, table_list)))
2479 {
2480 if (!(di= new Delayed_insert(thd->lex->current_select)))
2481 goto end_create;
2482
2483 /*
2484 Annotating delayed inserts is not supported.
2485 */
2486 di->thd.variables.binlog_annotate_row_events= 0;
2487
2488 di->thd.set_db(&table_list->db);
2489 di->thd.set_query(my_strndup(PSI_INSTRUMENT_ME,
2490 table_list->table_name.str,
2491 table_list->table_name.length,
2492 MYF(MY_WME | ME_FATAL)),
2493 table_list->table_name.length, system_charset_info);
2494 if (di->thd.db.str == NULL || di->thd.query() == NULL)
2495 {
2496 /* The error is reported */
2497 delete di;
2498 goto end_create;
2499 }
2500 di->table_list= *table_list; // Needed to open table
2501 /* Replace volatile strings with local copies */
2502 di->table_list.alias.str= di->table_list.table_name.str= di->thd.query();
2503 di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length();
2504 di->table_list.db= di->thd.db;
2505 /*
2506 Nulify select_lex because, if the thread that spawned the current one
2507 disconnects, the select_lex will point to freed memory.
2508 */
2509 di->table_list.select_lex= NULL;
2510 /*
2511 We need the tickets so that they can be cloned in
2512 handle_delayed_insert
2513 */
2514 MDL_REQUEST_INIT(&di->grl_protection, MDL_key::BACKUP, "", "",
2515 MDL_BACKUP_DML, MDL_STATEMENT);
2516 di->grl_protection.ticket= grl_protection_request->ticket;
2517 init_mdl_requests(&di->table_list);
2518 di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
2519
2520 di->lock();
2521 mysql_mutex_lock(&di->mutex);
2522 if ((error= mysql_thread_create(key_thread_delayed_insert,
2523 &di->thd.real_id, &connection_attrib,
2524 handle_delayed_insert, (void*) di)))
2525 {
2526 DBUG_PRINT("error",
2527 ("Can't create thread to handle delayed insert (error %d)",
2528 error));
2529 mysql_mutex_unlock(&di->mutex);
2530 di->unlock();
2531 delete di;
2532 my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATAL), error);
2533 goto end_create;
2534 }
2535
2536 /*
2537 Wait until table is open unless the handler thread or the connection
2538 thread has been killed. Note that we in all cases must wait until the
2539 handler thread has been properly initialized before exiting. Otherwise
2540 we risk doing clone_ticket() on a ticket that is no longer valid.
2541 */
2542 THD_STAGE_INFO(thd, stage_waiting_for_handler_open);
2543 while (!di->handler_thread_initialized ||
2544 (!di->thd.killed && !di->table && !thd->killed))
2545 {
2546 mysql_cond_wait(&di->cond_client, &di->mutex);
2547 }
2548 mysql_mutex_unlock(&di->mutex);
2549 THD_STAGE_INFO(thd, stage_got_old_table);
2550 if (thd->killed)
2551 {
2552 di->unlock();
2553 goto end_create;
2554 }
2555 if (di->thd.killed)
2556 {
2557 if (di->thd.is_error() && ! di->retry)
2558 {
2559 /*
2560 Copy the error message. Note that we don't treat fatal
2561 errors in the delayed thread as fatal errors in the
2562 main thread. If delayed thread was killed, we don't
2563 want to send "Server shutdown in progress" in the
2564 INSERT THREAD.
2565 */
2566 my_message(di->thd.get_stmt_da()->sql_errno(),
2567 di->thd.get_stmt_da()->message(),
2568 MYF(0));
2569 }
2570 di->unlock();
2571 goto end_create;
2572 }
2573 mysql_mutex_lock(&LOCK_delayed_insert);
2574 delayed_threads.append(di);
2575 mysql_mutex_unlock(&LOCK_delayed_insert);
2576 }
2577 mysql_mutex_unlock(&LOCK_delayed_create);
2578 }
2579
2580 mysql_mutex_lock(&di->mutex);
2581 table_list->table= di->get_local_table(thd);
2582 mysql_mutex_unlock(&di->mutex);
2583 if (table_list->table)
2584 {
2585 DBUG_ASSERT(! thd->is_error());
2586 thd->di= di;
2587 }
2588 /* Unlock the delayed insert object after its last access. */
2589 di->unlock();
2590 DBUG_PRINT("exit", ("table_list->table: %p", table_list->table));
2591 DBUG_RETURN(thd->is_error());
2592
2593 end_create:
2594 mysql_mutex_unlock(&LOCK_delayed_create);
2595 DBUG_PRINT("exit", ("is_error(): %d", thd->is_error()));
2596 DBUG_RETURN(thd->is_error());
2597 }
2598
2599 #define memdup_vcol(thd, vcol) \
2600 if (vcol) \
2601 { \
2602 (vcol)= (Virtual_column_info*)(thd)->memdup((vcol), sizeof(*(vcol))); \
2603 (vcol)->expr= NULL; \
2604 }
2605
2606 /**
2607 As we can't let many client threads modify the same TABLE
2608 structure of the dedicated delayed insert thread, we create an
2609 own structure for each client thread. This includes a row
2610 buffer to save the column values and new fields that point to
2611 the new row buffer. The memory is allocated in the client
2612 thread and is freed automatically.
2613
2614 @pre This function is called from the client thread. Delayed
2615 insert thread mutex must be acquired before invoking this
2616 function.
2617
2618 @return Not-NULL table object on success. NULL in case of an error,
2619 which is set in client_thd.
2620 */
2621
get_local_table(THD * client_thd)2622 TABLE *Delayed_insert::get_local_table(THD* client_thd)
2623 {
2624 my_ptrdiff_t adjust_ptrs;
2625 Field **field,**org_field, *found_next_number_field;
2626 TABLE *copy;
2627 TABLE_SHARE *share;
2628 uchar *bitmap;
2629 char *copy_tmp;
2630 uint bitmaps_used;
2631 Field **default_fields, **virtual_fields;
2632 uchar *record;
2633 DBUG_ENTER("Delayed_insert::get_local_table");
2634
2635 /* First request insert thread to get a lock */
2636 status=1;
2637 tables_in_use++;
2638 if (!thd.lock) // Table is not locked
2639 {
2640 THD_STAGE_INFO(client_thd, stage_waiting_for_handler_lock);
2641 mysql_cond_signal(&cond); // Tell handler to lock table
2642 while (!thd.killed && !thd.lock && ! client_thd->killed)
2643 {
2644 mysql_cond_wait(&cond_client, &mutex);
2645 }
2646 THD_STAGE_INFO(client_thd, stage_got_handler_lock);
2647 if (client_thd->killed)
2648 goto error;
2649 if (thd.killed)
2650 {
2651 /*
2652 Check how the insert thread was killed. If it was killed
2653 by FLUSH TABLES which calls kill_delayed_threads_for_table(),
2654 then is_error is not set.
2655 In this case, return without setting an error,
2656 which means that the insert will be converted to a normal insert.
2657 */
2658 if (thd.is_error())
2659 {
2660 /*
2661 Copy the error message. Note that we don't treat fatal
2662 errors in the delayed thread as fatal errors in the
2663 main thread. If delayed thread was killed, we don't
2664 want to send "Server shutdown in progress" in the
2665 INSERT THREAD.
2666
2667 The thread could be killed with an error message if
2668 di->handle_inserts() or di->open_and_lock_table() fails.
2669 */
2670 my_message(thd.get_stmt_da()->sql_errno(),
2671 thd.get_stmt_da()->message(), MYF(0));
2672 }
2673 goto error;
2674 }
2675 }
2676 share= table->s;
2677
2678 /*
2679 Allocate memory for the TABLE object, the field pointers array,
2680 and one record buffer of reclength size.
2681 Normally a table has three record buffers of rec_buff_length size,
2682 which includes alignment bytes. Since the table copy is used for
2683 creating one record only, the other record buffers and alignment
2684 are unnecessary.
2685 As the table will also need to calculate default values and
2686 expresions, we have to allocate own version of fields. keys and key
2687 parts. The key and key parts are needed as parse_vcol_defs() changes
2688 them in case of long hash keys.
2689 */
2690 THD_STAGE_INFO(client_thd, stage_allocating_local_table);
2691 if (!multi_alloc_root(client_thd->mem_root,
2692 ©_tmp, sizeof(*table),
2693 &field, (uint) (share->fields+1)*sizeof(Field**),
2694 &default_fields,
2695 (share->default_fields +
2696 share->default_expressions + 1) * sizeof(Field*),
2697 &virtual_fields,
2698 (share->virtual_fields + 1) * sizeof(Field*),
2699 &record, (uint) share->reclength,
2700 &bitmap, (uint) share->column_bitmap_size*4,
2701 NullS))
2702 goto error;
2703
2704 /* Copy the TABLE object. */
2705 copy= new (copy_tmp) TABLE;
2706 *copy= *table;
2707
2708 /* We don't need to change the file handler here */
2709 /* Assign the pointers for the field pointers array and the record. */
2710 copy->field= field;
2711 copy->record[0]= record;
2712 memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2713 if (share->default_fields || share->default_expressions)
2714 copy->default_field= default_fields;
2715 if (share->virtual_fields)
2716 copy->vfield= virtual_fields;
2717
2718 copy->expr_arena= NULL;
2719
2720 /* Ensure we don't use the table list of the original table */
2721 copy->pos_in_table_list= 0;
2722
2723 /*
2724 Make a copy of all fields.
2725 The copied fields need to point into the copied record. This is done
2726 by copying the field objects with their old pointer values and then
2727 "move" the pointers by the distance between the original and copied
2728 records. That way we preserve the relative positions in the records.
2729 */
2730 adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
2731 found_next_number_field= table->found_next_number_field;
2732 for (org_field= table->field; *org_field; org_field++, field++)
2733 {
2734 if (!(*field= (*org_field)->make_new_field(client_thd->mem_root, copy, 1)))
2735 goto error;
2736 (*field)->unireg_check= (*org_field)->unireg_check;
2737 (*field)->invisible= (*org_field)->invisible;
2738 (*field)->orig_table= copy; // Remove connection
2739 (*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0]
2740 (*field)->flags|= ((*org_field)->flags & LONG_UNIQUE_HASH_FIELD);
2741 (*field)->invisible= (*org_field)->invisible;
2742 memdup_vcol(client_thd, (*field)->vcol_info);
2743 memdup_vcol(client_thd, (*field)->default_value);
2744 memdup_vcol(client_thd, (*field)->check_constraint);
2745 if (*org_field == found_next_number_field)
2746 (*field)->table->found_next_number_field= *field;
2747 }
2748 *field=0;
2749
2750 if (copy_keys_from_share(copy, client_thd->mem_root))
2751 goto error;
2752
2753 if (share->virtual_fields || share->default_expressions ||
2754 share->default_fields)
2755 {
2756 bool error_reported= FALSE;
2757 if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy,
2758 &error_reported,
2759 VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING)))
2760 goto error;
2761 }
2762
2763 switch_defaults_to_nullable_trigger_fields(copy);
2764
2765 /* Adjust in_use for pointing to client thread */
2766 copy->in_use= client_thd;
2767
2768 /* Adjust lock_count. This table object is not part of a lock. */
2769 copy->lock_count= 0;
2770
2771 /* Adjust bitmaps */
2772 copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
2773 copy->def_write_set.bitmap= ((my_bitmap_map*)
2774 (bitmap + share->column_bitmap_size));
2775 bitmaps_used= 2;
2776 if (share->default_fields || share->default_expressions)
2777 {
2778 my_bitmap_init(©->has_value_set,
2779 (my_bitmap_map*) (bitmap +
2780 bitmaps_used*share->column_bitmap_size),
2781 share->fields, FALSE);
2782 }
2783 copy->tmp_set.bitmap= 0; // To catch errors
2784 bzero((char*) bitmap, share->column_bitmap_size * bitmaps_used);
2785 copy->read_set= ©->def_read_set;
2786 copy->write_set= ©->def_write_set;
2787
2788 DBUG_RETURN(copy);
2789
2790 /* Got fatal error */
2791 error:
2792 tables_in_use--;
2793 mysql_cond_signal(&cond); // Inform thread about abort
2794 DBUG_RETURN(0);
2795 }
2796
2797
2798 /* Put a question in queue */
2799
2800 static
write_delayed(THD * thd,TABLE * table,enum_duplicates duplic,LEX_STRING query,bool ignore,bool log_on)2801 int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
2802 LEX_STRING query, bool ignore, bool log_on)
2803 {
2804 delayed_row *row= 0;
2805 Delayed_insert *di=thd->di;
2806 const Discrete_interval *forced_auto_inc;
2807 size_t user_len, host_len, ip_length;
2808 DBUG_ENTER("write_delayed");
2809 DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
2810 (ulong) query.length));
2811
2812 THD_STAGE_INFO(thd, stage_waiting_for_handler_insert);
2813 mysql_mutex_lock(&di->mutex);
2814 while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
2815 mysql_cond_wait(&di->cond_client, &di->mutex);
2816 THD_STAGE_INFO(thd, stage_storing_row_into_queue);
2817
2818 if (thd->killed)
2819 goto err;
2820
2821 /*
2822 Take a copy of the query string, if there is any. The string will
2823 be free'ed when the row is destroyed. If there is no query string,
2824 we don't do anything special.
2825 */
2826
2827 if (query.str)
2828 {
2829 char *str;
2830 if (!(str= my_strndup(PSI_INSTRUMENT_ME, query.str, query.length,
2831 MYF(MY_WME))))
2832 goto err;
2833 query.str= str;
2834 }
2835 row= new delayed_row(query, duplic, ignore, log_on);
2836 if (row == NULL)
2837 {
2838 my_free(query.str);
2839 goto err;
2840 }
2841
2842 user_len= host_len= ip_length= 0;
2843 row->user= row->host= row->ip= NULL;
2844 if (thd->security_ctx)
2845 {
2846 if (thd->security_ctx->user)
2847 user_len= strlen(thd->security_ctx->user) + 1;
2848 if (thd->security_ctx->host)
2849 host_len= strlen(thd->security_ctx->host) + 1;
2850 if (thd->security_ctx->ip)
2851 ip_length= strlen(thd->security_ctx->ip) + 1;
2852 }
2853 /* This can't be THREAD_SPECIFIC as it's freed in delayed thread */
2854 if (!(row->record= (char*) my_malloc(PSI_INSTRUMENT_ME,
2855 table->s->reclength +
2856 user_len + host_len + ip_length,
2857 MYF(MY_WME))))
2858 goto err;
2859 memcpy(row->record, table->record[0], table->s->reclength);
2860
2861 if (thd->security_ctx)
2862 {
2863 if (thd->security_ctx->user)
2864 {
2865 row->user= row->record + table->s->reclength;
2866 memcpy(row->user, thd->security_ctx->user, user_len);
2867 }
2868 if (thd->security_ctx->host)
2869 {
2870 row->host= row->record + table->s->reclength + user_len;
2871 memcpy(row->host, thd->security_ctx->host, host_len);
2872 }
2873 if (thd->security_ctx->ip)
2874 {
2875 row->ip= row->record + table->s->reclength + user_len + host_len;
2876 memcpy(row->ip, thd->security_ctx->ip, ip_length);
2877 }
2878 }
2879 row->query_id= thd->query_id;
2880 row->thread_id= thd->thread_id;
2881
2882 row->start_time= thd->start_time;
2883 row->start_time_sec_part= thd->start_time_sec_part;
2884 row->query_start_sec_part_used= thd->query_start_sec_part_used;
2885 /*
2886 those are for the binlog: LAST_INSERT_ID() has been evaluated at this
2887 time, so record does not need it, but statement-based binlogging of the
2888 INSERT will need when the row is actually inserted.
2889 As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
2890 */
2891 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2892 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2893 row->first_successful_insert_id_in_prev_stmt=
2894 thd->first_successful_insert_id_in_prev_stmt;
2895
2896 /* Add session variable timezone
2897 Time_zone object will not be freed even the thread is ended.
2898 So we can get time_zone object from thread which handling delayed statement.
2899 See the comment of my_tz_find() for detail.
2900 */
2901 if (thd->time_zone_used)
2902 {
2903 row->time_zone = thd->variables.time_zone;
2904 }
2905 else
2906 {
2907 row->time_zone = NULL;
2908 }
2909 /* Copy session variables. */
2910 row->auto_increment_increment= thd->variables.auto_increment_increment;
2911 row->auto_increment_offset= thd->variables.auto_increment_offset;
2912 row->sql_mode= thd->variables.sql_mode;
2913 row->auto_increment_field_not_null= table->auto_increment_field_not_null;
2914
2915 /* Copy the next forced auto increment value, if any. */
2916 if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
2917 {
2918 row->forced_insert_id= forced_auto_inc->minimum();
2919 DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
2920 (ulong) row->forced_insert_id));
2921 }
2922
2923 di->rows.push_back(row);
2924 di->stacked_inserts++;
2925 di->status=1;
2926 if (table->s->blob_fields)
2927 unlink_blobs(table);
2928 mysql_cond_signal(&di->cond);
2929
2930 thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
2931 mysql_mutex_unlock(&di->mutex);
2932 DBUG_RETURN(0);
2933
2934 err:
2935 delete row;
2936 mysql_mutex_unlock(&di->mutex);
2937 DBUG_RETURN(1);
2938 }
2939
2940 /**
2941 Signal the delayed insert thread that this user connection
2942 is finished using it for this statement.
2943 */
2944
end_delayed_insert(THD * thd)2945 static void end_delayed_insert(THD *thd)
2946 {
2947 DBUG_ENTER("end_delayed_insert");
2948 Delayed_insert *di=thd->di;
2949 mysql_mutex_lock(&di->mutex);
2950 DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
2951 if (!--di->tables_in_use || di->thd.killed)
2952 { // Unlock table
2953 di->status=1;
2954 mysql_cond_signal(&di->cond);
2955 }
2956 mysql_mutex_unlock(&di->mutex);
2957 DBUG_VOID_RETURN;
2958 }
2959
2960
2961 /* We kill all delayed threads when doing flush-tables */
2962
kill_delayed_threads(void)2963 void kill_delayed_threads(void)
2964 {
2965 DBUG_ENTER("kill_delayed_threads");
2966 mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
2967
2968 I_List_iterator<Delayed_insert> it(delayed_threads);
2969 Delayed_insert *di;
2970 while ((di= it++))
2971 {
2972 mysql_mutex_lock(&di->thd.LOCK_thd_kill);
2973 if (di->thd.killed < KILL_CONNECTION)
2974 di->thd.set_killed_no_mutex(KILL_CONNECTION);
2975 di->thd.abort_current_cond_wait(false);
2976 mysql_mutex_unlock(&di->thd.LOCK_thd_kill);
2977 }
2978 mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2979 DBUG_VOID_RETURN;
2980 }
2981
2982
2983 /**
2984 A strategy for the prelocking algorithm which prevents the
2985 delayed insert thread from opening tables with engines which
2986 do not support delayed inserts.
2987
2988 Particularly it allows to abort open_tables() as soon as we
2989 discover that we have opened a MERGE table, without acquiring
2990 metadata locks on underlying tables.
2991 */
2992
2993 class Delayed_prelocking_strategy : public Prelocking_strategy
2994 {
2995 public:
2996 virtual bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
2997 Sroutine_hash_entry *rt, sp_head *sp,
2998 bool *need_prelocking);
2999 virtual bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
3000 TABLE_LIST *table_list, bool *need_prelocking);
3001 virtual bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
3002 TABLE_LIST *table_list, bool *need_prelocking);
3003 };
3004
3005
3006 bool Delayed_prelocking_strategy::
handle_table(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)3007 handle_table(THD *thd, Query_tables_list *prelocking_ctx,
3008 TABLE_LIST *table_list, bool *need_prelocking)
3009 {
3010 DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);
3011
3012 if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
3013 {
3014 my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name.str);
3015 return TRUE;
3016 }
3017 return FALSE;
3018 }
3019
3020
3021 bool Delayed_prelocking_strategy::
handle_routine(THD * thd,Query_tables_list * prelocking_ctx,Sroutine_hash_entry * rt,sp_head * sp,bool * need_prelocking)3022 handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
3023 Sroutine_hash_entry *rt, sp_head *sp,
3024 bool *need_prelocking)
3025 {
3026 /* LEX used by the delayed insert thread has no routines. */
3027 DBUG_ASSERT(0);
3028 return FALSE;
3029 }
3030
3031
3032 bool Delayed_prelocking_strategy::
handle_view(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)3033 handle_view(THD *thd, Query_tables_list *prelocking_ctx,
3034 TABLE_LIST *table_list, bool *need_prelocking)
3035 {
3036 /* We don't open views in the delayed insert thread. */
3037 DBUG_ASSERT(0);
3038 return FALSE;
3039 }
3040
3041
3042 /**
3043 Open and lock table for use by delayed thread and check that
3044 this table is suitable for delayed inserts.
3045
3046 @retval FALSE - Success.
3047 @retval TRUE - Failure.
3048 */
3049
open_and_lock_table()3050 bool Delayed_insert::open_and_lock_table()
3051 {
3052 Delayed_prelocking_strategy prelocking_strategy;
3053
3054 /*
3055 Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
3056 error for tables with engines which don't support delayed inserts.
3057
3058 We can't do auto-repair in insert delayed thread, as it would hang
3059 when trying to an exclusive MDL_LOCK on the table during repair
3060 as the connection thread has a SHARED_WRITE lock.
3061 */
3062 if (!(table= open_n_lock_single_table(&thd, &table_list,
3063 TL_WRITE_DELAYED,
3064 MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
3065 MYSQL_OPEN_IGNORE_REPAIR,
3066 &prelocking_strategy)))
3067 {
3068 /* If table was crashed, then upper level should retry open+repair */
3069 retry= table_list.crashed;
3070 thd.fatal_error(); // Abort waiting inserts
3071 return TRUE;
3072 }
3073
3074 if (table->triggers || table->check_constraints)
3075 {
3076 /*
3077 Table has triggers or check constraints. This is not an error, but we do
3078 not support these with delayed insert. Terminate the delayed
3079 thread without an error and thus request lock upgrade.
3080 */
3081 return TRUE;
3082 }
3083 table->copy_blobs= 1;
3084
3085 table->file->prepare_for_row_logging();
3086 return FALSE;
3087 }
3088
3089
/*
  Thread main function of a delayed insert handler.

  arg is the Delayed_insert object this thread serves. The function
  initializes the thread, clones the metadata lock tickets prepared by the
  connection thread, opens and locks the target table and then loops:
  it waits for queued rows or lock requests, locks the table when clients
  are using it, writes queued rows through Delayed_insert::handle_inserts()
  and unlocks the table again once nobody uses it. The loop ends when the
  thread is killed and has no locks, clients or queued rows left (or
  directly on an initialization error). The cleanup part then closes the
  table, releases the metadata locks, deletes the Delayed_insert object
  and terminates the thread.

  di->mutex is held from before initialization until the cleanup part,
  except where it is explicitly released inside the loop.
*/

pthread_handler_t handle_delayed_insert(void *arg)
{
  Delayed_insert *di=(Delayed_insert*) arg;
  THD *thd= &di->thd;

  pthread_detach_this_thread();
  /* Add thread to THD list so that it's visible in 'show processlist' */
  thd->set_start_time();
  server_threads.insert(thd);
  if (abort_loop)
    thd->set_killed(KILL_CONNECTION);
  else
    thd->reset_killed();

  mysql_thread_set_psi_id(thd->thread_id);

  /*
    Wait until the client runs into mysql_cond_wait(),
    where we free it after the table is opened and di linked in the list.
    If we did not wait here, the client might detect the opened table
    before it is linked to the list. It would release LOCK_delayed_create
    and allow another thread to create another handler for the same table,
    since it does not find one in the list.
  */
  mysql_mutex_lock(&di->mutex);
  if (my_thread_init())
  {
    /* Can't use my_error since store_globals has not yet been called */
    thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
    di->handler_thread_initialized= TRUE;
  }
  else
  {
    DBUG_ENTER("handle_delayed_insert");
    thd->thread_stack= (char*) &thd;
    if (init_thr_lock())
    {
      thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
      di->handler_thread_initialized= TRUE;
      thd->fatal_error();
      goto err;
    }

    thd->store_globals();

    thd->lex->sql_command= SQLCOM_INSERT;        // For innodb::store_lock()

    /*
      INSERT DELAYED has to go to row-based format because the time
      at which rows are inserted cannot be determined in mixed mode.
    */
    thd->set_current_stmt_binlog_format_row_if_mixed();
    /* Don't annotate insert delayed binlog events */
    thd->variables.binlog_annotate_row_events= 0;

    /*
      Clone tickets representing protection against GRL and the lock on
      the target table for the insert and add them to the list of granted
      metadata locks held by the handler thread. This is safe since the
      handler thread is not holding nor waiting on any metadata locks.
    */
    if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
        thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
    {
      thd->release_transactional_locks();
      di->handler_thread_initialized= TRUE;
      goto err;
    }

    /*
      Now that the ticket has been cloned, it is safe for the connection
      thread to exit.
    */
    di->handler_thread_initialized= TRUE;
    di->table_list.mdl_request.ticket= NULL;

    thd->set_query_id(next_query_id());

    if (di->open_and_lock_table())
      goto err;

    /*
      INSERT DELAYED generally expects thd->lex->current_select to be NULL,
      since this is not an attribute of the current thread. This can lead to
      problems if the thread that spawned the current one disconnects.
      current_select will then point to freed memory. But current_select is
      required to resolve the partition function. So, after fulfilling that
      requirement, we set the current_select to 0.
    */
    thd->lex->current_select= NULL;

    /* Tell client that the thread is initialized */
    mysql_cond_signal(&di->cond_client);

    /*
      Inform mdl that it needs to call mysql_lock_abort to abort locks
      for delayed insert.
    */
    thd->mdl_context.set_needs_thr_lock_abort(TRUE);

    di->table->mark_columns_needed_for_insert();
    /* Mark all columns for write as we don't know which columns we get from user */
    bitmap_set_all(di->table->write_set);

    /* Now wait until we get an insert or lock to handle */
    /* We will not abort as long as a client thread uses this thread */

    for (;;)
    {
      if (thd->killed)
      {
        uint lock_count;
        DBUG_PRINT("delayed", ("Insert delayed killed"));
        /*
          Remove this from delay insert list so that no one can request a
          table from this. di->mutex is released first and only re-taken
          after LOCK_delayed_insert has been released.
        */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&LOCK_delayed_insert);
        di->unlink();
        lock_count=di->lock_count();
        mysql_mutex_unlock(&LOCK_delayed_insert);
        mysql_mutex_lock(&di->mutex);
        if (!lock_count && !di->tables_in_use && !di->stacked_inserts &&
            !thd->lock)
          break;                                // Time to die
      }

      /* Shouldn't wait if killed or an insert is waiting. */
      DBUG_PRINT("delayed",
                 ("thd->killed: %d  di->status: %d  di->stacked_inserts: %d",
                  thd->killed, di->status, di->stacked_inserts));
      if (!thd->killed && !di->status && !di->stacked_inserts)
      {
        struct timespec abstime;
        set_timespec(abstime, delayed_insert_timeout);

        /* Information for pthread_kill */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
        di->thd.mysys_var->current_mutex= &di->mutex;
        di->thd.mysys_var->current_cond= &di->cond;
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
        THD_STAGE_INFO(&(di->thd), stage_waiting_for_insert);

        DBUG_PRINT("info",("Waiting for someone to insert rows"));
        while (!thd->killed && !di->status)
        {
          int error;
          mysql_audit_release(thd);
          error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
#ifdef EXTRA_DEBUG
          if (error && error != EINTR && error != ETIMEDOUT)
          {
            fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
            DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
                                 error));
          }
#endif
          /* Nothing arrived before the timeout: let the handler die */
          if (error == ETIMEDOUT || error == ETIME)
            thd->set_killed(KILL_CONNECTION);
        }
        /* We can't lock di->mutex and mysys_var->mutex at the same time */
        mysql_mutex_unlock(&di->mutex);
        mysql_mutex_lock(&di->thd.mysys_var->mutex);
        di->thd.mysys_var->current_mutex= 0;
        di->thd.mysys_var->current_cond= 0;
        mysql_mutex_unlock(&di->thd.mysys_var->mutex);
        mysql_mutex_lock(&di->mutex);
      }

      /*
        The code depends on the following ASSERT always holding.
        I don't want to accidentally introduce any bugs in the following
        code in this commit, so I leave the small cleaning up of the code
        to a future commit
      */
      DBUG_ASSERT(thd->lock || di->stacked_inserts == 0);

      DBUG_PRINT("delayed",
                 ("thd->killed: %d  di->status: %d  di->stacked_insert: %d  di->tables_in_use: %d  thd->lock: %d",
                  thd->killed, di->status, di->stacked_inserts, di->tables_in_use, thd->lock != 0));

      /*
        This is used to test what happens if killed is sent before
        we have time to handle the insert requests.
      */
      DBUG_EXECUTE_IF("write_delay_wakeup",
                      if (!thd->killed && di->stacked_inserts)
                        my_sleep(500000);
                      );

      if (di->tables_in_use && ! thd->lock &&
          (!thd->killed || di->stacked_inserts))
      {
        thd->set_query_id(next_query_id());
        /*
          Request for new delayed insert.
          Lock the table, but avoid to be blocked by a global read lock.
          If we got here while a global read lock exists, then one or more
          inserts started before the lock was requested. These are allowed
          to complete their work before the server returns control to the
          client which requested the global read lock. The delayed insert
          handler will close the table and finish when the outstanding
          inserts are done.
        */
        if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
        {
          /* Fatal error */
          thd->set_killed(KILL_CONNECTION);
        }
        mysql_cond_broadcast(&di->cond_client);
      }
      if (di->stacked_inserts)
      {
        delayed_row *row;
        I_List_iterator<delayed_row> it(di->rows);
        my_thread_id cur_thd= di->thd.thread_id;

        /*
          Issue one audit external-lock notification each time the
          originating thread id changes while walking the queued rows.
        */
        while ((row= it++))
        {
          if (cur_thd != row->thread_id)
          {
            mysql_audit_external_lock_ex(&di->thd, row->thread_id,
                row->user, row->host, row->ip, row->query_id,
                di->table->s, F_WRLCK);
            cur_thd= row->thread_id;
          }
        }
        if (di->handle_inserts())
        {
          /* Some fatal error */
          thd->set_killed(KILL_CONNECTION);
        }
      }
      di->status=0;
      if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
      {
        /*
          No one is doing a insert delayed
          Unlock table so that other threads can use it
        */
        MYSQL_LOCK *lock=thd->lock;
        thd->lock=0;
        mysql_mutex_unlock(&di->mutex);
        /*
          We need to release next_insert_id before unlocking. This is
          enforced by handler::ha_external_lock().
        */
        di->table->file->ha_release_auto_increment();
        mysql_unlock_tables(thd, lock);
        trans_commit_stmt(thd);
        di->group_count=0;
        mysql_audit_release(thd);
        /* Re-take di->mutex, which was released for the unlock above */
        mysql_mutex_lock(&di->mutex);
      }

      /*
        Reset binlog. We can't call ha_reset() for the table as this will
        reset the table maps we have calculated earlier.
      */
      thd->reset_binlog_for_next_statement();

      if (di->tables_in_use)
        mysql_cond_broadcast(&di->cond_client); // If waiting clients
    }

  err:
    DBUG_LEAVE;
  }

  /* Cleanup; reached both after the main loop and on initialization errors */
  {
    DBUG_ENTER("handle_delayed_insert-cleanup");
    di->table=0;
    mysql_mutex_unlock(&di->mutex);

    /*
      Protect against mdl_locks trying to access open tables
      We use KILL_CONNECTION_HARD here to ensure that
      THD::notify_shared_lock() doesn't try to access open tables after
      this.
    */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    thd->mdl_context.set_needs_thr_lock_abort(0);
    mysql_mutex_unlock(&thd->LOCK_thd_data);
    thd->set_killed(KILL_CONNECTION_HARD);      // If error

    close_thread_tables(thd);                   // Free the table
    thd->release_transactional_locks();
    mysql_cond_broadcast(&di->cond_client);     // Safety

    mysql_mutex_lock(&LOCK_delayed_create);     // Because of delayed_get_table
    mysql_mutex_lock(&LOCK_delayed_insert);
    /*
      di should be unlinked from the thread handler list and have no active
      clients
    */
    delete di;
    mysql_mutex_unlock(&LOCK_delayed_insert);
    mysql_mutex_unlock(&LOCK_delayed_create);

    DBUG_LEAVE;
  }
  my_thread_end();
  pthread_exit(0);

  /* Not reached: pthread_exit() is called just above */
  return 0;
}
3407
3408
3409 /* Remove all pointers to data for blob fields so that original table doesn't try to free them */
3410
unlink_blobs(TABLE * table)3411 static void unlink_blobs(TABLE *table)
3412 {
3413 for (Field **ptr=table->field ; *ptr ; ptr++)
3414 {
3415 if ((*ptr)->flags & BLOB_FLAG)
3416 ((Field_blob *) (*ptr))->clear_temporary();
3417 }
3418 }
3419
3420 /* Free blobs stored in current row */
3421
free_delayed_insert_blobs(TABLE * table)3422 static void free_delayed_insert_blobs(TABLE *table)
3423 {
3424 for (Field **ptr=table->field ; *ptr ; ptr++)
3425 {
3426 if ((*ptr)->flags & BLOB_FLAG)
3427 ((Field_blob *) *ptr)->free();
3428 }
3429 }
3430
3431
3432 /* set value field for blobs to point to data in record */
3433
set_delayed_insert_blobs(TABLE * table)3434 static void set_delayed_insert_blobs(TABLE *table)
3435 {
3436 for (Field **ptr=table->field ; *ptr ; ptr++)
3437 {
3438 if ((*ptr)->flags & BLOB_FLAG)
3439 {
3440 Field_blob *blob= ((Field_blob *) *ptr);
3441 uchar *data= blob->get_ptr();
3442 if (data)
3443 blob->set_value(data); // Set value.ptr() to point to data
3444 }
3445 }
3446 }
3447
3448
/*
  Write all rows queued in `rows` to the table.

  Expects `mutex` to be locked on entry (it is released right away so that
  clients can queue more rows) and returns with `mutex` locked, both on
  success and on error. The mutex is held only while the queue and its
  counters are manipulated; it is released around the actual row write.

  The write-delay lock is upgraded first. Each row is then copied into
  table->record[0], the per-row session state saved with the row is
  installed in the handler's THD, and the row is written with
  write_record(). After more than max_rows rows in a group the write lock
  is rescheduled so that waiting readers get a chance.

  Returns 0 on success. Returns 1 on a fatal error, in which case every
  row still in the queue is dropped.
*/
bool Delayed_insert::handle_inserts(void)
{
  int error;
  ulong max_rows;
  bool using_ignore= 0, using_opt_replace= 0, using_bin_log;
  delayed_row *row;
  DBUG_ENTER("handle_inserts");

  /* Allow client to insert new rows */
  mysql_mutex_unlock(&mutex);

  table->next_number_field=table->found_next_number_field;
  table->use_all_columns();

  THD_STAGE_INFO(&thd, stage_upgrading_lock);
  if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
                                   thd.variables.lock_wait_timeout))
  {
    /*
      This can happen if thread is killed either by a shutdown
      or if another thread is removing the current table definition
      from the table cache.
    */
    my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATAL | ME_ERROR_LOG),
             table->s->table_name.str);
    goto err;
  }

  THD_STAGE_INFO(&thd, stage_insert);
  max_rows= delayed_insert_limit;
  if (thd.killed || table->s->tdc->flushed)
  {
    thd.set_killed(KILL_SYSTEM_THREAD);
    max_rows= ULONG_MAX;                     // Do as much as possible
  }

  if (table->file->ha_rnd_init_with_error(0))
    goto err;
  /*
    We have to call prepare_for_row_logging() as the second call to
    handler_writes() will not have called decide_logging_format.
  */
  table->file->prepare_for_row_logging();
  table->file->prepare_for_insert(1);
  using_bin_log= table->file->row_logging;

  /*
    We can't use row caching when using the binary log because if
    we get a crash, then binary log will contain rows that are not yet
    written to disk, which will cause problems in replication.
  */
  if (!using_bin_log)
    table->file->extra(HA_EXTRA_WRITE_CACHE);

  mysql_mutex_lock(&mutex);

  while ((row=rows.get()))
  {
    int tmp_error;
    stacked_inserts--;
    /* The row is now private to this thread; write it without the mutex */
    mysql_mutex_unlock(&mutex);
    memcpy(table->record[0],row->record,table->s->reclength);
    if (table->s->blob_fields)
      set_delayed_insert_blobs(table);

    thd.start_time=row->start_time;
    thd.start_time_sec_part=row->start_time_sec_part;
    thd.query_start_sec_part_used=row->query_start_sec_part_used;
    /*
      To get the exact auto_inc interval to store in the binlog we must not
      use values from the previous interval (of the previous rows).
    */
    bool log_query= (row->log_query && row->query.str != NULL);
    DBUG_PRINT("delayed", ("query: '%s'  length: %lu", row->query.str ?
                           row->query.str : "[NULL]",
                           (ulong) row->query.length));
    if (log_query)
    {
      /*
        Guaranteed that the INSERT DELAYED STMT will not be here
        in SBR when mysql binlog is enabled.
      */
      DBUG_ASSERT(!mysql_bin_log.is_open() ||
                  thd.is_current_stmt_binlog_format_row());

      /*
        This is the first value of an INSERT statement.
        It is the right place to clear a forced insert_id.
        This is usually done after the last value of an INSERT statement,
        but we won't know this in the insert delayed thread. But before
        the first value is sufficiently equivalent to after the last
        value of the previous statement.
      */
      table->file->ha_release_auto_increment();
      thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
    }
    thd.first_successful_insert_id_in_prev_stmt=
      row->first_successful_insert_id_in_prev_stmt;
    thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
      row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
    table->auto_increment_field_not_null= row->auto_increment_field_not_null;

    /* Copy the session variables. */
    thd.variables.auto_increment_increment= row->auto_increment_increment;
    thd.variables.auto_increment_offset=    row->auto_increment_offset;
    thd.variables.sql_mode=                 row->sql_mode;

    /* Copy a forced insert_id, if any. */
    if (row->forced_insert_id)
    {
      DBUG_PRINT("delayed", ("received auto_inc: %lu",
                             (ulong) row->forced_insert_id));
      thd.force_one_auto_inc_interval(row->forced_insert_id);
    }

    /* Duplicate handling is per row: set it up now and undo it below */
    info.ignore= row->ignore;
    info.handle_duplicates= row->dup;
    if (info.ignore ||
        info.handle_duplicates != DUP_ERROR)
    {
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
      using_ignore=1;
    }
    if (info.handle_duplicates == DUP_REPLACE &&
        (!table->triggers ||
         !table->triggers->has_delete_triggers()))
    {
      table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
      using_opt_replace= 1;
    }
    if (info.handle_duplicates == DUP_UPDATE)
      table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
    thd.clear_error(); // reset error for binlog

    tmp_error= 0;
    if (unlikely(table->vfield))
    {
      /*
        Virtual fields were not calculated by caller as the temporary
        TABLE object used had vcol_set empty. Better to calculate them
        here to make the caller faster.
      */
      tmp_error= table->update_virtual_fields(table->file,
                                              VCOL_UPDATE_FOR_WRITE);
    }

    if (unlikely(tmp_error || write_record(&thd, table, &info, NULL)))
    {
      info.error_count++;                          // Ignore errors
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
      row->log_query = 0;
    }

    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }
    if (using_opt_replace)
    {
      using_opt_replace= 0;
      table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
    }

    if (table->s->blob_fields)
      free_delayed_insert_blobs(table);
    thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
    thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
    mysql_mutex_lock(&mutex);

    /*
      Reset the table->auto_increment_field_not_null as it is valid for
      only one row.
    */
    table->auto_increment_field_not_null= FALSE;

    delete row;
    /*
      Let READ clients do something once in a while
      We should however not break in the middle of a multi-line insert
      if we have binary logging enabled as we don't want other commands
      on this table until all entries have been processed
    */
    if (group_count++ >= max_rows && (row= rows.head()) &&
        (!(row->log_query & using_bin_log)))
    {
      group_count=0;
      if (stacked_inserts || tables_in_use)     // Let these wait a while
      {
        if (tables_in_use)
          mysql_cond_broadcast(&cond_client);   // If waiting clients
        THD_STAGE_INFO(&thd, stage_reschedule);
        mysql_mutex_unlock(&mutex);
        if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
        {
          /* This should never happen */
          table->file->print_error(error,MYF(0));
          sql_print_error("%s", thd.get_stmt_da()->message());
          DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
          goto err;
        }
        query_cache_invalidate3(&thd, table, 1);
        if (thr_reschedule_write_lock(*thd.lock->locks,
                                      thd.variables.lock_wait_timeout))
        {
          /* This is not known to happen. */
          my_error(ER_DELAYED_CANT_CHANGE_LOCK,
                   MYF(ME_FATAL | ME_ERROR_LOG),
                   table->s->table_name.str);
          goto err;
        }
        if (!using_bin_log)
          table->file->extra(HA_EXTRA_WRITE_CACHE);
        mysql_mutex_lock(&mutex);
        THD_STAGE_INFO(&thd, stage_insert);
      }
      if (tables_in_use)
        mysql_cond_broadcast(&cond_client);     // If waiting clients
    }
  }

  table->file->ha_rnd_end();

  if (WSREP((&thd)))
    thd_proc_info(&thd, "Insert done");
  else
    thd_proc_info(&thd, 0);
  mysql_mutex_unlock(&mutex);

  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
  */
  if (table->file->row_logging &&
      thd.binlog_flush_pending_rows_event(TRUE,
                                          table->file->row_logging_has_trans))
    goto err;

  if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
  {                                             // This shouldn't happen
    table->file->print_error(error,MYF(0));
    sql_print_error("%s", thd.get_stmt_da()->message());
    DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
    goto err;
  }
  query_cache_invalidate3(&thd, table, 1);
  /* Return with the mutex held again */
  mysql_mutex_lock(&mutex);
  DBUG_RETURN(0);

 err:
  /* Every goto err above is taken with `mutex` unlocked */
#ifndef DBUG_OFF
  max_rows= 0;                                  // For DBUG output
#endif
  /* Remove all not used rows */
  mysql_mutex_lock(&mutex);
  while ((row=rows.get()))
  {
    if (table->s->blob_fields)
    {
      /* Load the row so that the blob data it refers to can be freed */
      memcpy(table->record[0],row->record,table->s->reclength);
      set_delayed_insert_blobs(table);
      free_delayed_insert_blobs(table);
    }
    delete row;
    thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
    stacked_inserts--;
#ifndef DBUG_OFF
    max_rows++;
#endif
  }
  DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
  thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
  DBUG_RETURN(1);
}
3731 #endif /* EMBEDDED_LIBRARY */
3732
3733 /***************************************************************************
3734 Store records in INSERT ... SELECT *
3735 ***************************************************************************/
3736
3737
3738 /*
3739 make insert specific preparation and checks after opening tables
3740
3741 SYNOPSIS
3742 mysql_insert_select_prepare()
3743 thd thread handler
3744
3745 RETURN
3746 0 OK
3747 > 0 Error
3748 < 0 Ok, ignore insert
3749 */
3750
mysql_insert_select_prepare(THD * thd,select_result * sel_res)3751 int mysql_insert_select_prepare(THD *thd, select_result *sel_res)
3752 {
3753 int res;
3754 LEX *lex= thd->lex;
3755 SELECT_LEX *select_lex= lex->first_select_lex();
3756 DBUG_ENTER("mysql_insert_select_prepare");
3757
3758 /*
3759 SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
3760 clause if table is VIEW
3761 */
3762
3763 if ((res= mysql_prepare_insert(thd, lex->query_tables, lex->field_list, 0,
3764 lex->update_list, lex->value_list,
3765 lex->duplicates,
3766 &select_lex->where, TRUE)))
3767 DBUG_RETURN(res);
3768
3769 /*
3770 If sel_res is not empty, it means we have items in returing_list.
3771 So we prepare the list now
3772 */
3773 if (sel_res)
3774 sel_res->prepare(lex->returning()->item_list, NULL);
3775
3776 DBUG_ASSERT(select_lex->leaf_tables.elements != 0);
3777 List_iterator<TABLE_LIST> ti(select_lex->leaf_tables);
3778 TABLE_LIST *table;
3779 uint insert_tables;
3780
3781 if (select_lex->first_cond_optimization)
3782 {
3783 /* Back up leaf_tables list. */
3784 Query_arena *arena= thd->stmt_arena, backup;
3785 arena= thd->activate_stmt_arena_if_needed(&backup); // For easier test
3786
3787 insert_tables= select_lex->insert_tables;
3788 while ((table= ti++) && insert_tables--)
3789 {
3790 select_lex->leaf_tables_exec.push_back(table);
3791 table->tablenr_exec= table->table->tablenr;
3792 table->map_exec= table->table->map;
3793 table->maybe_null_exec= table->table->maybe_null;
3794 }
3795 if (arena)
3796 thd->restore_active_arena(arena, &backup);
3797 }
3798 ti.rewind();
3799 /*
3800 exclude first table from leaf tables list, because it belong to
3801 INSERT
3802 */
3803 /* skip all leaf tables belonged to view where we are insert */
3804 insert_tables= select_lex->insert_tables;
3805 while ((table= ti++) && insert_tables--)
3806 ti.remove();
3807
3808 DBUG_RETURN(0);
3809 }
3810
3811
select_insert(THD * thd_arg,TABLE_LIST * table_list_par,TABLE * table_par,List<Item> * fields_par,List<Item> * update_fields,List<Item> * update_values,enum_duplicates duplic,bool ignore_check_option_errors,select_result * result)3812 select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
3813 TABLE *table_par,
3814 List<Item> *fields_par,
3815 List<Item> *update_fields,
3816 List<Item> *update_values,
3817 enum_duplicates duplic,
3818 bool ignore_check_option_errors,
3819 select_result *result):
3820 select_result_interceptor(thd_arg),
3821 sel_result(result),
3822 table_list(table_list_par), table(table_par), fields(fields_par),
3823 autoinc_value_of_last_inserted_row(0),
3824 insert_into_view(table_list_par && table_list_par->view != 0)
3825 {
3826 bzero((char*) &info,sizeof(info));
3827 info.handle_duplicates= duplic;
3828 info.ignore= ignore_check_option_errors;
3829 info.update_fields= update_fields;
3830 info.update_values= update_values;
3831 info.view= (table_list_par->view ? table_list_par : 0);
3832 info.table_list= table_list_par;
3833 }
3834
3835
/**
  Prepare the INSERT side of INSERT ... SELECT.

  Fixes the RETURNING list, the selected values and the insert column list,
  resolves the ON DUPLICATE KEY UPDATE assignments when that mode is used,
  and then configures the target table and its handler for inserting.

  @param values  Items produced by the SELECT. They are fixed with
                 setup_fields() and passed, together with the insert column
                 list, to check_insert_fields().
  @param u       Query unit; saved in 'unit'.

  @retval 0  OK
  @retval 1  Error
*/
int
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
{
  LEX *lex= thd->lex;
  int res= 0;
  table_map map= 0;
  SELECT_LEX *lex_current_select_save= lex->current_select;
  DBUG_ENTER("select_insert::prepare");

  unit= u;

  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= lex->first_select_lex();

  /* The checks are chained with ||, so the first failure stops the rest. */
  res= (setup_returning_fields(thd, table_list) ||
        setup_fields(thd, Ref_ptr_array(), values, MARK_COLUMNS_READ, 0, 0,
                     0) ||
        check_insert_fields(thd, table_list, *fields, values,
                            !insert_into_view, 1, &map));

  /* Only done when an explicit insert column list was given. */
  if (!res && fields->elements)
  {
    /*
      'aws' is built from the same expression that is later assigned to
      thd->abort_on_warning; presumably it applies that setting only for the
      duration of this scope (helper defined elsewhere - confirm).
    */
    Abort_on_warning_instant_set aws(thd,
                                     !info.ignore && thd->is_strict_mode());
    res= check_that_all_fields_are_given_values(thd, table_list->table,
                                                table_list);
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
  {
    Name_resolution_context *context= &lex->first_select_lex()->context;
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

    /* Perform name resolution only in the first table - 'table_list'. */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

    /* The flag is raised only around the check of the update column list. */
    lex->first_select_lex()->no_wrap_view_item= TRUE;
    res= res ||
      check_update_fields(thd, context->table_list,
                          *info.update_fields, *info.update_values,
                          /*
                            In INSERT SELECT ON DUPLICATE KEY UPDATE col=x
                            'x' can legally refer to a non-inserted table.
                            'x' is not even resolved yet.
                           */
                          true,
                          &map);
    lex->first_select_lex()->no_wrap_view_item= FALSE;
    /*
      When we are not using GROUP BY and there are no ungrouped
      aggregate functions we can refer to other tables in the ON
      DUPLICATE KEY part. We use next_name_resolution_table
      destructively, so check it first (views?)
    */
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->first_select_lex()->group_list.elements == 0 &&
        !lex->first_select_lex()->with_sum_func)
    {
      /*
        We must make a single context out of the two separate name
        resolution contexts: the INSERT table and the tables in the
        SELECT part of INSERT ... SELECT. To do that we must
        concatenate the two lists
      */
      table_list->next_name_resolution_table=
        ctx_state.get_first_name_resolution_table();
    }

    /* Resolve the right-hand sides of the ON DUPLICATE KEY UPDATE list. */
    res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
                             MARK_COLUMNS_READ, 0, NULL, 0);
    if (!res)
    {
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;

      while ((item= li++))
      {
        item->transform(thd, &Item::update_value_transformer,
                        (uchar*)lex->current_select);
      }
    }

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);
  }

  /* Undo the temporary switch to the first select made above. */
  lex->current_select= lex_current_select_save;
  if (res)
    DBUG_RETURN(1);
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
  if (unique_table(thd, table_list, table_list->next_global, 0))
  {
    /* Using same table for INSERT and SELECT */
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
  }
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
           thd->locked_tables_mode <= LTM_LOCK_TABLES)
  {
    /*
      The target table is not used elsewhere in the query and the result is
      not buffered, so bulk insert is started right away.

      In the other case (target table is also a source table, or the result
      is buffered) the table must not be prepared yet: the preparation may
      disable indexes on the result table, which may be used during the
      select (Bug #6034). The bulk insert is then started after the select
      phase, in select_insert::prepare2().
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
    */
    table->file->ha_start_bulk_insert((ha_rows) 0);
  }
  restore_record(table,s->default_values);		// Get empty record
  table->reset_default_fields();
  table->next_number_field=table->found_next_number_field;

#ifdef HAVE_REPLICATION
  /*
    Fail on a slave thread when the statement uses ON DUPLICATE KEY UPDATE
    on a table with an auto-increment field and rpl_master_has_bug()
    reports bug 24432 for the master.
  */
  if (thd->rgi_slave &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
    DBUG_RETURN(1);
#endif

  thd->cuted_fields=0;
  /* Ends up true whenever IGNORE or a non-error duplicate mode is used. */
  bool create_lookup_handler= info.handle_duplicates != DUP_ERROR;
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
  {
    create_lookup_handler= true;
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      /* Matched by the ha_rnd_end() call in prepare_eof(). */
      if (table->file->ha_rnd_init_with_error(0))
        DBUG_RETURN(1);
    }
  }
  table->file->prepare_for_insert(create_lookup_handler);
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  /* Reset to 0 by the destructor. */
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
  res= (table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));

  if (!res)
  {
     table->prepare_triggers_for_insert_stmt_or_event();
     table->mark_columns_needed_for_insert();
  }

  DBUG_RETURN(res);
}
4011
4012
4013 /*
4014 Finish the preparation of the result table.
4015
4016 SYNOPSIS
4017 select_insert::prepare2()
4018 void
4019
4020 DESCRIPTION
4021 If the result table is the same as one of the source tables
4022 (INSERT SELECT), the result table is not finally prepared at the
4023 join prepair phase. Do the final preparation now.
4024
4025 RETURN
4026 0 OK
4027 */
4028
prepare2(JOIN *)4029 int select_insert::prepare2(JOIN *)
4030 {
4031 DBUG_ENTER("select_insert::prepare2");
4032 if (table->validate_default_values_of_unset_fields(thd))
4033 DBUG_RETURN(1);
4034 if (thd->lex->describe)
4035 DBUG_RETURN(0);
4036 if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
4037 thd->locked_tables_mode <= LTM_LOCK_TABLES)
4038 table->file->ha_start_bulk_insert((ha_rows) 0);
4039
4040 /* Same as the other variants of INSERT */
4041 if (sel_result &&
4042 sel_result->send_result_set_metadata(thd->lex->returning()->item_list,
4043 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
4044 DBUG_RETURN(1);
4045 DBUG_RETURN(0);
4046 }
4047
4048
cleanup()4049 void select_insert::cleanup()
4050 {
4051 /* select_insert/select_create are never re-used in prepared statement */
4052 DBUG_ASSERT(0);
4053 }
4054
~select_insert()4055 select_insert::~select_insert()
4056 {
4057 DBUG_ENTER("~select_insert");
4058 sel_result= NULL;
4059 if (table && table->is_created())
4060 {
4061 table->next_number_field=0;
4062 table->auto_increment_field_not_null= FALSE;
4063 table->file->ha_reset();
4064 }
4065 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
4066 thd->abort_on_warning= 0;
4067 DBUG_VOID_RETURN;
4068 }
4069
4070
send_data(List<Item> & values)4071 int select_insert::send_data(List<Item> &values)
4072 {
4073 DBUG_ENTER("select_insert::send_data");
4074 bool error=0;
4075
4076 thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
4077 store_values(values);
4078 if (table->default_field &&
4079 unlikely(table->update_default_fields(info.ignore)))
4080 DBUG_RETURN(1);
4081 thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
4082 if (unlikely(thd->is_error()))
4083 {
4084 table->auto_increment_field_not_null= FALSE;
4085 DBUG_RETURN(1);
4086 }
4087
4088 table->vers_write= table->versioned();
4089 if (table_list) // Not CREATE ... SELECT
4090 {
4091 switch (table_list->view_check_option(thd, info.ignore)) {
4092 case VIEW_CHECK_SKIP:
4093 DBUG_RETURN(0);
4094 case VIEW_CHECK_ERROR:
4095 DBUG_RETURN(1);
4096 }
4097 }
4098
4099 error= write_record(thd, table, &info, sel_result);
4100 table->vers_write= table->versioned();
4101 table->auto_increment_field_not_null= FALSE;
4102
4103 if (likely(!error))
4104 {
4105 if (table->triggers || info.handle_duplicates == DUP_UPDATE)
4106 {
4107 /*
4108 Restore fields of the record since it is possible that they were
4109 changed by ON DUPLICATE KEY UPDATE clause.
4110
4111 If triggers exist then whey can modify some fields which were not
4112 originally touched by INSERT ... SELECT, so we have to restore
4113 their original values for the next row.
4114 */
4115 restore_record(table, s->default_values);
4116 }
4117 if (table->next_number_field)
4118 {
4119 /*
4120 If no value has been autogenerated so far, we need to remember the
4121 value we just saw, we may need to send it to client in the end.
4122 */
4123 if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
4124 autoinc_value_of_last_inserted_row=
4125 table->next_number_field->val_int();
4126 /*
4127 Clear auto-increment field for the next record, if triggers are used
4128 we will clear it twice, but this should be cheap.
4129 */
4130 table->next_number_field->reset();
4131 }
4132 }
4133 DBUG_RETURN(error);
4134 }
4135
4136
store_values(List<Item> & values)4137 void select_insert::store_values(List<Item> &values)
4138 {
4139 DBUG_ENTER("select_insert::store_values");
4140
4141 if (fields->elements)
4142 fill_record_n_invoke_before_triggers(thd, table, *fields, values, 1,
4143 TRG_EVENT_INSERT);
4144 else
4145 fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
4146 values, 1, TRG_EVENT_INSERT);
4147
4148 DBUG_VOID_RETURN;
4149 }
4150
/**
  Finish the insert after the last row has been sent.

  Ends the bulk insert, switches the handler back from the duplicate
  handling modes, invalidates the query cache if rows were changed,
  propagates the statement transaction flags and writes the statement to
  the binary log.

  @retval false  OK
  @retval true   Error: the binary log write failed, or an error was
                 recorded earlier and has been reported through
                 handler::print_error().
*/
bool select_insert::prepare_eof()
{
  int error;
  bool const trans_table= table->file->has_transactions_and_rollback();
  bool changed;
  bool binary_logged= 0;
  /* Sampled on entry; used below when computing the binlog error code. */
  killed_state killed_status= thd->killed;

  DBUG_ENTER("select_insert::prepare_eof");
  DBUG_PRINT("enter", ("trans_table: %d, table_type: '%s'",
                       trans_table, table->file->table_type()));

  /*
    One assignment to 'error' whose head depends on WITH_WSREP.
    With WSREP a pending wsrep error yields -1 and ha_end_bulk_insert() is
    not called. Otherwise, in both builds, the bulk insert is ended only if
    locked_tables_mode <= LTM_LOCK_TABLES; else error is 0.
  */
#ifdef WITH_WSREP
  error= (thd->wsrep_cs().current_error()) ? -1 :
    (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
#else
  error= (thd->locked_tables_mode <= LTM_LOCK_TABLES) ?
#endif /* WITH_WSREP */
    table->file->ha_end_bulk_insert() : 0;

  /* Pick up an error that was raised without being returned to us. */
  if (likely(!error) && unlikely(thd->is_error()))
    error= thd->get_stmt_da()->sql_errno();

  /* Same condition under which select_insert::prepare() started the scan. */
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
      table->file->ha_rnd_end();
  /* These two are reset unconditionally. */
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);

  if (likely((changed= (info.copied || info.deleted || info.updated))))
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
    query_cache_invalidate3(thd, table, 1);
  }

  /* Carry the statement-level flags over to the whole transaction. */
  if (thd->transaction->stmt.modified_non_trans_table)
    thd->transaction->all.modified_non_trans_table= TRUE;
  thd->transaction->all.m_unsafe_rollback_flags|=
    (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);

  DBUG_ASSERT(trans_table || !changed || 
              thd->transaction->stmt.modified_non_trans_table);

  /*
    Write to binlog before committing transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
  if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
      (likely(!error) || thd->transaction->stmt.modified_non_trans_table))
  {
    int errcode= 0;
    int res;
    if (likely(!error))
      thd->clear_error();
    else
      errcode= query_error_code(thd, killed_status == NOT_KILLED);
    res= thd->binlog_query(THD::ROW_QUERY_TYPE,
                           thd->query(), thd->query_length(),
                           trans_table, FALSE, FALSE, errcode);
    if (res > 0)
    {
      /* Binlog write failed: release auto-increment values and give up. */
      table->file->ha_release_auto_increment();
      DBUG_RETURN(true);
    }
    binary_logged= res == 0 || !table->s->tmp_table;
  }
  table->s->table_creation_was_logged|= binary_logged;
  table->file->ha_release_auto_increment();

  /* The error is reported only now, after the binlog has been handled. */
  if (unlikely(error))
  {
    table->file->print_error(error,MYF(0));
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}
4233
send_ok_packet()4234 bool select_insert::send_ok_packet() {
4235 char message[160]; /* status message */
4236 ulonglong row_count; /* rows affected */
4237 ulonglong id; /* last insert-id */
4238 DBUG_ENTER("select_insert::send_ok_packet");
4239
4240 if (info.ignore)
4241 my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4242 (ulong) info.records, (ulong) (info.records - info.copied),
4243 (long) thd->get_stmt_da()->current_statement_warn_count());
4244 else
4245 my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4246 (ulong) info.records, (ulong) (info.deleted + info.updated),
4247 (long) thd->get_stmt_da()->current_statement_warn_count());
4248
4249 row_count= info.copied + info.deleted +
4250 ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
4251 info.touched : info.updated);
4252
4253 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
4254 thd->first_successful_insert_id_in_cur_stmt :
4255 (thd->arg_of_last_insert_id_function ?
4256 thd->first_successful_insert_id_in_prev_stmt :
4257 (info.copied ? autoinc_value_of_last_inserted_row : 0));
4258
4259 /*
4260 Client expects an EOF/OK packet If LEX::has_returning and if result set
4261 meta was sent. See explanation for other variants of INSERT.
4262 */
4263 if (sel_result)
4264 sel_result->send_eof();
4265 else
4266 ::my_ok(thd, row_count, id, message);
4267
4268 DBUG_RETURN(false);
4269 }
4270
send_eof()4271 bool select_insert::send_eof()
4272 {
4273 bool res;
4274 DBUG_ENTER("select_insert::send_eof");
4275 res= (prepare_eof() || (!suppress_my_ok && send_ok_packet()));
4276 DBUG_RETURN(res);
4277 }
4278
abort_result_set()4279 void select_insert::abort_result_set()
4280 {
4281 bool binary_logged= 0;
4282 DBUG_ENTER("select_insert::abort_result_set");
4283 /*
4284 If the creation of the table failed (due to a syntax error, for
4285 example), no table will have been opened and therefore 'table'
4286 will be NULL. In that case, we still need to execute the rollback
4287 and the end of the function.
4288
4289 If it fail due to inability to insert in multi-table view for example,
4290 table will be assigned with view table structure, but that table will
4291 not be opened really (it is dummy to check fields types & Co).
4292 */
4293 if (table && table->file->is_open())
4294 {
4295 bool changed, transactional_table;
4296 /*
4297 If we are not in prelocked mode, we end the bulk insert started
4298 before.
4299 */
4300 if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
4301 table->file->ha_end_bulk_insert();
4302
4303 if (table->file->inited)
4304 table->file->ha_rnd_end();
4305 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4306 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4307
4308 /*
4309 If at least one row has been inserted/modified and will stay in
4310 the table (the table doesn't have transactions) we must write to
4311 the binlog (and the error code will make the slave stop).
4312
4313 For many errors (example: we got a duplicate key error while
4314 inserting into a MyISAM table), no row will be added to the table,
4315 so passing the error to the slave will not help since there will
4316 be an error code mismatch (the inserts will succeed on the slave
4317 with no error).
4318
4319 If table creation failed, the number of rows modified will also be
4320 zero, so no check for that is made.
4321 */
4322 changed= (info.copied || info.deleted || info.updated);
4323 transactional_table= table->file->has_transactions_and_rollback();
4324 if (thd->transaction->stmt.modified_non_trans_table ||
4325 thd->log_current_statement)
4326 {
4327 if (!can_rollback_data())
4328 thd->transaction->all.modified_non_trans_table= TRUE;
4329
4330 if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4331 {
4332 int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4333 int res;
4334 /* error of writing binary log is ignored */
4335 res= thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
4336 thd->query_length(),
4337 transactional_table, FALSE, FALSE, errcode);
4338 binary_logged= res == 0 || !table->s->tmp_table;
4339 }
4340 if (changed)
4341 query_cache_invalidate3(thd, table, 1);
4342 }
4343 DBUG_ASSERT(transactional_table || !changed ||
4344 thd->transaction->stmt.modified_non_trans_table);
4345
4346 table->s->table_creation_was_logged|= binary_logged;
4347 table->file->ha_release_auto_increment();
4348 }
4349
4350 DBUG_VOID_RETURN;
4351 }
4352
4353
4354 /***************************************************************************
4355 CREATE TABLE (SELECT) ...
4356 ***************************************************************************/
4357
create_field_for_create_select(MEM_ROOT * root,TABLE * table)4358 Field *Item::create_field_for_create_select(MEM_ROOT *root, TABLE *table)
4359 {
4360 static Tmp_field_param param(false, false, false, false);
4361 Tmp_field_src src;
4362 return create_tmp_field_ex(root, table, &src, ¶m);
4363 }
4364
4365
/**
  Create table from lists of fields and items (or just return TABLE
  object for pre-opened existing table).

  @param thd          [in]     Thread object
  @param items        [in]     List of items which should be used to produce
                               rest of fields for the table (corresponding
                               fields will be added to the end of
                               alter_info->create_list)
  @param lock         [out]    Pointer to the MYSQL_LOCK object for table
                               created will be returned in this parameter.
                               Since this table is not included in THD::lock
                               caller is responsible for explicitly unlocking
                               this table.
  @param hooks        [in]     Hooks to be invoked before and after obtaining
                               table lock on the table being created.

  Besides its parameters the function reads and updates the following
  objects, which are not declared in this function (presumably members of
  select_create):
    - create_info   Create information (like MAX_ROWS, ENGINE or
                    temporary table flag)
    - create_table  TABLE_LIST object providing database and name for
                    table to be created or to be open
    - alter_info    Initial list of columns and indexes for the table to
                    be created; the fields built from 'items' are appended
                    to it

  @note
    This function assumes that either table exists and was pre-opened and
    locked at open_and_lock_tables() stage (and in this case we just emit
    error or warning and return pre-opened TABLE object) or an exclusive
    metadata lock was acquired on table so we can safely create, open and
    lock table in it (we don't acquire metadata lock if this create is
    for temporary table).

  @note
    Since this function contains some logic specific to CREATE TABLE ...
    SELECT it should be changed before it can be used in other contexts.

  @retval non-zero  Pointer to TABLE object for table created or opened
  @retval 0         Error, or nothing was created and no error was raised
                    (in which case my_ok() has already been sent)
*/

TABLE *select_create::create_table_from_items(THD *thd, List<Item> *items,
                                      MYSQL_LOCK **lock, TABLEOP_HOOKS *hooks)
{
  TABLE tmp_table;		// Used during 'Create_field()'
  TABLE_SHARE share;
  TABLE *table= 0;
  /* Number of columns that come from the SELECT list. */
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  bool save_table_creation_was_logged;
  DBUG_ENTER("select_create::create_table_from_items");

  /* Set up a stack-allocated dummy table used only to create the fields. */
  tmp_table.s= &share;
  init_tmp_table_share(thd, &share, "", 0, "", "");

  tmp_table.s->db_create_options=0;
  tmp_table.null_row= 0;
  tmp_table.maybe_null= 0;
  tmp_table.in_use= thd;

  if (!opt_explicit_defaults_for_timestamp)
    promote_first_timestamp_column(&alter_info->create_list);

  if (create_info->fix_create_fields(thd, alter_info, *create_table))
    DBUG_RETURN(NULL);

  /* Append one Create_field per selected item to alter_info->create_list. */
  while ((item=it++))
  {
    Field *tmp_field= item->create_field_for_create_select(thd->mem_root,
                                                           &tmp_table);

    if (!tmp_field)
      DBUG_RETURN(NULL);

    Field *table_field;

    switch (item->type())
    {
    /*
      We have to take into account both the real table's fields and
      pseudo-fields used in trigger's body. These fields are used
      to copy defaults values later inside constructor of
      the class Create_field.
    */
    case Item::FIELD_ITEM:
    case Item::TRIGGER_FIELD_ITEM:
      table_field= ((Item_field *) item)->field;
      break;
    default:
      table_field= NULL;
    }

    Create_field *cr_field= new (thd->mem_root)
                                  Create_field(thd, tmp_field, table_field);

    if (!cr_field)
      DBUG_RETURN(NULL);

    /* A nullable item must produce a nullable column. */
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    alter_info->create_list.push_back(cr_field, thd->mem_root);
  }

  if (create_info->check_fields(thd, alter_info,
                                create_table->table_name,
                                create_table->db,
                                select_field_count))
    DBUG_RETURN(NULL);

  DEBUG_SYNC(thd,"create_table_select_before_create");

  /* Check if LOCK TABLES + CREATE OR REPLACE of existing normal table*/
  if (thd->locked_tables_mode && create_table->table &&
      !create_info->tmp_table())
  {
    /* Remember information about the locked table */
    create_info->pos_in_locked_tables=
      create_table->table->pos_in_locked_tables;
    create_info->mdl_ticket= create_table->table->mdl_ticket;
  }

  /*
    Create and lock table.

    Note that we either creating (or opening existing) temporary table or
    creating base table on which name we have exclusive lock. So code below
    should not cause deadlocks or races.

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
  */

  /* A zero return value is treated as "table created". */
  if (!mysql_create_table_no_lock(thd, &create_table->db,
                                  &create_table->table_name,
                                  create_info, alter_info, NULL,
                                  select_field_count, create_table))
  {
    DEBUG_SYNC(thd,"create_table_select_before_open");

    /*
      If we had a temporary table or a table used with LOCK TABLES,
      it was closed by mysql_create()
    */
    create_table->table= 0;

    if (!create_info->tmp_table())
    {
      Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
      TABLE_LIST::enum_open_strategy save_open_strategy;

      /* Force the newly created table to be opened */
      save_open_strategy= create_table->open_strategy;
      create_table->open_strategy= TABLE_LIST::OPEN_NORMAL;
      /*
        Here we open the destination table, on which we already have
        an exclusive metadata lock.
      */
      if (open_table(thd, create_table, &ot_ctx))
      {
        /* Opening failed: remove the table that was just created. */
        quick_rm_table(thd, create_info->db_type, &create_table->db,
                       table_case_name(create_info, &create_table->table_name),
                       0);
      }
      /* Restore */
      create_table->open_strategy= save_open_strategy;
    }
    else
    {
      /*
        The pointer to the newly created temporary table has been stored in
        create_info->table.
      */
      create_table->table= create_info->table;
      if (!create_table->table)
      {
        /*
          This shouldn't happen as creation of temporary table should make
          it preparable for open. Anyway we can't drop temporary table if
          we are unable to find it.
        */
        DBUG_ASSERT(0);
      }
    }
  }
  else
    create_table->table= 0;                     // Create failed
  
  if (unlikely(!(table= create_table->table)))
  {
    if (likely(!thd->is_error()))             // CREATE ... IF NOT EXISTS
      my_ok(thd);                             //   succeed, but did nothing
    DBUG_RETURN(NULL);
  }

  DEBUG_SYNC(thd,"create_table_select_before_lock");

  table->reginfo.lock_type=TL_WRITE;
  hooks->prelock(&table, 1);                    // Call prelock hooks

  /*
    Ensure that decide_logging_format(), called by mysql_lock_tables(), works
    with temporary tables that will be logged later if needed.
  */
  save_table_creation_was_logged= table->s->table_creation_was_logged;
  table->s->table_creation_was_logged= 1;

  /*
    mysql_lock_tables() below should never fail with request to reopen table
    since it won't wait for the table lock (we have exclusive metadata lock on
    the table) and thus can't get aborted.
  */
  if (unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
               hooks->postlock(&table, 1)))
  {
    /* purecov: begin tested */
    /*
      This can happen in innodb when you get a deadlock when using same table
      in insert and select or when you run out of memory.
      It can also happen if there was a conflict in
      THD::decide_logging_format()
    */
    if (!thd->is_error())
      my_error(ER_CANT_LOCK, MYF(0), my_errno);
    /* The lock may have been obtained even though postlock() failed. */
    if (*lock)
    {
      mysql_unlock_tables(thd, *lock);
      *lock= 0;
    }
    drop_open_table(thd, table, &create_table->db, &create_table->table_name);
    DBUG_RETURN(0);
    /* purecov: end */
  }
  /* Locking succeeded: put back the flag that was forced to 1 above. */
  table->s->table_creation_was_logged= save_table_creation_was_logged;
  if (!table->s->tmp_table)
    table->file->prepare_for_row_logging();

  /*
    If slave is converting a statement event to row events, log the original
    create statement as an annotated row
  */
#ifdef HAVE_REPLICATION
  if (thd->slave_thread && opt_replicate_annotate_row_events &&
      thd->is_current_stmt_binlog_format_row())
    thd->variables.binlog_annotate_row_events= 1;
#endif
  DBUG_RETURN(table);
}
4619
4620
/**
  Prepare CREATE TABLE ... SELECT: create, open and lock the target table
  and set it up for receiving the rows of the SELECT.

  @param _values  Items of the select list. A copy of the list, allocated on
                  thd->mem_root, is used for creating the table.
  @param u        Query unit; saved in 'unit'.

  @retval 0         OK
  @retval non-zero  Error (-1 or 1 depending on the failing step)
*/
int
select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
{
  List<Item> values(_values, thd->mem_root);
  MYSQL_LOCK *extra_lock= NULL;
  DBUG_ENTER("select_create::prepare");

  TABLEOP_HOOKS *hook_ptr= NULL;
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
             TABLE_LIST *select_tables_arg)
      : ptr(x),
        create_table(create_table_arg),
        select_tables(select_tables_arg)
      {
      }

  private:
    /*
      Decide the binlog format for the statement and, for a non-temporary
      table logged in row format, write the CREATE TABLE statement.
      Returns non-zero on error.
    */
    virtual int do_postlock(TABLE **tables, uint count)
    {
      int error;
      THD *thd= const_cast<THD*>(ptr->get_thd());
      TABLE_LIST *save_next_global= create_table->next_global;

      /*
        Temporarily chain the SELECT tables after the created table so that
        decide_logging_format() sees them in one list; the link is restored
        right after the call.
      */
      create_table->next_global= select_tables;

      error= thd->decide_logging_format(create_table);

      create_table->next_global= save_next_global;

      if (unlikely(error))
        return error;

      TABLE const *const table = *tables;
      if (thd->is_current_stmt_binlog_format_row() &&
          !table->s->tmp_table)
        return binlog_show_create_table(thd, *tables, ptr->create_info);
      return 0;
    }
    select_create *ptr;
    TABLE_LIST *create_table;
    TABLE_LIST *select_tables;
  };

  MY_HOOKS hooks(this, create_table, select_tables);
  hook_ptr= &hooks;

  unit= u;

  /*
    Start a statement transaction before the create when the table being
    created is not temporary, the statement is logged in row format and the
    binary log is open.
  */
  if (!thd->lex->tmp_table() &&
      thd->is_current_stmt_binlog_format_row() &&
      mysql_bin_log.is_open())
  {
    thd->binlog_start_trans_and_stmt();
  }

  if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr)))
  {
    if (create_info->or_replace())
    {
      /* Original table was deleted. We have to log it */
      log_drop_table(thd, &create_table->db, &create_table->table_name,
                     thd->lex->tmp_table());
    }

    /* abort() deletes table */
    DBUG_RETURN(-1);
  }

  if (create_info->tmp_table())
  {
    /*
      When the temporary table was created & opened in create_table_impl(),
      the table's TABLE_SHARE (and thus TABLE) object was also linked to THD
      temporary tables lists. So, we must temporarily remove it from the
      list to keep them inaccessible from inner statements.
      e.g. CREATE TEMPORARY TABLE `t1` AS SELECT * FROM `t1`;
    */
    saved_tmp_table_share= thd->save_tmp_table_share(create_table->table);
  }

  /* Remember where the lock on the new table is kept. */
  if (extra_lock)
  {
    DBUG_ASSERT(m_plock == NULL);

    if (create_info->tmp_table())
      m_plock= &m_lock;
    else
      m_plock= &thd->extra_lock;

    *m_plock= extra_lock;
  }

  /* The table must have at least as many fields as there are values. */
  if (table->s->fields < values.elements)
  {
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
    DBUG_RETURN(-1);
  }

  /* First field to copy */
  field= table->field+table->s->fields;

  /*
    Mark all fields that are given values.
    The field array is walked backwards from its end. Fields whose
    invisibility is INVISIBLE_SYSTEM or higher are skipped without being
    counted. When the loop ends 'field' points to the first field that
    receives a value from the SELECT.
  */
  for (uint n= values.elements; n; )
  {
    if ((*--field)->invisible >= INVISIBLE_SYSTEM)
      continue;
    n--;
    bitmap_set_bit(table->write_set, (*field)->field_index);
  }

  table->next_number_field=table->found_next_number_field;

  restore_record(table,s->default_values);      // Get empty record
  thd->cuted_fields=0;
  /* Ends up true whenever IGNORE or a non-error duplicate mode is used. */
  bool create_lookup_handler= info.handle_duplicates != DUP_ERROR;
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
  {
    create_lookup_handler= true;
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
    {
      if (table->file->ha_rnd_init_with_error(0))
        DBUG_RETURN(1);
    }
  }
  table->file->prepare_for_insert(create_lookup_handler);
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
    table->file->ha_start_bulk_insert((ha_rows) 0);
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  // Mark table as used
  table->query_id= thd->query_id;
  DBUG_RETURN(0);
}
4788
4789
binlog_show_create_table(THD * thd,TABLE * table,Table_specification_st * create_info)4790 static int binlog_show_create_table(THD *thd, TABLE *table,
4791 Table_specification_st *create_info)
4792 {
4793 /*
4794 Note 1: In RBR mode, we generate a CREATE TABLE statement for the
4795 created table by calling show_create_table(). In the event of an error,
4796 nothing should be written to the binary log, even if the table is
4797 non-transactional; therefore we pretend that the generated CREATE TABLE
4798 statement is for a transactional table. The event will then be put in the
4799 transaction cache, and any subsequent events (e.g., table-map events and
4800 binrow events) will also be put there. We can then use
4801 ha_autocommit_or_rollback() to either throw away the entire kaboodle of
4802 events, or write them to the binary log.
4803
4804 We write the CREATE TABLE statement here and not in prepare()
4805 since there potentially are sub-selects or accesses to information
4806 schema that will do a close_thread_tables(), destroying the
4807 statement transaction cache.
4808 */
4809 DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
4810 StringBuffer<2048> query(system_charset_info);
4811 int result;
4812 TABLE_LIST tmp_table_list;
4813
4814 tmp_table_list.reset();
4815 tmp_table_list.table = table;
4816
4817 result= show_create_table(thd, &tmp_table_list, &query,
4818 create_info, WITH_DB_NAME);
4819 DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
4820
4821 if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4822 {
4823 int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4824 result= thd->binlog_query(THD::STMT_QUERY_TYPE,
4825 query.ptr(), query.length(),
4826 /* is_trans */ TRUE,
4827 /* direct */ FALSE,
4828 /* suppress_use */ FALSE,
4829 errcode) > 0;
4830 }
4831 #ifdef WITH_WSREP
4832 if (thd->wsrep_trx().active())
4833 {
4834 WSREP_DEBUG("transaction already started for CTAS");
4835 }
4836 else
4837 {
4838 wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
4839 }
4840 #endif
4841 return result;
4842 }
4843
4844
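/**
  Log CREATE TABLE to binary log

  @param thd      Thread handler
  @param table    Log create statement for this table
  @param replace  If true, OPT_OR_REPLACE is set in the create options
                  used to generate the statement

  @retval false   Nothing had to be logged, or
                  binlog_show_create_table() returned 0
  @retval true    binlog_show_create_table() returned a non-zero value

  This function is called from ALTER TABLE for a shared table converted
  to a not shared table.
*/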
4854
binlog_create_table(THD * thd,TABLE * table,bool replace)4855 bool binlog_create_table(THD *thd, TABLE *table, bool replace)
4856 {
4857 Table_specification_st create_info;
4858 bool result;
4859 ulonglong save_option_bits;
4860
4861 /* Don't log temporary tables in row format */
4862 if (thd->variables.binlog_format == BINLOG_FORMAT_ROW &&
4863 table->s->tmp_table)
4864 return 0;
4865 if (!thd->binlog_table_should_be_logged(&table->s->db))
4866 return 0;
4867
4868 /*
4869 We have to use ROW format to ensure that future row inserts will be
4870 logged
4871 */
4872 thd->set_current_stmt_binlog_format_row();
4873 table->file->prepare_for_row_logging();
4874
4875 create_info.lex_start();
4876 save_option_bits= thd->variables.option_bits;
4877 if (replace)
4878 create_info.set(DDL_options_st::OPT_OR_REPLACE);
4879 /* Ensure we write ENGINE=xxx and CHARSET=... to binary log */
4880 create_info.used_fields|= (HA_CREATE_USED_ENGINE |
4881 HA_CREATE_USED_DEFAULT_CHARSET);
4882 /* Ensure we write all engine options to binary log */
4883 create_info.used_fields|= HA_CREATE_PRINT_ALL_OPTIONS;
4884 result= binlog_show_create_table(thd, table, &create_info) != 0;
4885 thd->variables.option_bits= save_option_bits;
4886 return result;
4887 }
4888
4889
/**
  Log DROP TABLE to binary log

  @param thd    Thread handler
  @param table  Log drop statement for this table

  @return true if thd->binlog_query() returned a value greater than 0,
          false otherwise (including when nothing is logged)

  This function is called from ALTER TABLE for a shared table converted
  to a not shared table.
*/
4899
binlog_drop_table(THD * thd,TABLE * table)4900 bool binlog_drop_table(THD *thd, TABLE *table)
4901 {
4902 StringBuffer<2048> query(system_charset_info);
4903 /* Don't log temporary tables in row format */
4904 if (!table->s->table_creation_was_logged)
4905 return 0;
4906 if (!thd->binlog_table_should_be_logged(&table->s->db))
4907 return 0;
4908
4909 query.append("DROP ");
4910 if (table->s->tmp_table)
4911 query.append("TEMPORARY ");
4912 query.append("TABLE IF EXISTS ");
4913 append_identifier(thd, &query, &table->s->db);
4914 query.append(".");
4915 append_identifier(thd, &query, &table->s->table_name);
4916
4917 return thd->binlog_query(THD::STMT_QUERY_TYPE,
4918 query.ptr(), query.length(),
4919 /* is_trans */ TRUE,
4920 /* direct */ FALSE,
4921 /* suppress_use */ TRUE,
4922 0) > 0;
4923 }
4924
4925
/**
  Pass one list of values on to fill_record_n_invoke_before_triggers().

  @param values  Items to store; forwarded unchanged

  The call uses the members 'thd', 'table' and 'field' and the trigger
  event TRG_EVENT_INSERT.
  NOTE(review): the literal 1 is presumably an "ignore errors" style flag;
  confirm against the declaration of fill_record_n_invoke_before_triggers().
*/

void select_create::store_values(List<Item> &values)
{
  fill_record_n_invoke_before_triggers(thd, table, field, values, 1,
                                       TRG_EVENT_INSERT);
}
4931
4932
/**
  Complete the statement once all rows have been processed.

  Steps, in order:
   - run prepare_eof();
   - for a temporary table, put the saved share back into the THD
     temporary tables list (fails if a share with the same key already
     exists there);
   - for a non-temporary table, commit the statement and, unless
     OPTION_GTID_BEGIN is set, do an implicit commit (with additional
     WSREP handling when built WITH_WSREP);
   - send the OK packet;
   - if a lock was stored through m_plock, either give it back to
     thd->locked_tables_list or release it.

  @retval false  Success
  @retval true   Error; abort_result_set() has been called
*/

bool select_create::send_eof()
{
  DBUG_ENTER("select_create::send_eof");

  /*
    The routine that writes the statement in the binary log
    is in select_insert::prepare_eof(). For that reason, we
    mark the flag at this point.
  */
  if (table->s->tmp_table)
    thd->transaction->stmt.mark_created_temp_table();

  if (thd->slave_thread)
    thd->variables.binlog_annotate_row_events= 0;

  if (prepare_eof())
  {
    abort_result_set();
    DBUG_RETURN(true);
  }

  if (table->s->tmp_table)
  {
    /*
      Now is good time to add the new table to THD temporary tables list.
      But, before that we need to check if same table got created by the sub-
      statement.
    */
    if (thd->find_tmp_table_share(table->s->table_cache_key.str,
                                  table->s->table_cache_key.length))
    {
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table->alias.c_ptr());
      abort_result_set();
      DBUG_RETURN(true);
    }
    else
    {
      DBUG_ASSERT(saved_tmp_table_share);
      thd->restore_tmp_table_share(saved_tmp_table_share);
    }
  }

  /*
    Do an implicit commit at end of statement for non-temporary
    tables. This can fail, but we should unlock the table
    nevertheless.
  */
  if (!table->s->tmp_table)
  {
#ifdef WITH_WSREP
    if (WSREP(thd) &&
        table->file->ht->db_type == DB_TYPE_INNODB)
    {
      /* A wsrep transaction id is required before keys can be appended */
      if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
      {
        wsrep_start_transaction(thd, thd->wsrep_next_trx_id());
      }
      DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
      WSREP_DEBUG("CTAS key append for trx: %" PRIu64 " thd %llu query %lld ",
                  thd->wsrep_trx_id(), thd->thread_id, thd->query_id);

      /*
        append table level exclusive key for CTAS
      */
      wsrep_key_arr_t key_arr= {0, 0};
      wsrep_prepare_keys_for_isolation(thd,
                                       create_table->db.str,
                                       create_table->table_name.str,
                                       table_list,
                                       &key_arr);
      int rcode= wsrep_thd_append_key(thd, key_arr.keys, key_arr.keys_len,
                                      WSREP_SERVICE_KEY_EXCLUSIVE);
      /* The key array is freed whether or not appending succeeded */
      wsrep_keys_free(&key_arr);
      if (rcode)
      {
        DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
        WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
                    (wsrep_thd_query(thd)) ?
                    wsrep_thd_query(thd) : "void", rcode);
        abort_result_set();
        DBUG_RETURN(true);
      }
      /* If commit fails, we should be able to reset the OK status. */
      thd->get_stmt_da()->set_overwrite_status(true);
    }
#endif /* WITH_WSREP */
    /* Return values are not checked here; see the comment above */
    trans_commit_stmt(thd);
    if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
      trans_commit_implicit(thd);
#ifdef WITH_WSREP
    if (WSREP(thd))
    {
      thd->get_stmt_da()->set_overwrite_status(FALSE);
      /* The wsrep error state is read while holding LOCK_thd_data */
      mysql_mutex_lock(&thd->LOCK_thd_data);
      if (wsrep_current_error(thd))
      {
        WSREP_DEBUG("select_create commit failed, thd: %llu err: %s %s",
                    thd->thread_id,
                    wsrep_thd_transaction_state_str(thd), wsrep_thd_query(thd));
        /* Release the mutex before the abort path runs */
        mysql_mutex_unlock(&thd->LOCK_thd_data);
        abort_result_set();
        DBUG_RETURN(true);
      }
      mysql_mutex_unlock(&thd->LOCK_thd_data);
    }
#endif /* WITH_WSREP */
  }

  /*
    exit_done must only be set after last potential call to
    abort_result_set().
  */
  exit_done= 1;                                 // Avoid double calls

  send_ok_packet();

  if (m_plock)
  {
    /* Take the lock out of its holder before deciding what to do with it */
    MYSQL_LOCK *lock= *m_plock;
    *m_plock= NULL;
    m_plock= NULL;

    if (create_info->pos_in_locked_tables)
    {
      /*
        If we are under lock tables, we have created a table that was
        originally locked. We should add back the lock to ensure that
        all tables in the thd->open_list are locked!
      */
      table->mdl_ticket= create_info->mdl_ticket;

      /*
        The following should never fail, except if out of memory.
        On a zero return the lock is not released here (presumably it is
        then kept by locked_tables_list -- confirm in restore_lock()).
      */
      if (!thd->locked_tables_list.restore_lock(thd,
                                                create_info->
                                                pos_in_locked_tables,
                                                table, lock))
        DBUG_RETURN(false);                     // ok
      /* Fail. Continue without locking the table */
    }
    mysql_unlock_tables(thd, lock);
  }
  DBUG_RETURN(false);
}
5076
5077
abort_result_set()5078 void select_create::abort_result_set()
5079 {
5080 ulonglong save_option_bits;
5081 DBUG_ENTER("select_create::abort_result_set");
5082
5083 /* Avoid double calls, could happen in case of out of memory on cleanup */
5084 if (exit_done)
5085 DBUG_VOID_RETURN;
5086 exit_done= 1;
5087
5088 /*
5089 In select_insert::abort_result_set() we roll back the statement, including
5090 truncating the transaction cache of the binary log. To do this, we
5091 pretend that the statement is transactional, even though it might
5092 be the case that it was not.
5093
5094 We roll back the statement prior to deleting the table and prior
5095 to releasing the lock on the table, since there might be potential
5096 for failure if the rollback is executed after the drop or after
5097 unlocking the table.
5098
5099 We also roll back the statement regardless of whether the creation
5100 of the table succeeded or not, since we need to reset the binary
5101 log state.
5102
5103 However if there was an original table that was deleted, as part of
5104 create or replace table, then we must log the statement.
5105 */
5106
5107 save_option_bits= thd->variables.option_bits;
5108 thd->variables.option_bits&= ~OPTION_BIN_LOG;
5109 select_insert::abort_result_set();
5110 thd->transaction->stmt.modified_non_trans_table= FALSE;
5111 thd->variables.option_bits= save_option_bits;
5112
5113 /* possible error of writing binary log is ignored deliberately */
5114 (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
5115
5116 if (table)
5117 {
5118 bool tmp_table= table->s->tmp_table;
5119 bool table_creation_was_logged= (!tmp_table ||
5120 table->s->table_creation_was_logged);
5121 if (tmp_table)
5122 {
5123 DBUG_ASSERT(saved_tmp_table_share);
5124 thd->restore_tmp_table_share(saved_tmp_table_share);
5125 }
5126
5127 if (table->file->inited &&
5128 (info.ignore || info.handle_duplicates != DUP_ERROR) &&
5129 (table->file->ha_table_flags() & HA_DUPLICATE_POS))
5130 table->file->ha_rnd_end();
5131 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
5132 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
5133 table->auto_increment_field_not_null= FALSE;
5134
5135 if (m_plock)
5136 {
5137 mysql_unlock_tables(thd, *m_plock);
5138 *m_plock= NULL;
5139 m_plock= NULL;
5140 }
5141
5142 drop_open_table(thd, table, &create_table->db, &create_table->table_name);
5143 table=0; // Safety
5144 if (thd->log_current_statement && mysql_bin_log.is_open())
5145 {
5146 /* Remove logging of drop, create + insert rows */
5147 binlog_reset_cache(thd);
5148 /* Original table was deleted. We have to log it */
5149 if (table_creation_was_logged)
5150 log_drop_table(thd, &create_table->db, &create_table->table_name,
5151 tmp_table);
5152 }
5153 }
5154
5155 if (create_info->table_was_deleted)
5156 {
5157 /* Unlock locked table that was dropped by CREATE. */
5158 (void) trans_rollback_stmt(thd);
5159 thd->locked_tables_list.unlock_locked_table(thd, create_info->mdl_ticket);
5160 }
5161
5162 DBUG_VOID_RETURN;
5163 }
5164