1 /*
2 Copyright (c) 2000, 2016, Oracle and/or its affiliates.
3 Copyright (c) 2010, 2019, MariaDB Corporation
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
17 */
18
19 /* Insert of records */
20
21 /*
22 INSERT DELAYED
23
24 Insert delayed is distinguished from a normal insert by lock_type ==
25 TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
26 "delayed" table (delayed_get_table()), but falls back to
27 open_and_lock_tables() on error and proceeds as normal insert then.
28
29 Opening a "delayed" table means to find a delayed insert thread that
30 has the table open already. If this fails, a new thread is created and
31 waited for to open and lock the table.
32
33 If accessing the thread succeeded, in
34 Delayed_insert::get_local_table() the table of the thread is copied
35 for local use. A copy is required because the normal insert logic
  works on a target table, but the other thread's table object must not
37 be used. The insert logic uses the record buffer to create a record.
38 And the delayed insert thread uses the record buffer to pass the
39 record to the table handler. So there must be different objects. Also
40 the copied table is not included in the lock, so that the statement
41 can proceed even if the real table cannot be accessed at this moment.
42
43 Copying a table object is not a trivial operation. Besides the TABLE
44 object there are the field pointer array, the field objects and the
45 record buffer. After copying the field objects, their pointers into
46 the record must be "moved" to point to the new record buffer.
47
48 After this setup the normal insert logic is used. Only that for
49 delayed inserts write_delayed() is called instead of write_record().
50 It inserts the rows into a queue and signals the delayed insert thread
51 instead of writing directly to the table.
52
53 The delayed insert thread awakes from the signal. It locks the table,
54 inserts the rows from the queue, unlocks the table, and waits for the
  next signal. It normally lives until a FLUSH TABLES or SHUTDOWN.
56
57 */
58
59 #include "mariadb.h" /* NO_EMBEDDED_ACCESS_CHECKS */
60 #include "sql_priv.h"
61 #include "sql_insert.h"
62 #include "sql_update.h" // compare_record
63 #include "sql_base.h" // close_thread_tables
64 #include "sql_cache.h" // query_cache_*
65 #include "key.h" // key_copy
66 #include "lock.h" // mysql_unlock_tables
67 #include "sp_head.h"
68 #include "sql_view.h" // check_key_in_view, insert_view_fields
69 #include "sql_table.h" // mysql_create_table_no_lock
70 #include "sql_acl.h" // *_ACL, check_grant_all_columns
71 #include "sql_trigger.h"
72 #include "sql_select.h"
73 #include "sql_show.h"
74 #include "slave.h"
75 #include "sql_parse.h" // end_active_trans
76 #include "rpl_mi.h"
77 #include "transaction.h"
78 #include "sql_audit.h"
79 #include "sql_derived.h" // mysql_handle_derived
80 #include "sql_prepare.h"
81 #include <my_bit.h>
82
83 #include "debug_sync.h"
84
85 #ifndef EMBEDDED_LIBRARY
86 static bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
87 TABLE_LIST *table_list);
88 static int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
89 LEX_STRING query, bool ignore, bool log_on);
90 static void end_delayed_insert(THD *thd);
91 pthread_handler_t handle_delayed_insert(void *arg);
92 static void unlink_blobs(TABLE *table);
93 #endif
94 static bool check_view_insertability(THD *thd, TABLE_LIST *view);
95
96 /*
97 Check that insert/update fields are from the same single table of a view.
98
99 @param fields The insert/update fields to be checked.
100 @param values The insert/update values to be checked, NULL if
101 checking is not wanted.
102 @param view The view for insert.
103 @param map [in/out] The insert table map.
104
105 This function is called in 2 cases:
106 1. to check insert fields. In this case *map will be set to 0.
107 Insert fields are checked to be all from the same single underlying
108 table of the given view. Otherwise the error is thrown. Found table
109 map is returned in the map parameter.
110 2. to check update fields of the ON DUPLICATE KEY UPDATE clause.
111 In this case *map contains table_map found on the previous call of
112 the function to check insert fields. Update fields are checked to be
113 from the same table as the insert fields.
114
115 @returns false if success.
116 */
117
check_view_single_update(List<Item> & fields,List<Item> * values,TABLE_LIST * view,table_map * map,bool insert)118 static bool check_view_single_update(List<Item> &fields, List<Item> *values,
119 TABLE_LIST *view, table_map *map,
120 bool insert)
121 {
122 /* it is join view => we need to find the table for update */
123 List_iterator_fast<Item> it(fields);
124 Item *item;
125 TABLE_LIST *tbl= 0; // reset for call to check_single_table()
126 table_map tables= 0;
127
128 while ((item= it++))
129 tables|= item->used_tables();
130
131 /*
132 Check that table is only one
133 (we can not rely on check_single_table because it skips some
134 types of tables)
135 */
136 if (my_count_bits(tables) > 1)
137 goto error;
138
139 if (values)
140 {
141 it.init(*values);
142 while ((item= it++))
143 tables|= item->view_used_tables(view);
144 }
145
146 /* Convert to real table bits */
147 tables&= ~PSEUDO_TABLE_BITS;
148
149 /* Check found map against provided map */
150 if (*map)
151 {
152 if (tables != *map)
153 goto error;
154 return FALSE;
155 }
156
157 if (view->check_single_table(&tbl, tables, view) || tbl == 0)
158 goto error;
159
160 /* view->table should have been set in mysql_derived_merge_for_insert */
161 DBUG_ASSERT(view->table);
162
163 /*
164 Use buffer for the insert values that was allocated for the merged view.
165 */
166 tbl->table->insert_values= view->table->insert_values;
167 view->table= tbl->table;
168 if (!tbl->single_table_updatable())
169 {
170 if (insert)
171 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), view->alias.str, "INSERT");
172 else
173 my_error(ER_NON_UPDATABLE_TABLE, MYF(0), view->alias.str, "UPDATE");
174 return TRUE;
175 }
176 *map= tables;
177
178 return FALSE;
179
180 error:
181 my_error(ER_VIEW_MULTIUPDATE, MYF(0),
182 view->view_db.str, view->view_name.str);
183 return TRUE;
184 }
185
186
187 /*
188 Check if insert fields are correct.
189
190 @param thd The current thread.
191 @param table_list The table we are inserting into (may be view)
192 @param fields The insert fields.
193 @param values The insert values.
194 @param check_unique If duplicate values should be rejected.
195 @param fields_and_values_from_different_maps If 'values' are allowed to
196 refer to other tables than those of 'fields'
197 @param map See check_view_single_update
198
199 @returns 0 if success, -1 if error
200 */
201
check_insert_fields(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<Item> & values,bool check_unique,bool fields_and_values_from_different_maps,table_map * map)202 static int check_insert_fields(THD *thd, TABLE_LIST *table_list,
203 List<Item> &fields, List<Item> &values,
204 bool check_unique,
205 bool fields_and_values_from_different_maps,
206 table_map *map)
207 {
208 TABLE *table= table_list->table;
209 DBUG_ENTER("check_insert_fields");
210
211 if (!table_list->single_table_updatable())
212 {
213 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
214 DBUG_RETURN(-1);
215 }
216
217 if (fields.elements == 0 && values.elements != 0)
218 {
219 if (!table)
220 {
221 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
222 table_list->view_db.str, table_list->view_name.str);
223 DBUG_RETURN(-1);
224 }
225 if (values.elements != table->s->visible_fields)
226 {
227 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
228 DBUG_RETURN(-1);
229 }
230 #ifndef NO_EMBEDDED_ACCESS_CHECKS
231 Field_iterator_table_ref field_it;
232 field_it.set(table_list);
233 if (check_grant_all_columns(thd, INSERT_ACL, &field_it))
234 DBUG_RETURN(-1);
235 #endif
236 /*
237 No fields are provided so all fields must be provided in the values.
238 Thus we set all bits in the write set.
239 */
240 bitmap_set_all(table->write_set);
241 }
242 else
243 { // Part field list
244 SELECT_LEX *select_lex= &thd->lex->select_lex;
245 Name_resolution_context *context= &select_lex->context;
246 Name_resolution_context_state ctx_state;
247 int res;
248
249 if (fields.elements != values.elements)
250 {
251 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
252 DBUG_RETURN(-1);
253 }
254
255 thd->dup_field= 0;
256 select_lex->no_wrap_view_item= TRUE;
257
258 /* Save the state of the current name resolution context. */
259 ctx_state.save_state(context, table_list);
260
261 /*
262 Perform name resolution only in the first table - 'table_list',
263 which is the table that is inserted into.
264 */
265 table_list->next_local= 0;
266 context->resolve_in_table_list_only(table_list);
267 /* 'Unfix' fields to allow correct marking by the setup_fields function. */
268 if (table_list->is_view())
269 unfix_fields(fields);
270
271 res= setup_fields(thd, Ref_ptr_array(),
272 fields, MARK_COLUMNS_WRITE, 0, NULL, 0);
273
274 /* Restore the current context. */
275 ctx_state.restore_state(context, table_list);
276 thd->lex->select_lex.no_wrap_view_item= FALSE;
277
278 if (res)
279 DBUG_RETURN(-1);
280
281 if (table_list->is_view() && table_list->is_merged_derived())
282 {
283 if (check_view_single_update(fields,
284 fields_and_values_from_different_maps ?
285 (List<Item>*) 0 : &values,
286 table_list, map, true))
287 DBUG_RETURN(-1);
288 table= table_list->table;
289 }
290
291 if (check_unique && thd->dup_field)
292 {
293 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0),
294 thd->dup_field->field_name.str);
295 DBUG_RETURN(-1);
296 }
297 }
298 // For the values we need select_priv
299 #ifndef NO_EMBEDDED_ACCESS_CHECKS
300 table->grant.want_privilege= (SELECT_ACL & ~table->grant.privilege);
301 #endif
302
303 if (check_key_in_view(thd, table_list) ||
304 (table_list->view &&
305 check_view_insertability(thd, table_list)))
306 {
307 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
308 DBUG_RETURN(-1);
309 }
310
311 DBUG_RETURN(0);
312 }
313
has_no_default_value(THD * thd,Field * field,TABLE_LIST * table_list)314 static bool has_no_default_value(THD *thd, Field *field, TABLE_LIST *table_list)
315 {
316 if ((field->flags & NO_DEFAULT_VALUE_FLAG) && field->real_type() != MYSQL_TYPE_ENUM)
317 {
318 bool view= false;
319 if (table_list)
320 {
321 table_list= table_list->top_table();
322 view= table_list->view != NULL;
323 }
324 if (view)
325 {
326 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_VIEW_FIELD,
327 ER_THD(thd, ER_NO_DEFAULT_FOR_VIEW_FIELD),
328 table_list->view_db.str, table_list->view_name.str);
329 }
330 else
331 {
332 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_NO_DEFAULT_FOR_FIELD,
333 ER_THD(thd, ER_NO_DEFAULT_FOR_FIELD),
334 field->field_name.str);
335 }
336 return thd->really_abort_on_warning();
337 }
338 return false;
339 }
340
341
342 /**
343 Check if update fields are correct.
344
345 @param thd The current thread.
346 @param insert_table_list The table we are inserting into (may be view)
347 @param update_fields The update fields.
348 @param update_values The update values.
349 @param fields_and_values_from_different_maps If 'update_values' are allowed to
350 refer to other tables than those of 'update_fields'
351 @param map See check_view_single_update
352
353 @note
354 If the update fields include an autoinc field, set the
355 table->next_number_field_updated flag.
356
357 @returns 0 if success, -1 if error
358 */
359
check_update_fields(THD * thd,TABLE_LIST * insert_table_list,List<Item> & update_fields,List<Item> & update_values,bool fields_and_values_from_different_maps,table_map * map)360 static int check_update_fields(THD *thd, TABLE_LIST *insert_table_list,
361 List<Item> &update_fields,
362 List<Item> &update_values,
363 bool fields_and_values_from_different_maps,
364 table_map *map)
365 {
366 TABLE *table= insert_table_list->table;
367 my_bool UNINIT_VAR(autoinc_mark);
368
369 table->next_number_field_updated= FALSE;
370
371 if (table->found_next_number_field)
372 {
373 /*
374 Unmark the auto_increment field so that we can check if this is modified
375 by update_fields
376 */
377 autoinc_mark= bitmap_test_and_clear(table->write_set,
378 table->found_next_number_field->
379 field_index);
380 }
381
382 /* Check the fields we are going to modify */
383 if (setup_fields(thd, Ref_ptr_array(),
384 update_fields, MARK_COLUMNS_WRITE, 0, NULL, 0))
385 return -1;
386
387 if (insert_table_list->is_view() &&
388 insert_table_list->is_merged_derived() &&
389 check_view_single_update(update_fields,
390 fields_and_values_from_different_maps ?
391 (List<Item>*) 0 : &update_values,
392 insert_table_list, map, false))
393 return -1;
394
395 if (table->default_field)
396 table->mark_default_fields_for_write(FALSE);
397
398 if (table->found_next_number_field)
399 {
400 if (bitmap_is_set(table->write_set,
401 table->found_next_number_field->field_index))
402 table->next_number_field_updated= TRUE;
403
404 if (autoinc_mark)
405 bitmap_set_bit(table->write_set,
406 table->found_next_number_field->field_index);
407 }
408
409 return 0;
410 }
411
412 /**
413 Upgrade table-level lock of INSERT statement to TL_WRITE if
414 a more concurrent lock is infeasible for some reason. This is
415 necessary for engines without internal locking support (MyISAM).
416 An engine with internal locking implementation might later
417 downgrade the lock in handler::store_lock() method.
418 */
419
420 static
upgrade_lock_type(THD * thd,thr_lock_type * lock_type,enum_duplicates duplic)421 void upgrade_lock_type(THD *thd, thr_lock_type *lock_type,
422 enum_duplicates duplic)
423 {
424 if (duplic == DUP_UPDATE ||
425 (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
426 {
427 *lock_type= TL_WRITE_DEFAULT;
428 return;
429 }
430
431 if (*lock_type == TL_WRITE_DELAYED)
432 {
433 /*
434 We do not use delayed threads if:
435 - we're running in the safe mode or skip-new mode -- the
436 feature is disabled in these modes
437 - we're executing this statement on a replication slave --
438 we need to ensure serial execution of queries on the
439 slave
440 - it is INSERT .. ON DUPLICATE KEY UPDATE - in this case the
441 insert cannot be concurrent
442 - this statement is directly or indirectly invoked from
443 a stored function or trigger (under pre-locking) - to
444 avoid deadlocks, since INSERT DELAYED involves a lock
445 upgrade (TL_WRITE_DELAYED -> TL_WRITE) which we should not
446 attempt while keeping other table level locks.
447 - this statement itself may require pre-locking.
448 We should upgrade the lock even though in most cases
449 delayed functionality may work. Unfortunately, we can't
450 easily identify whether the subject table is not used in
451 the statement indirectly via a stored function or trigger:
452 if it is used, that will lead to a deadlock between the
453 client connection and the delayed thread.
454 */
455 if (specialflag & (SPECIAL_NO_NEW_FUNC | SPECIAL_SAFE_MODE) ||
456 thd->variables.max_insert_delayed_threads == 0 ||
457 thd->locked_tables_mode > LTM_LOCK_TABLES ||
458 thd->lex->uses_stored_routines() /*||
459 thd->lex->describe*/)
460 {
461 *lock_type= TL_WRITE;
462 return;
463 }
464 if (thd->slave_thread)
465 {
466 /* Try concurrent insert */
467 *lock_type= (duplic == DUP_UPDATE || duplic == DUP_REPLACE) ?
468 TL_WRITE : TL_WRITE_CONCURRENT_INSERT;
469 return;
470 }
471
472 bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
473 if (global_system_variables.binlog_format == BINLOG_FORMAT_STMT &&
474 log_on && mysql_bin_log.is_open())
475 {
476 /*
477 Statement-based binary logging does not work in this case, because:
478 a) two concurrent statements may have their rows intermixed in the
479 queue, leading to autoincrement replication problems on slave (because
480 the values generated used for one statement don't depend only on the
481 value generated for the first row of this statement, so are not
482 replicable)
483 b) if first row of the statement has an error the full statement is
484 not binlogged, while next rows of the statement may be inserted.
485 c) if first row succeeds, statement is binlogged immediately with a
486 zero error code (i.e. "no error"), if then second row fails, query
487 will fail on slave too and slave will stop (wrongly believing that the
488 master got no error).
489 So we fallback to non-delayed INSERT.
490 Note that to be fully correct, we should test the "binlog format which
491 the delayed thread is going to use for this row". But in the common case
492 where the global binlog format is not changed and the session binlog
493 format may be changed, that is equal to the global binlog format.
494 We test it without mutex for speed reasons (condition rarely true), and
495 in the common case (global not changed) it is as good as without mutex;
496 if global value is changed, anyway there is uncertainty as the delayed
497 thread may be old and use the before-the-change value.
498 */
499 *lock_type= TL_WRITE;
500 }
501 }
502 }
503
504
505 /**
506 Find or create a delayed insert thread for the first table in
507 the table list, then open and lock the remaining tables.
508 If a table can not be used with insert delayed, upgrade the lock
509 and open and lock all tables using the standard mechanism.
510
511 @param thd thread context
512 @param table_list list of "descriptors" for tables referenced
513 directly in statement SQL text.
514 The first element in the list corresponds to
515 the destination table for inserts, remaining
516 tables, if any, are usually tables referenced
517 by sub-queries in the right part of the
518 INSERT.
519
520 @return Status of the operation. In case of success 'table'
521 member of every table_list element points to an instance of
522 class TABLE.
523
524 @sa open_and_lock_tables for more information about MySQL table
525 level locking
526 */
527
528 static
open_and_lock_for_insert_delayed(THD * thd,TABLE_LIST * table_list)529 bool open_and_lock_for_insert_delayed(THD *thd, TABLE_LIST *table_list)
530 {
531 MDL_request protection_request;
532 DBUG_ENTER("open_and_lock_for_insert_delayed");
533
534 #ifndef EMBEDDED_LIBRARY
535 /* INSERT DELAYED is not allowed in a read only transaction. */
536 if (thd->tx_read_only)
537 {
538 my_error(ER_CANT_EXECUTE_IN_READ_ONLY_TRANSACTION, MYF(0));
539 DBUG_RETURN(true);
540 }
541
542 /*
543 In order for the deadlock detector to be able to find any deadlocks
544 caused by the handler thread waiting for GRL or this table, we acquire
545 protection against GRL (global IX metadata lock) and metadata lock on
546 table to being inserted into inside the connection thread.
547 If this goes ok, the tickets are cloned and added to the list of granted
548 locks held by the handler thread.
549 */
550 if (thd->global_read_lock.can_acquire_protection())
551 DBUG_RETURN(TRUE);
552
553 protection_request.init(MDL_key::GLOBAL, "", "", MDL_INTENTION_EXCLUSIVE,
554 MDL_STATEMENT);
555
556 if (thd->mdl_context.acquire_lock(&protection_request,
557 thd->variables.lock_wait_timeout))
558 DBUG_RETURN(TRUE);
559
560 if (thd->mdl_context.acquire_lock(&table_list->mdl_request,
561 thd->variables.lock_wait_timeout))
562 /*
563 If a lock can't be acquired, it makes no sense to try normal insert.
564 Therefore we just abort the statement.
565 */
566 DBUG_RETURN(TRUE);
567
568 bool error= FALSE;
569 if (delayed_get_table(thd, &protection_request, table_list))
570 error= TRUE;
571 else if (table_list->table)
572 {
573 /*
574 Open tables used for sub-selects or in stored functions, will also
575 cache these functions.
576 */
577 if (open_and_lock_tables(thd, table_list->next_global, TRUE, 0))
578 {
579 end_delayed_insert(thd);
580 error= TRUE;
581 }
582 else
583 {
584 /*
585 First table was not processed by open_and_lock_tables(),
586 we need to set updatability flag "by hand".
587 */
588 if (!table_list->derived && !table_list->view)
589 table_list->updatable= 1; // usual table
590 }
591 }
592
593 /*
594 We can't release protection against GRL and metadata lock on the table
595 being inserted into here. These locks might be required, for example,
596 because this INSERT DELAYED calls functions which may try to update
597 this or another tables (updating the same table is of course illegal,
598 but such an attempt can be discovered only later during statement
599 execution).
600 */
601
602 /*
603 Reset the ticket in case we end up having to use normal insert and
604 therefore will reopen the table and reacquire the metadata lock.
605 */
606 table_list->mdl_request.ticket= NULL;
607
608 if (error || table_list->table)
609 DBUG_RETURN(error);
610 #endif
611 /*
612 * This is embedded library and we don't have auxiliary
613 threads OR
614 * a lock upgrade was requested inside delayed_get_table
615 because
616 - there are too many delayed insert threads OR
617 - the table has triggers.
618 Use a normal insert.
619 */
620 table_list->lock_type= TL_WRITE;
621 DBUG_RETURN(open_and_lock_tables(thd, table_list, TRUE, 0));
622 }
623
624
625 /**
626 Create a new query string for removing DELAYED keyword for
627 multi INSERT DEALAYED statement.
628
629 @param[in] thd Thread handler
630 @param[in] buf Query string
631
632 @return
633 0 ok
634 1 error
635 */
636 static int
create_insert_stmt_from_insert_delayed(THD * thd,String * buf)637 create_insert_stmt_from_insert_delayed(THD *thd, String *buf)
638 {
639 /* Make a copy of thd->query() and then remove the "DELAYED" keyword */
640 if (buf->append(thd->query()) ||
641 buf->replace(thd->lex->keyword_delayed_begin_offset,
642 thd->lex->keyword_delayed_end_offset -
643 thd->lex->keyword_delayed_begin_offset, 0))
644 return 1;
645 return 0;
646 }
647
648
save_insert_query_plan(THD * thd,TABLE_LIST * table_list)649 static void save_insert_query_plan(THD* thd, TABLE_LIST *table_list)
650 {
651 Explain_insert* explain= new (thd->mem_root) Explain_insert(thd->mem_root);
652 explain->table_name.append(table_list->table->alias);
653
654 thd->lex->explain->add_insert_plan(explain);
655
656 /* See Update_plan::updating_a_view for details */
657 bool skip= MY_TEST(table_list->view);
658
659 /* Save subquery children */
660 for (SELECT_LEX_UNIT *unit= thd->lex->select_lex.first_inner_unit();
661 unit;
662 unit= unit->next_unit())
663 {
664 if (skip)
665 {
666 skip= false;
667 continue;
668 }
669 /*
670 Table elimination doesn't work for INSERTS, but let's still have this
671 here for consistency
672 */
673 if (!(unit->item && unit->item->eliminated))
674 explain->add_child(unit->first_select()->select_number);
675 }
676 }
677
678
field_to_fill()679 Field **TABLE::field_to_fill()
680 {
681 return triggers && triggers->nullable_fields() ? triggers->nullable_fields() : field;
682 }
683
684
685 /**
686 INSERT statement implementation
687
688 @note Like implementations of other DDL/DML in MySQL, this function
689 relies on the caller to close the thread tables. This is done in the
690 end of dispatch_command().
691 */
692
mysql_insert(THD * thd,TABLE_LIST * table_list,List<Item> & fields,List<List_item> & values_list,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,bool ignore)693 bool mysql_insert(THD *thd,TABLE_LIST *table_list,
694 List<Item> &fields,
695 List<List_item> &values_list,
696 List<Item> &update_fields,
697 List<Item> &update_values,
698 enum_duplicates duplic,
699 bool ignore)
700 {
701 bool retval= true;
702 int error, res;
703 bool transactional_table, joins_freed= FALSE;
704 bool changed;
705 const bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED);
706 bool using_bulk_insert= 0;
707 uint value_count;
708 ulong counter = 1;
709 /* counter of iteration in bulk PS operation*/
710 ulonglong iteration= 0;
711 ulonglong id;
712 COPY_INFO info;
713 TABLE *table= 0;
714 List_iterator_fast<List_item> its(values_list);
715 List_item *values;
716 Name_resolution_context *context;
717 Name_resolution_context_state ctx_state;
718 #ifndef EMBEDDED_LIBRARY
719 char *query= thd->query();
720 /*
721 log_on is about delayed inserts only.
722 By default, both logs are enabled (this won't cause problems if the server
723 runs without --log-bin).
724 */
725 bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
726 #endif
727 thr_lock_type lock_type;
728 Item *unused_conds= 0;
729 DBUG_ENTER("mysql_insert");
730
731 create_explain_query(thd->lex, thd->mem_root);
732 /*
733 Upgrade lock type if the requested lock is incompatible with
734 the current connection mode or table operation.
735 */
736 upgrade_lock_type(thd, &table_list->lock_type, duplic);
737
738 /*
739 We can't write-delayed into a table locked with LOCK TABLES:
740 this will lead to a deadlock, since the delayed thread will
741 never be able to get a lock on the table.
742 */
743 if (table_list->lock_type == TL_WRITE_DELAYED &&
744 thd->locked_tables_mode &&
745 find_locked_table(thd->open_tables, table_list->db.str,
746 table_list->table_name.str))
747 {
748 my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
749 table_list->table_name.str);
750 DBUG_RETURN(TRUE);
751 }
752
753 if (table_list->lock_type == TL_WRITE_DELAYED)
754 {
755 if (open_and_lock_for_insert_delayed(thd, table_list))
756 DBUG_RETURN(TRUE);
757 }
758 else
759 {
760 if (open_and_lock_tables(thd, table_list, TRUE, 0))
761 DBUG_RETURN(TRUE);
762 }
763
764 THD_STAGE_INFO(thd, stage_init_update);
765 lock_type= table_list->lock_type;
766 thd->lex->used_tables=0;
767 values= its++;
768 if (bulk_parameters_set(thd))
769 DBUG_RETURN(TRUE);
770 value_count= values->elements;
771
772 if (mysql_prepare_insert(thd, table_list, table, fields, values,
773 update_fields, update_values, duplic, &unused_conds,
774 FALSE))
775 goto abort;
776
777 /* mysql_prepare_insert sets table_list->table if it was not set */
778 table= table_list->table;
779
780 context= &thd->lex->select_lex.context;
781 /*
782 These three asserts test the hypothesis that the resetting of the name
783 resolution context below is not necessary at all since the list of local
784 tables for INSERT always consists of one table.
785 */
786 DBUG_ASSERT(!table_list->next_local);
787 DBUG_ASSERT(!context->table_list->next_local);
788 DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
789
790 /* Save the state of the current name resolution context. */
791 ctx_state.save_state(context, table_list);
792
793 /*
794 Perform name resolution only in the first table - 'table_list',
795 which is the table that is inserted into.
796 */
797 table_list->next_local= 0;
798 context->resolve_in_table_list_only(table_list);
799 switch_to_nullable_trigger_fields(*values, table);
800
801 while ((values= its++))
802 {
803 counter++;
804 if (values->elements != value_count)
805 {
806 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
807 goto abort;
808 }
809 if (setup_fields(thd, Ref_ptr_array(),
810 *values, MARK_COLUMNS_READ, 0, NULL, 0))
811 goto abort;
812 switch_to_nullable_trigger_fields(*values, table);
813 }
814 its.rewind ();
815
816 /* Restore the current context. */
817 ctx_state.restore_state(context, table_list);
818
819 if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false))
820 {
821 goto abort;
822 }
823 save_insert_query_plan(thd, table_list);
824 if (thd->lex->describe)
825 {
826 retval= thd->lex->explain->send_explain(thd);
827 goto abort;
828 }
829
830 /*
831 Fill in the given fields and dump it to the table file
832 */
833 bzero((char*) &info,sizeof(info));
834 info.ignore= ignore;
835 info.handle_duplicates=duplic;
836 info.update_fields= &update_fields;
837 info.update_values= &update_values;
838 info.view= (table_list->view ? table_list : 0);
839 info.table_list= table_list;
840
841 /*
842 Count warnings for all inserts.
843 For single line insert, generate an error if try to set a NOT NULL field
844 to NULL.
845 */
846 thd->count_cuted_fields= ((values_list.elements == 1 &&
847 !ignore) ?
848 CHECK_FIELD_ERROR_FOR_NULL :
849 CHECK_FIELD_WARN);
850 thd->cuted_fields = 0L;
851 table->next_number_field=table->found_next_number_field;
852
853 #ifdef HAVE_REPLICATION
854 if (thd->rgi_slave &&
855 (info.handle_duplicates == DUP_UPDATE) &&
856 (table->next_number_field != NULL) &&
857 rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
858 goto abort;
859 #endif
860
861 error=0;
862 if (duplic == DUP_REPLACE &&
863 (!table->triggers || !table->triggers->has_delete_triggers()))
864 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
865 if (duplic == DUP_UPDATE)
866 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
867 /*
868 let's *try* to start bulk inserts. It won't necessary
869 start them as values_list.elements should be greater than
870 some - handler dependent - threshold.
871 We should not start bulk inserts if this statement uses
872 functions or invokes triggers since they may access
873 to the same table and therefore should not see its
874 inconsistent state created by this optimization.
875 So we call start_bulk_insert to perform nesessary checks on
876 values_list.elements, and - if nothing else - to initialize
877 the code to make the call of end_bulk_insert() below safe.
878 */
879 #ifndef EMBEDDED_LIBRARY
880 if (lock_type != TL_WRITE_DELAYED)
881 #endif /* EMBEDDED_LIBRARY */
882 {
883 if (duplic != DUP_ERROR || ignore)
884 {
885 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
886 if (table->file->ha_table_flags() & HA_DUPLICATE_POS &&
887 table->file->ha_rnd_init_with_error(0))
888 goto abort;
889 }
890 /**
891 This is a simple check for the case when the table has a trigger
892 that reads from it, or when the statement invokes a stored function
893 that reads from the table being inserted to.
894 Engines can't handle a bulk insert in parallel with a read form the
895 same table in the same connection.
896 */
897 if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
898 values_list.elements > 1)
899 {
900 using_bulk_insert= 1;
901 table->file->ha_start_bulk_insert(values_list.elements);
902 }
903 }
904
905 thd->abort_on_warning= !ignore && thd->is_strict_mode();
906
907 table->reset_default_fields();
908 table->prepare_triggers_for_insert_stmt_or_event();
909 table->mark_columns_needed_for_insert();
910
911 if (fields.elements || !value_count || table_list->view != 0)
912 {
913 if (table->triggers &&
914 table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
915 {
916 /* BEFORE INSERT triggers exist, the check will be done later, per row */
917 }
918 else if (check_that_all_fields_are_given_values(thd, table, table_list))
919 {
920 error= 1;
921 goto values_loop_end;
922 }
923 }
924
925 if (table_list->prepare_where(thd, 0, TRUE) ||
926 table_list->prepare_check_option(thd))
927 error= 1;
928
929 switch_to_nullable_trigger_fields(fields, table);
930 switch_to_nullable_trigger_fields(update_fields, table);
931 switch_to_nullable_trigger_fields(update_values, table);
932
933 if (fields.elements || !value_count)
934 {
935 /*
936 There are possibly some default values:
937 INSERT INTO t1 (fields) VALUES ...
938 INSERT INTO t1 VALUES ()
939 */
940 if (table->validate_default_values_of_unset_fields(thd))
941 {
942 error= 1;
943 goto values_loop_end;
944 }
945 }
946
947 THD_STAGE_INFO(thd, stage_update);
948 do
949 {
950 DBUG_PRINT("info", ("iteration %llu", iteration));
951 if (iteration && bulk_parameters_set(thd))
952 {
953 error= 1;
954 goto values_loop_end;
955 }
956
957 while ((values= its++))
958 {
959 if (fields.elements || !value_count)
960 {
961 /*
962 There are possibly some default values:
963 INSERT INTO t1 (fields) VALUES ...
964 INSERT INTO t1 VALUES ()
965 */
966 restore_record(table,s->default_values); // Get empty record
967 table->reset_default_fields();
968 if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields,
969 *values, 0,
970 TRG_EVENT_INSERT)))
971 {
972 if (values_list.elements != 1 && ! thd->is_error())
973 {
974 info.records++;
975 continue;
976 }
977 /*
978 TODO: set thd->abort_on_warning if values_list.elements == 1
979 and check that all items return warning in case of problem with
980 storing field.
981 */
982 error=1;
983 break;
984 }
985 }
986 else
987 {
988 /*
989 No field list, all fields are set explicitly:
990 INSERT INTO t1 VALUES (values)
991 */
992 if (thd->lex->used_tables || // Column used in values()
993 table->s->visible_fields != table->s->fields)
994 restore_record(table,s->default_values); // Get empty record
995 else
996 {
997 TABLE_SHARE *share= table->s;
998
999 /*
1000 Fix delete marker. No need to restore rest of record since it will
1001 be overwritten by fill_record() anyway (and fill_record() does not
1002 use default values in this case).
1003 */
1004 table->record[0][0]= share->default_values[0];
1005
1006 /* Fix undefined null_bits. */
1007 if (share->null_bytes > 1 && share->last_null_bit_pos)
1008 {
1009 table->record[0][share->null_bytes - 1]=
1010 share->default_values[share->null_bytes - 1];
1011 }
1012 }
1013 table->reset_default_fields();
1014 if (unlikely(fill_record_n_invoke_before_triggers(thd, table,
1015 table->
1016 field_to_fill(),
1017 *values, 0,
1018 TRG_EVENT_INSERT)))
1019 {
1020 if (values_list.elements != 1 && ! thd->is_error())
1021 {
1022 info.records++;
1023 continue;
1024 }
1025 error=1;
1026 break;
1027 }
1028 }
1029
1030 /*
1031 with triggers a field can get a value *conditionally*, so we have to
1032 repeat has_no_default_value() check for every row
1033 */
1034 if (table->triggers &&
1035 table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
1036 {
1037 for (Field **f=table->field ; *f ; f++)
1038 {
1039 if (unlikely(!(*f)->has_explicit_value() &&
1040 has_no_default_value(thd, *f, table_list)))
1041 {
1042 error= 1;
1043 goto values_loop_end;
1044 }
1045 }
1046 }
1047
1048 if ((res= table_list->view_check_option(thd,
1049 (values_list.elements == 1 ?
1050 0 :
1051 ignore))) ==
1052 VIEW_CHECK_SKIP)
1053 continue;
1054 else if (res == VIEW_CHECK_ERROR)
1055 {
1056 error= 1;
1057 break;
1058 }
1059
1060 thd->decide_logging_format_low(table);
1061 #ifndef EMBEDDED_LIBRARY
1062 if (lock_type == TL_WRITE_DELAYED)
1063 {
1064 LEX_STRING const st_query = { query, thd->query_length() };
1065 DEBUG_SYNC(thd, "before_write_delayed");
1066 error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
1067 DEBUG_SYNC(thd, "after_write_delayed");
1068 query=0;
1069 }
1070 else
1071 #endif
1072 error=write_record(thd, table ,&info);
1073 if (unlikely(error))
1074 break;
1075 thd->get_stmt_da()->inc_current_row_for_warning();
1076 }
1077 its.rewind();
1078 iteration++;
1079 } while (bulk_parameters_iterations(thd));
1080
1081 values_loop_end:
1082 free_underlaid_joins(thd, &thd->lex->select_lex);
1083 joins_freed= TRUE;
1084
1085 /*
1086 Now all rows are inserted. Time to update logs and send response to
1087 user
1088 */
1089 #ifndef EMBEDDED_LIBRARY
1090 if (unlikely(lock_type == TL_WRITE_DELAYED))
1091 {
1092 if (likely(!error))
1093 {
1094 info.copied=values_list.elements;
1095 end_delayed_insert(thd);
1096 }
1097 }
1098 else
1099 #endif
1100 {
1101 /*
1102 Do not do this release if this is a delayed insert, it would steal
1103 auto_inc values from the delayed_insert thread as they share TABLE.
1104 */
1105 table->file->ha_release_auto_increment();
1106 if (using_bulk_insert && unlikely(table->file->ha_end_bulk_insert()) &&
1107 !error)
1108 {
1109 table->file->print_error(my_errno,MYF(0));
1110 error=1;
1111 }
1112 if (duplic != DUP_ERROR || ignore)
1113 {
1114 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1115 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1116 table->file->ha_rnd_end();
1117 }
1118
1119 transactional_table= table->file->has_transactions();
1120
1121 if (likely(changed= (info.copied || info.deleted || info.updated)))
1122 {
1123 /*
1124 Invalidate the table in the query cache if something changed.
1125 For the transactional algorithm to work the invalidation must be
1126 before binlog writing and ha_autocommit_or_rollback
1127 */
1128 query_cache_invalidate3(thd, table_list, 1);
1129 }
1130
1131 if (thd->transaction.stmt.modified_non_trans_table)
1132 thd->transaction.all.modified_non_trans_table= TRUE;
1133 thd->transaction.all.m_unsafe_rollback_flags|=
1134 (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
1135
1136 if (error <= 0 ||
1137 thd->transaction.stmt.modified_non_trans_table ||
1138 was_insert_delayed)
1139 {
1140 if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
1141 {
1142 int errcode= 0;
1143 if (error <= 0)
1144 {
1145 /*
1146 [Guilhem wrote] Temporary errors may have filled
1147 thd->net.last_error/errno. For example if there has
1148 been a disk full error when writing the row, and it was
1149 MyISAM, then thd->net.last_error/errno will be set to
1150 "disk full"... and the mysql_file_pwrite() will wait until free
1151 space appears, and so when it finishes then the
1152 write_row() was entirely successful
1153 */
1154 /* todo: consider removing */
1155 thd->clear_error();
1156 }
1157 else
1158 errcode= query_error_code(thd, thd->killed == NOT_KILLED);
1159
1160 ScopedStatementReplication scoped_stmt_rpl(
1161 table->versioned(VERS_TRX_ID) ? thd : NULL);
1162 /* bug#22725:
1163
1164 A query which per-row-loop can not be interrupted with
1165 KILLED, like INSERT, and that does not invoke stored
1166 routines can be binlogged with neglecting the KILLED error.
1167
1168 If there was no error (error == zero) until after the end of
1169 inserting loop the KILLED flag that appeared later can be
1170 disregarded since previously possible invocation of stored
1171 routines did not result in any error due to the KILLED. In
1172 such case the flag is ignored for constructing binlog event.
1173 */
1174 DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
1175 if (was_insert_delayed && table_list->lock_type == TL_WRITE)
1176 {
1177 /* Binlog INSERT DELAYED as INSERT without DELAYED. */
1178 String log_query;
1179 if (create_insert_stmt_from_insert_delayed(thd, &log_query))
1180 {
1181 sql_print_error("Event Error: An error occurred while creating query string"
1182 "for INSERT DELAYED stmt, before writing it into binary log.");
1183
1184 error= 1;
1185 }
1186 else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1187 log_query.c_ptr(), log_query.length(),
1188 transactional_table, FALSE, FALSE,
1189 errcode) > 0)
1190 error= 1;
1191 }
1192 else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
1193 thd->query(), thd->query_length(),
1194 transactional_table, FALSE, FALSE,
1195 errcode) > 0)
1196 error= 1;
1197 }
1198 }
1199 DBUG_ASSERT(transactional_table || !changed ||
1200 thd->transaction.stmt.modified_non_trans_table);
1201 }
1202 THD_STAGE_INFO(thd, stage_end);
1203 /*
1204 We'll report to the client this id:
1205 - if the table contains an autoincrement column and we successfully
1206 inserted an autogenerated value, the autogenerated value.
1207 - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
1208 called, X.
1209 - if the table contains an autoincrement column, and some rows were
1210 inserted, the id of the last "inserted" row (if IGNORE, that value may not
1211 have been really inserted but ignored).
1212 */
1213 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
1214 thd->first_successful_insert_id_in_cur_stmt :
1215 (thd->arg_of_last_insert_id_function ?
1216 thd->first_successful_insert_id_in_prev_stmt :
1217 ((table->next_number_field && info.copied) ?
1218 table->next_number_field->val_int() : 0));
1219 table->next_number_field=0;
1220 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
1221 table->auto_increment_field_not_null= FALSE;
1222 if (duplic == DUP_REPLACE &&
1223 (!table->triggers || !table->triggers->has_delete_triggers()))
1224 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1225
1226 if (unlikely(error))
1227 goto abort;
1228 if (thd->lex->analyze_stmt)
1229 {
1230 retval= thd->lex->explain->send_explain(thd);
1231 goto abort;
1232 }
1233 if ((iteration * values_list.elements) == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
1234 !thd->cuted_fields))
1235 {
1236 my_ok(thd, info.copied + info.deleted +
1237 ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1238 info.touched : info.updated),
1239 id);
1240 }
1241 else
1242 {
1243 char buff[160];
1244 ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
1245 info.touched : info.updated);
1246 if (ignore)
1247 sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1248 (lock_type == TL_WRITE_DELAYED) ? (ulong) 0 :
1249 (ulong) (info.records - info.copied),
1250 (long) thd->get_stmt_da()->current_statement_warn_count());
1251 else
1252 sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
1253 (ulong) (info.deleted + updated),
1254 (long) thd->get_stmt_da()->current_statement_warn_count());
1255 ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
1256 }
1257 thd->abort_on_warning= 0;
1258 if (thd->lex->current_select->first_cond_optimization)
1259 {
1260 thd->lex->current_select->save_leaf_tables(thd);
1261 thd->lex->current_select->first_cond_optimization= 0;
1262 }
1263
1264 DBUG_RETURN(FALSE);
1265
1266 abort:
1267 #ifndef EMBEDDED_LIBRARY
1268 if (lock_type == TL_WRITE_DELAYED)
1269 {
1270 end_delayed_insert(thd);
1271 /*
1272 In case of an error (e.g. data truncation), the data type specific data
1273 in fields (e.g. Field_blob::value) was not taken over
1274 by the delayed writer thread. All fields in table_list->table
1275 will be freed by free_root() soon. We need to free the specific
1276 data before free_root() to avoid a memory leak.
1277 */
1278 for (Field **ptr= table_list->table->field ; *ptr ; ptr++)
1279 (*ptr)->free();
1280 }
1281 #endif
1282 if (table != NULL)
1283 table->file->ha_release_auto_increment();
1284
1285 if (!joins_freed)
1286 free_underlaid_joins(thd, &thd->lex->select_lex);
1287 thd->abort_on_warning= 0;
1288 DBUG_RETURN(retval);
1289 }
1290
1291
1292 /*
1293 Additional check for insertability for VIEW
1294
1295 SYNOPSIS
1296 check_view_insertability()
1297 thd - thread handler
1298 view - reference on VIEW
1299
1300 IMPLEMENTATION
1301 A view is insertable if the folloings are true:
1302 - All columns in the view are columns from a table
1303 - All not used columns in table have a default values
1304 - All field in view are unique (not referring to the same column)
1305
1306 RETURN
1307 FALSE - OK
1308 view->contain_auto_increment is 1 if and only if the view contains an
1309 auto_increment field
1310
1311 TRUE - can't be used for insert
1312 */
1313
check_view_insertability(THD * thd,TABLE_LIST * view)1314 static bool check_view_insertability(THD * thd, TABLE_LIST *view)
1315 {
1316 uint num= view->view->select_lex.item_list.elements;
1317 TABLE *table= view->table;
1318 Field_translator *trans_start= view->field_translation,
1319 *trans_end= trans_start + num;
1320 Field_translator *trans;
1321 uint used_fields_buff_size= bitmap_buffer_size(table->s->fields);
1322 uint32 *used_fields_buff= (uint32*)thd->alloc(used_fields_buff_size);
1323 MY_BITMAP used_fields;
1324 enum_column_usage saved_column_usage= thd->column_usage;
1325 DBUG_ENTER("check_key_in_view");
1326
1327 if (!used_fields_buff)
1328 DBUG_RETURN(TRUE); // EOM
1329
1330 DBUG_ASSERT(view->table != 0 && view->field_translation != 0);
1331
1332 (void) my_bitmap_init(&used_fields, used_fields_buff, table->s->fields, 0);
1333 bitmap_clear_all(&used_fields);
1334
1335 view->contain_auto_increment= 0;
1336 /*
1337 we must not set query_id for fields as they're not
1338 really used in this context
1339 */
1340 thd->column_usage= COLUMNS_WRITE;
1341 /* check simplicity and prepare unique test of view */
1342 for (trans= trans_start; trans != trans_end; trans++)
1343 {
1344 if (trans->item->fix_fields_if_needed(thd, &trans->item))
1345 {
1346 thd->column_usage= saved_column_usage;
1347 DBUG_RETURN(TRUE);
1348 }
1349 Item_field *field;
1350 /* simple SELECT list entry (field without expression) */
1351 if (!(field= trans->item->field_for_view_update()))
1352 {
1353 thd->column_usage= saved_column_usage;
1354 DBUG_RETURN(TRUE);
1355 }
1356 if (field->field->unireg_check == Field::NEXT_NUMBER)
1357 view->contain_auto_increment= 1;
1358 /* prepare unique test */
1359 /*
1360 remove collation (or other transparent for update function) if we have
1361 it
1362 */
1363 trans->item= field;
1364 }
1365 thd->column_usage= saved_column_usage;
1366 /* unique test */
1367 for (trans= trans_start; trans != trans_end; trans++)
1368 {
1369 /* Thanks to test above, we know that all columns are of type Item_field */
1370 Item_field *field= (Item_field *)trans->item;
1371 /* check fields belong to table in which we are inserting */
1372 if (field->field->table == table &&
1373 bitmap_fast_test_and_set(&used_fields, field->field->field_index))
1374 DBUG_RETURN(TRUE);
1375 }
1376
1377 DBUG_RETURN(FALSE);
1378 }
1379
1380
1381 /*
1382 Check if table can be updated
1383
1384 SYNOPSIS
1385 mysql_prepare_insert_check_table()
1386 thd Thread handle
1387 table_list Table list
1388 fields List of fields to be updated
1389 where Pointer to where clause
1390 select_insert Check is making for SELECT ... INSERT
1391
1392 RETURN
1393 FALSE ok
1394 TRUE ERROR
1395 */
1396
mysql_prepare_insert_check_table(THD * thd,TABLE_LIST * table_list,List<Item> & fields,bool select_insert)1397 static bool mysql_prepare_insert_check_table(THD *thd, TABLE_LIST *table_list,
1398 List<Item> &fields,
1399 bool select_insert)
1400 {
1401 bool insert_into_view= (table_list->view != 0);
1402 DBUG_ENTER("mysql_prepare_insert_check_table");
1403
1404 if (!table_list->single_table_updatable())
1405 {
1406 my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias.str, "INSERT");
1407 DBUG_RETURN(TRUE);
1408 }
1409 /*
1410 first table in list is the one we'll INSERT into, requires INSERT_ACL.
1411 all others require SELECT_ACL only. the ACL requirement below is for
1412 new leaves only anyway (view-constituents), so check for SELECT rather
1413 than INSERT.
1414 */
1415
1416 if (setup_tables_and_check_access(thd, &thd->lex->select_lex.context,
1417 &thd->lex->select_lex.top_join_list,
1418 table_list,
1419 thd->lex->select_lex.leaf_tables,
1420 select_insert, INSERT_ACL, SELECT_ACL,
1421 TRUE))
1422 DBUG_RETURN(TRUE);
1423
1424 if (insert_into_view && !fields.elements)
1425 {
1426 thd->lex->empty_field_list_on_rset= 1;
1427 if (!thd->lex->select_lex.leaf_tables.head()->table ||
1428 table_list->is_multitable())
1429 {
1430 my_error(ER_VIEW_NO_INSERT_FIELD_LIST, MYF(0),
1431 table_list->view_db.str, table_list->view_name.str);
1432 DBUG_RETURN(TRUE);
1433 }
1434 DBUG_RETURN(insert_view_fields(thd, &fields, table_list));
1435 }
1436
1437 DBUG_RETURN(FALSE);
1438 }
1439
1440
1441 /*
1442 Get extra info for tables we insert into
1443
1444 @param table table(TABLE object) we insert into,
1445 might be NULL in case of view
1446 @param table(TABLE_LIST object) or view we insert into
1447 */
1448
prepare_for_positional_update(TABLE * table,TABLE_LIST * tables)1449 static void prepare_for_positional_update(TABLE *table, TABLE_LIST *tables)
1450 {
1451 if (table)
1452 {
1453 if(table->reginfo.lock_type != TL_WRITE_DELAYED)
1454 table->prepare_for_position();
1455 return;
1456 }
1457
1458 DBUG_ASSERT(tables->view);
1459 List_iterator<TABLE_LIST> it(*tables->view_tables);
1460 TABLE_LIST *tbl;
1461 while ((tbl= it++))
1462 prepare_for_positional_update(tbl->table, tbl);
1463
1464 return;
1465 }
1466
1467
1468 /*
1469 Prepare items in INSERT statement
1470
1471 SYNOPSIS
1472 mysql_prepare_insert()
1473 thd Thread handler
1474 table_list Global/local table list
1475 table Table to insert into (can be NULL if table should
1476 be taken from table_list->table)
1477 where Where clause (for insert ... select)
1478 select_insert TRUE if INSERT ... SELECT statement
1479
1480 TODO (in far future)
1481 In cases of:
1482 INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
1483 ON DUPLICATE KEY ...
1484 we should be able to refer to sum1 in the ON DUPLICATE KEY part
1485
1486 WARNING
1487 You MUST set table->insert_values to 0 after calling this function
1488 before releasing the table object.
1489
1490 RETURN VALUE
1491 FALSE OK
1492 TRUE error
1493 */
1494
mysql_prepare_insert(THD * thd,TABLE_LIST * table_list,TABLE * table,List<Item> & fields,List_item * values,List<Item> & update_fields,List<Item> & update_values,enum_duplicates duplic,COND ** where,bool select_insert)1495 bool mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
1496 TABLE *table, List<Item> &fields, List_item *values,
1497 List<Item> &update_fields, List<Item> &update_values,
1498 enum_duplicates duplic, COND **where,
1499 bool select_insert)
1500 {
1501 SELECT_LEX *select_lex= &thd->lex->select_lex;
1502 Name_resolution_context *context= &select_lex->context;
1503 Name_resolution_context_state ctx_state;
1504 bool insert_into_view= (table_list->view != 0);
1505 bool res= 0;
1506 table_map map= 0;
1507 DBUG_ENTER("mysql_prepare_insert");
1508 DBUG_PRINT("enter", ("table_list: %p table: %p view: %d",
1509 table_list, table,
1510 (int)insert_into_view));
1511 /* INSERT should have a SELECT or VALUES clause */
1512 DBUG_ASSERT (!select_insert || !values);
1513
1514 if (mysql_handle_derived(thd->lex, DT_INIT))
1515 DBUG_RETURN(TRUE);
1516 if (table_list->handle_derived(thd->lex, DT_MERGE_FOR_INSERT))
1517 DBUG_RETURN(TRUE);
1518 if (thd->lex->handle_list_of_derived(table_list, DT_PREPARE))
1519 DBUG_RETURN(TRUE);
1520 /*
1521 For subqueries in VALUES() we should not see the table in which we are
1522 inserting (for INSERT ... SELECT this is done by changing table_list,
1523 because INSERT ... SELECT share SELECT_LEX it with SELECT.
1524 */
1525 if (!select_insert)
1526 {
1527 for (SELECT_LEX_UNIT *un= select_lex->first_inner_unit();
1528 un;
1529 un= un->next_unit())
1530 {
1531 for (SELECT_LEX *sl= un->first_select();
1532 sl;
1533 sl= sl->next_select())
1534 {
1535 sl->context.outer_context= 0;
1536 }
1537 }
1538 }
1539
1540 if (duplic == DUP_UPDATE)
1541 {
1542 /* it should be allocated before Item::fix_fields() */
1543 if (table_list->set_insert_values(thd->mem_root))
1544 DBUG_RETURN(TRUE);
1545 }
1546
1547 if (mysql_prepare_insert_check_table(thd, table_list, fields, select_insert))
1548 DBUG_RETURN(TRUE);
1549
1550 /* Prepare the fields in the statement. */
1551 if (values)
1552 {
1553 /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
1554 DBUG_ASSERT (!select_lex->group_list.elements);
1555
1556 /* Save the state of the current name resolution context. */
1557 ctx_state.save_state(context, table_list);
1558
1559 /*
1560 Perform name resolution only in the first table - 'table_list',
1561 which is the table that is inserted into.
1562 */
1563 table_list->next_local= 0;
1564 context->resolve_in_table_list_only(table_list);
1565
1566 res= (setup_fields(thd, Ref_ptr_array(),
1567 *values, MARK_COLUMNS_READ, 0, NULL, 0) ||
1568 check_insert_fields(thd, context->table_list, fields, *values,
1569 !insert_into_view, 0, &map));
1570
1571 if (!res)
1572 res= setup_fields(thd, Ref_ptr_array(),
1573 update_values, MARK_COLUMNS_READ, 0, NULL, 0);
1574
1575 if (!res && duplic == DUP_UPDATE)
1576 {
1577 select_lex->no_wrap_view_item= TRUE;
1578 res= check_update_fields(thd, context->table_list, update_fields,
1579 update_values, false, &map);
1580 select_lex->no_wrap_view_item= FALSE;
1581 }
1582
1583 /* Restore the current context. */
1584 ctx_state.restore_state(context, table_list);
1585 }
1586
1587 if (res)
1588 DBUG_RETURN(res);
1589
1590 if (!table)
1591 table= table_list->table;
1592
1593 if (table->versioned(VERS_TIMESTAMP) && duplic == DUP_REPLACE)
1594 {
1595 // Additional memory may be required to create historical items.
1596 if (table_list->set_insert_values(thd->mem_root))
1597 DBUG_RETURN(TRUE);
1598 }
1599
1600 if (!select_insert)
1601 {
1602 Item *fake_conds= 0;
1603 TABLE_LIST *duplicate;
1604 if ((duplicate= unique_table(thd, table_list, table_list->next_global,
1605 CHECK_DUP_ALLOW_DIFFERENT_ALIAS)))
1606 {
1607 update_non_unique_table_error(table_list, "INSERT", duplicate);
1608 DBUG_RETURN(TRUE);
1609 }
1610 select_lex->fix_prepare_information(thd, &fake_conds, &fake_conds);
1611 }
1612 /*
1613 Only call prepare_for_posistion() if we are not performing a DELAYED
1614 operation. It will instead be executed by delayed insert thread.
1615 */
1616 if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
1617 prepare_for_positional_update(table, table_list);
1618 DBUG_RETURN(FALSE);
1619 }
1620
1621
1622 /* Check if there is more uniq keys after field */
1623
last_uniq_key(TABLE * table,uint keynr)1624 static int last_uniq_key(TABLE *table,uint keynr)
1625 {
1626 /*
1627 When an underlying storage engine informs that the unique key
1628 conflicts are not reported in the ascending order by setting
1629 the HA_DUPLICATE_KEY_NOT_IN_ORDER flag, we cannot rely on this
1630 information to determine the last key conflict.
1631
1632 The information about the last key conflict will be used to
1633 do a replace of the new row on the conflicting row, rather
1634 than doing a delete (of old row) + insert (of new row).
1635
1636 Hence check for this flag and disable replacing the last row
1637 by returning 0 always. Returning 0 will result in doing
1638 a delete + insert always.
1639 */
1640 if (table->file->ha_table_flags() & HA_DUPLICATE_KEY_NOT_IN_ORDER)
1641 return 0;
1642
1643 while (++keynr < table->s->keys)
1644 if (table->key_info[keynr].flags & HA_NOSAME)
1645 return 0;
1646 return 1;
1647 }
1648
1649
1650 /*
1651 Inserts one historical row to a table.
1652
1653 Copies content of the row from table->record[1] to table->record[0],
1654 sets Sys_end to now() and calls ha_write_row() .
1655 */
1656
vers_insert_history_row(TABLE * table)1657 int vers_insert_history_row(TABLE *table)
1658 {
1659 DBUG_ASSERT(table->versioned(VERS_TIMESTAMP));
1660 if (!table->vers_write)
1661 return 0;
1662 restore_record(table,record[1]);
1663
1664 // Set Sys_end to now()
1665 table->vers_update_end();
1666
1667 Field *row_start= table->vers_start_field();
1668 Field *row_end= table->vers_end_field();
1669 if (row_start->cmp(row_start->ptr, row_end->ptr) >= 0)
1670 return 0;
1671
1672 if (table->vfield &&
1673 table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_READ))
1674 return HA_ERR_GENERIC;
1675
1676 return table->file->ha_write_row(table->record[0]);
1677 }
1678
1679 /*
1680 Write a record to table with optional deleting of conflicting records,
1681 invoke proper triggers if needed.
1682
1683 SYNOPSIS
1684 write_record()
1685 thd - thread context
1686 table - table to which record should be written
1687 info - COPY_INFO structure describing handling of duplicates
1688 and which is used for counting number of records inserted
1689 and deleted.
1690
1691 NOTE
1692 Once this record has been written to the table, the AFTER INSERT trigger
1693 is invoked. If, instead of inserting a new record, we update an old one,
1694 then both ON UPDATE triggers are invoked instead. Similarly, both ON
1695 DELETE triggers are invoked if we delete conflicting records.
1696
1697 Sets thd->transaction.stmt.modified_non_trans_table to TRUE if table which is updated didn't have
1698 transactions.
1699
1700 RETURN VALUE
1701 0 - success
1702 non-0 - error
1703 */
1704
1705
write_record(THD * thd,TABLE * table,COPY_INFO * info)1706 int write_record(THD *thd, TABLE *table,COPY_INFO *info)
1707 {
1708 int error, trg_error= 0;
1709 char *key=0;
1710 MY_BITMAP *save_read_set, *save_write_set;
1711 ulonglong prev_insert_id= table->file->next_insert_id;
1712 ulonglong insert_id_for_cur_row= 0;
1713 ulonglong prev_insert_id_for_cur_row= 0;
1714 DBUG_ENTER("write_record");
1715
1716 info->records++;
1717 save_read_set= table->read_set;
1718 save_write_set= table->write_set;
1719
1720 if (info->handle_duplicates == DUP_REPLACE ||
1721 info->handle_duplicates == DUP_UPDATE)
1722 {
1723 while (unlikely(error=table->file->ha_write_row(table->record[0])))
1724 {
1725 uint key_nr;
1726 /*
1727 If we do more than one iteration of this loop, from the second one the
1728 row will have an explicit value in the autoinc field, which was set at
1729 the first call of handler::update_auto_increment(). So we must save
1730 the autogenerated value to avoid thd->insert_id_for_cur_row to become
1731 0.
1732 */
1733 if (table->file->insert_id_for_cur_row > 0)
1734 insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1735 else
1736 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
1737 bool is_duplicate_key_error;
1738 if (table->file->is_fatal_error(error, HA_CHECK_ALL))
1739 goto err;
1740 is_duplicate_key_error=
1741 table->file->is_fatal_error(error, HA_CHECK_ALL & ~HA_CHECK_DUP);
1742 if (!is_duplicate_key_error)
1743 {
1744 /*
1745 We come here when we had an ignorable error which is not a duplicate
1746 key error. In this we ignore error if ignore flag is set, otherwise
1747 report error as usual. We will not do any duplicate key processing.
1748 */
1749 if (info->ignore)
1750 {
1751 table->file->print_error(error, MYF(ME_JUST_WARNING));
1752 goto ok_or_after_trg_err; /* Ignoring a not fatal error, return 0 */
1753 }
1754 goto err;
1755 }
1756 if (unlikely((int) (key_nr = table->file->get_dup_key(error)) < 0))
1757 {
1758 error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
1759 goto err;
1760 }
1761 DEBUG_SYNC(thd, "write_row_replace");
1762
1763 /* Read all columns for the row we are going to replace */
1764 table->use_all_columns();
1765 /*
1766 Don't allow REPLACE to replace a row when a auto_increment column
1767 was used. This ensures that we don't get a problem when the
1768 whole range of the key has been used.
1769 */
1770 if (info->handle_duplicates == DUP_REPLACE &&
1771 table->next_number_field &&
1772 key_nr == table->s->next_number_index &&
1773 (insert_id_for_cur_row > 0))
1774 goto err;
1775 if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
1776 {
1777 DBUG_ASSERT(table->file->inited == handler::RND);
1778 if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
1779 goto err;
1780 }
1781 else
1782 {
1783 if (table->file->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
1784 {
1785 error=my_errno;
1786 goto err;
1787 }
1788
1789 if (!key)
1790 {
1791 if (!(key=(char*) my_safe_alloca(table->s->max_unique_length)))
1792 {
1793 error=ENOMEM;
1794 goto err;
1795 }
1796 }
1797 key_copy((uchar*) key,table->record[0],table->key_info+key_nr,0);
1798 key_part_map keypart_map= (1 << table->key_info[key_nr].user_defined_key_parts) - 1;
1799 if ((error= (table->file->ha_index_read_idx_map(table->record[1],
1800 key_nr, (uchar*) key,
1801 keypart_map,
1802 HA_READ_KEY_EXACT))))
1803 goto err;
1804 }
1805 if (table->vfield)
1806 {
1807 /*
1808 We have not yet called update_virtual_fields(VCOL_UPDATE_FOR_READ)
1809 in handler methods for the just read row in record[1].
1810 */
1811 table->move_fields(table->field, table->record[1], table->record[0]);
1812 int verr = table->update_virtual_fields(table->file, VCOL_UPDATE_FOR_REPLACE);
1813 table->move_fields(table->field, table->record[0], table->record[1]);
1814 if (verr)
1815 goto err;
1816 }
1817 if (info->handle_duplicates == DUP_UPDATE)
1818 {
1819 int res= 0;
1820 /*
1821 We don't check for other UNIQUE keys - the first row
1822 that matches, is updated. If update causes a conflict again,
1823 an error is returned
1824 */
1825 DBUG_ASSERT(table->insert_values != NULL);
1826 store_record(table,insert_values);
1827 restore_record(table,record[1]);
1828 table->reset_default_fields();
1829
1830 /*
1831 in INSERT ... ON DUPLICATE KEY UPDATE the set of modified fields can
1832 change per row. Thus, we have to do reset_default_fields() per row.
1833 Twice (before insert and before update).
1834 */
1835 DBUG_ASSERT(info->update_fields->elements ==
1836 info->update_values->elements);
1837 if (fill_record_n_invoke_before_triggers(thd, table,
1838 *info->update_fields,
1839 *info->update_values,
1840 info->ignore,
1841 TRG_EVENT_UPDATE))
1842 goto before_trg_err;
1843
1844 bool different_records= (!records_are_comparable(table) ||
1845 compare_record(table));
1846 /*
1847 Default fields must be updated before checking view updateability.
1848 This branch of INSERT is executed only when a UNIQUE key was violated
1849 with the ON DUPLICATE KEY UPDATE option. In this case the INSERT
1850 operation is transformed to an UPDATE, and the default fields must
1851 be updated as if this is an UPDATE.
1852 */
1853 if (different_records && table->default_field)
1854 table->evaluate_update_default_function();
1855
1856 /* CHECK OPTION for VIEW ... ON DUPLICATE KEY UPDATE ... */
1857 res= info->table_list->view_check_option(table->in_use, info->ignore);
1858 if (res == VIEW_CHECK_SKIP)
1859 goto ok_or_after_trg_err;
1860 if (res == VIEW_CHECK_ERROR)
1861 goto before_trg_err;
1862
1863 table->file->restore_auto_increment(prev_insert_id);
1864 info->touched++;
1865 if (different_records)
1866 {
1867 if (unlikely(error=table->file->ha_update_row(table->record[1],
1868 table->record[0])) &&
1869 error != HA_ERR_RECORD_IS_THE_SAME)
1870 {
1871 if (info->ignore &&
1872 !table->file->is_fatal_error(error, HA_CHECK_ALL))
1873 {
1874 if (!(thd->variables.old_behavior &
1875 OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
1876 table->file->print_error(error, MYF(ME_JUST_WARNING));
1877 goto ok_or_after_trg_err;
1878 }
1879 goto err;
1880 }
1881
1882 if (error != HA_ERR_RECORD_IS_THE_SAME)
1883 {
1884 info->updated++;
1885 if (table->versioned())
1886 {
1887 if (table->versioned(VERS_TIMESTAMP))
1888 {
1889 store_record(table, record[2]);
1890 if ((error= vers_insert_history_row(table)))
1891 {
1892 info->last_errno= error;
1893 table->file->print_error(error, MYF(0));
1894 trg_error= 1;
1895 restore_record(table, record[2]);
1896 goto ok_or_after_trg_err;
1897 }
1898 restore_record(table, record[2]);
1899 }
1900 info->copied++;
1901 }
1902 }
1903 else
1904 error= 0;
1905 /*
1906 If ON DUP KEY UPDATE updates a row instead of inserting
1907 one, it's like a regular UPDATE statement: it should not
1908 affect the value of a next SELECT LAST_INSERT_ID() or
1909 mysql_insert_id(). Except if LAST_INSERT_ID(#) was in the
1910 INSERT query, which is handled separately by
1911 THD::arg_of_last_insert_id_function.
1912 */
1913 prev_insert_id_for_cur_row= table->file->insert_id_for_cur_row;
1914 insert_id_for_cur_row= table->file->insert_id_for_cur_row= 0;
1915 trg_error= (table->triggers &&
1916 table->triggers->process_triggers(thd, TRG_EVENT_UPDATE,
1917 TRG_ACTION_AFTER, TRUE));
1918 info->copied++;
1919 }
1920
1921 /*
1922 Only update next_insert_id if the AUTO_INCREMENT value was explicitly
1923 updated, so we don't update next_insert_id with the value from the
1924 row being updated. Otherwise reset next_insert_id to what it was
1925 before the duplicate key error, since that value is unused.
1926 */
1927 if (table->next_number_field_updated)
1928 {
1929 DBUG_ASSERT(table->next_number_field != NULL);
1930
1931 table->file->adjust_next_insert_id_after_explicit_value(table->next_number_field->val_int());
1932 }
1933 else if (prev_insert_id_for_cur_row)
1934 {
1935 table->file->restore_auto_increment(prev_insert_id_for_cur_row);
1936 }
1937 goto ok_or_after_trg_err;
1938 }
1939 else /* DUP_REPLACE */
1940 {
1941 /*
1942 The manual defines the REPLACE semantics that it is either
1943 an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
1944 InnoDB do not function in the defined way if we allow MySQL
1945 to convert the latter operation internally to an UPDATE.
1946 We also should not perform this conversion if we have
1947 timestamp field with ON UPDATE which is different from DEFAULT.
1948 Another case when conversion should not be performed is when
1949 we have ON DELETE trigger on table so user may notice that
1950 we cheat here. Note that it is ok to do such conversion for
1951 tables which have ON UPDATE but have no ON DELETE triggers,
1952 we just should not expose this fact to users by invoking
1953 ON UPDATE triggers.
1954 For system versioning we also take the path through delete, since we
1955 would save nothing through this cheating.
1956 */
1957 if (last_uniq_key(table,key_nr) &&
1958 !table->file->referenced_by_foreign_key() &&
1959 (!table->triggers || !table->triggers->has_delete_triggers()))
1960 {
1961 if (table->versioned(VERS_TRX_ID))
1962 {
1963 bitmap_set_bit(table->write_set, table->vers_start_field()->field_index);
1964 table->file->column_bitmaps_signal();
1965 table->vers_start_field()->store(0, false);
1966 }
1967 if (unlikely(error= table->file->ha_update_row(table->record[1],
1968 table->record[0])) &&
1969 error != HA_ERR_RECORD_IS_THE_SAME)
1970 goto err;
1971 if (likely(!error))
1972 {
1973 info->deleted++;
1974 if (!table->file->has_transactions())
1975 thd->transaction.stmt.modified_non_trans_table= TRUE;
1976 if (table->versioned(VERS_TIMESTAMP))
1977 {
1978 store_record(table, record[2]);
1979 error= vers_insert_history_row(table);
1980 restore_record(table, record[2]);
1981 if (unlikely(error))
1982 goto err;
1983 }
1984 }
1985 else
1986 error= 0; // error was HA_ERR_RECORD_IS_THE_SAME
1987 /*
1988 Since we pretend that we have done insert we should call
1989 its after triggers.
1990 */
1991 goto after_trg_n_copied_inc;
1992 }
1993 else
1994 {
1995 if (table->triggers &&
1996 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
1997 TRG_ACTION_BEFORE, TRUE))
1998 goto before_trg_err;
1999
2000 if (!table->versioned(VERS_TIMESTAMP))
2001 error= table->file->ha_delete_row(table->record[1]);
2002 else
2003 {
2004 store_record(table, record[2]);
2005 restore_record(table, record[1]);
2006 table->vers_update_end();
2007 error= table->file->ha_update_row(table->record[1],
2008 table->record[0]);
2009 restore_record(table, record[2]);
2010 }
2011 if (unlikely(error))
2012 goto err;
2013 if (!table->versioned(VERS_TIMESTAMP))
2014 info->deleted++;
2015 else
2016 info->updated++;
2017 if (!table->file->has_transactions())
2018 thd->transaction.stmt.modified_non_trans_table= TRUE;
2019 if (table->triggers &&
2020 table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
2021 TRG_ACTION_AFTER, TRUE))
2022 {
2023 trg_error= 1;
2024 goto ok_or_after_trg_err;
2025 }
2026 /* Let us attempt do write_row() once more */
2027 }
2028 }
2029 }
2030
2031 /*
2032 If more than one iteration of the above while loop is done, from
2033 the second one the row being inserted will have an explicit
2034 value in the autoinc field, which was set at the first call of
2035 handler::update_auto_increment(). This value is saved to avoid
2036 thd->insert_id_for_cur_row becoming 0. Use this saved autoinc
2037 value.
2038 */
2039 if (table->file->insert_id_for_cur_row == 0)
2040 table->file->insert_id_for_cur_row= insert_id_for_cur_row;
2041
2042 /*
2043 Restore column maps if they were replaced during a duplicate key
2044 problem.
2045 */
2046 if (table->read_set != save_read_set ||
2047 table->write_set != save_write_set)
2048 table->column_bitmaps_set(save_read_set, save_write_set);
2049 }
2050 else if (unlikely((error=table->file->ha_write_row(table->record[0]))))
2051 {
2052 DEBUG_SYNC(thd, "write_row_noreplace");
2053 if (!info->ignore ||
2054 table->file->is_fatal_error(error, HA_CHECK_ALL))
2055 goto err;
2056 if (!(thd->variables.old_behavior &
2057 OLD_MODE_NO_DUP_KEY_WARNINGS_WITH_IGNORE))
2058 table->file->print_error(error, MYF(ME_JUST_WARNING));
2059 table->file->restore_auto_increment(prev_insert_id);
2060 goto ok_or_after_trg_err;
2061 }
2062
2063 after_trg_n_copied_inc:
2064 info->copied++;
2065 thd->record_first_successful_insert_id_in_cur_stmt(table->file->insert_id_for_cur_row);
2066 trg_error= (table->triggers &&
2067 table->triggers->process_triggers(thd, TRG_EVENT_INSERT,
2068 TRG_ACTION_AFTER, TRUE));
2069
2070 ok_or_after_trg_err:
2071 if (key)
2072 my_safe_afree(key,table->s->max_unique_length);
2073 if (!table->file->has_transactions())
2074 thd->transaction.stmt.modified_non_trans_table= TRUE;
2075 DBUG_RETURN(trg_error);
2076
2077 err:
2078 info->last_errno= error;
2079 table->file->print_error(error,MYF(0));
2080
2081 before_trg_err:
2082 table->file->restore_auto_increment(prev_insert_id);
2083 if (key)
2084 my_safe_afree(key, table->s->max_unique_length);
2085 table->column_bitmaps_set(save_read_set, save_write_set);
2086 DBUG_RETURN(1);
2087 }
2088
2089
2090 /******************************************************************************
2091 Check that there aren't any null_fields
2092 ******************************************************************************/
2093
2094
check_that_all_fields_are_given_values(THD * thd,TABLE * entry,TABLE_LIST * table_list)2095 int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, TABLE_LIST *table_list)
2096 {
2097 int err= 0;
2098 MY_BITMAP *write_set= entry->write_set;
2099
2100 for (Field **field=entry->field ; *field ; field++)
2101 {
2102 if (!bitmap_is_set(write_set, (*field)->field_index) &&
2103 !(*field)->vers_sys_field() &&
2104 has_no_default_value(thd, *field, table_list) &&
2105 ((*field)->real_type() != MYSQL_TYPE_ENUM))
2106 err=1;
2107 }
2108 return thd->abort_on_warning ? err : 0;
2109 }
2110
2111 /*****************************************************************************
2112 Handling of delayed inserts
2113 A thread is created for each table that one uses with the DELAYED attribute.
2114 *****************************************************************************/
2115
2116 #ifndef EMBEDDED_LIBRARY
2117
2118 class delayed_row :public ilink {
2119 public:
2120 char *record;
2121 enum_duplicates dup;
2122 my_time_t start_time;
2123 ulong start_time_sec_part;
2124 sql_mode_t sql_mode;
2125 bool auto_increment_field_not_null;
2126 bool ignore, log_query, query_start_sec_part_used;
2127 bool stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2128 ulonglong first_successful_insert_id_in_prev_stmt;
2129 ulonglong forced_insert_id;
2130 ulong auto_increment_increment;
2131 ulong auto_increment_offset;
2132 LEX_STRING query;
2133 Time_zone *time_zone;
2134 char *user, *host, *ip;
2135 query_id_t query_id;
2136 my_thread_id thread_id;
2137
delayed_row(LEX_STRING const query_arg,enum_duplicates dup_arg,bool ignore_arg,bool log_query_arg)2138 delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
2139 bool ignore_arg, bool log_query_arg)
2140 : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
2141 forced_insert_id(0), query(query_arg), time_zone(0),
2142 user(0), host(0), ip(0)
2143 {}
~delayed_row()2144 ~delayed_row()
2145 {
2146 my_free(query.str);
2147 my_free(record);
2148 }
2149 };
2150
2151 /**
2152 Delayed_insert - context of a thread responsible for delayed insert
2153 into one table. When processing delayed inserts, we create an own
2154 thread for every distinct table. Later on all delayed inserts directed
2155 into that table are handled by a dedicated thread.
2156 */
2157
2158 class Delayed_insert :public ilink {
2159 uint locks_in_memory;
2160 thr_lock_type delayed_lock;
2161 public:
2162 THD thd;
2163 TABLE *table;
2164 mysql_mutex_t mutex;
2165 mysql_cond_t cond, cond_client;
2166 uint tables_in_use, stacked_inserts;
2167 volatile bool status;
2168 bool retry;
2169 /**
2170 When the handler thread starts, it clones a metadata lock ticket
2171 which protects against GRL and ticket for the table to be inserted.
2172 This is done to allow the deadlock detector to detect deadlocks
2173 resulting from these locks.
2174 Before this is done, the connection thread cannot safely exit
2175 without causing problems for clone_ticket().
2176 Once handler_thread_initialized has been set, it is safe for the
2177 connection thread to exit.
2178 Access to handler_thread_initialized is protected by di->mutex.
2179 */
2180 bool handler_thread_initialized;
2181 COPY_INFO info;
2182 I_List<delayed_row> rows;
2183 ulong group_count;
2184 TABLE_LIST table_list; // Argument
2185 /**
2186 Request for IX metadata lock protecting against GRL which is
2187 passed from connection thread to the handler thread.
2188 */
2189 MDL_request grl_protection;
Delayed_insert(SELECT_LEX * current_select)2190 Delayed_insert(SELECT_LEX *current_select)
2191 :locks_in_memory(0), thd(next_thread_id()),
2192 table(0),tables_in_use(0), stacked_inserts(0),
2193 status(0), retry(0), handler_thread_initialized(FALSE), group_count(0)
2194 {
2195 DBUG_ENTER("Delayed_insert constructor");
2196 thd.security_ctx->user=(char*) delayed_user;
2197 thd.security_ctx->host=(char*) my_localhost;
2198 thd.security_ctx->ip= NULL;
2199 thd.query_id= 0;
2200 strmake_buf(thd.security_ctx->priv_user, thd.security_ctx->user);
2201 thd.current_tablenr=0;
2202 thd.set_command(COM_DELAYED_INSERT);
2203 thd.lex->current_select= current_select;
2204 thd.lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
2205 /*
2206 Prevent changes to global.lock_wait_timeout from affecting
2207 delayed insert threads as any timeouts in delayed inserts
2208 are not communicated to the client.
2209 */
2210 thd.variables.lock_wait_timeout= LONG_TIMEOUT;
2211
2212 bzero((char*) &thd.net, sizeof(thd.net)); // Safety
2213 bzero((char*) &table_list, sizeof(table_list)); // Safety
2214 thd.system_thread= SYSTEM_THREAD_DELAYED_INSERT;
2215 thd.security_ctx->host_or_ip= "";
2216 bzero((char*) &info,sizeof(info));
2217 mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
2218 mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
2219 mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
2220 mysql_mutex_lock(&LOCK_thread_count);
2221 delayed_insert_threads++;
2222 delayed_lock= global_system_variables.low_priority_updates ?
2223 TL_WRITE_LOW_PRIORITY : TL_WRITE;
2224 mysql_mutex_unlock(&LOCK_thread_count);
2225 DBUG_VOID_RETURN;
2226 }
~Delayed_insert()2227 ~Delayed_insert()
2228 {
2229 /* The following is not really needed, but just for safety */
2230 delayed_row *row;
2231 while ((row=rows.get()))
2232 delete row;
2233 if (table)
2234 {
2235 close_thread_tables(&thd);
2236 thd.mdl_context.release_transactional_locks();
2237 }
2238 mysql_mutex_destroy(&mutex);
2239 mysql_cond_destroy(&cond);
2240 mysql_cond_destroy(&cond_client);
2241
2242 /*
2243 We could use unlink_not_visible_threads() here, but as
2244 delayed_insert_threads also needs to be protected by
2245 the LOCK_thread_count mutex, we open code this.
2246 */
2247 mysql_mutex_lock(&LOCK_thread_count);
2248 thd.unlink(); // Must be unlinked under lock
2249 delayed_insert_threads--;
2250 mysql_mutex_unlock(&LOCK_thread_count);
2251
2252 my_free(thd.query());
2253 thd.security_ctx->user= 0;
2254 thd.security_ctx->host= 0;
2255 }
2256
2257 /* The following is for checking when we can delete ourselves */
lock()2258 inline void lock()
2259 {
2260 locks_in_memory++; // Assume LOCK_delay_insert
2261 }
unlock()2262 void unlock()
2263 {
2264 mysql_mutex_lock(&LOCK_delayed_insert);
2265 if (!--locks_in_memory)
2266 {
2267 mysql_mutex_lock(&mutex);
2268 if (thd.killed && ! stacked_inserts && ! tables_in_use)
2269 {
2270 mysql_cond_signal(&cond);
2271 status=1;
2272 }
2273 mysql_mutex_unlock(&mutex);
2274 }
2275 mysql_mutex_unlock(&LOCK_delayed_insert);
2276 }
lock_count()2277 inline uint lock_count() { return locks_in_memory; }
2278
2279 TABLE* get_local_table(THD* client_thd);
2280 bool open_and_lock_table();
2281 bool handle_inserts(void);
2282 };
2283
2284
/*
  List of the delayed insert handler objects. find_handler() walks it and
  delayed_get_table() appends to it, in both cases while holding
  LOCK_delayed_insert.
*/
2285 I_List<Delayed_insert> delayed_threads;
2286
2287
2288 /**
2289 Return an instance of delayed insert thread that can handle
2290 inserts into a given table, if it exists. Otherwise return NULL.
2291 */
2292
2293 static
find_handler(THD * thd,TABLE_LIST * table_list)2294 Delayed_insert *find_handler(THD *thd, TABLE_LIST *table_list)
2295 {
2296 THD_STAGE_INFO(thd, stage_waiting_for_delay_list);
2297 mysql_mutex_lock(&LOCK_delayed_insert); // Protect master list
2298 I_List_iterator<Delayed_insert> it(delayed_threads);
2299 Delayed_insert *di;
2300 while ((di= it++))
2301 {
2302 if (!cmp(&table_list->db, &di->table_list.db) &&
2303 !cmp(&table_list->table_name, &di->table_list.table_name))
2304 {
2305 di->lock();
2306 break;
2307 }
2308 }
2309 mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2310 return di;
2311 }
2312
2313
2314 /**
2315 Attempt to find or create a delayed insert thread to handle inserts
2316 into this table.
2317
2318 @return In case of success, table_list->table points to a local copy
2319 of the delayed table or is set to NULL, which indicates a
2320 request for lock upgrade. In case of failure, value of
2321 table_list->table is undefined.
2322 @retval TRUE - this thread ran out of resources OR
2323 - a newly created delayed insert thread ran out of
2324 resources OR
2325 - the created thread failed to open and lock the table
2326 (e.g. because it does not exist) OR
2327 - the table opened in the created thread turned out to
2328 be a view
2329 @retval FALSE - table successfully opened OR
2330 - too many delayed insert threads OR
2331 - the table has triggers and we have to fall back to
2332 a normal INSERT
2333 Two latter cases indicate a request for lock upgrade.
2334
2335 XXX: why do we regard INSERT DELAYED into a view as an error and
2336 do not simply perform a lock upgrade?
2337
2338 TODO: The approach with using two mutexes to work with the
2339 delayed thread list -- LOCK_delayed_insert and
2340 LOCK_delayed_create -- is redundant, and we only need one of
2341 them to protect the list. The reason we have two locks is that
2342 we do not want to block look-ups in the list while we're waiting
2343 for the newly created thread to open the delayed table. However,
2344 this wait itself is redundant -- we always call get_local_table
2345 later on, and there wait again until the created thread acquires
2346 a table lock.
2347
2348 As is redundant the concept of locks_in_memory, since we already
2349 have another counter with similar semantics - tables_in_use,
2350 both of them are devoted to counting the number of producers for
2351 a given consumer (delayed insert thread), only at different
2352 stages of producer-consumer relationship.
2353
2354 The 'status' variable in Delayed_insert is redundant
2355 too, since there is already di->stacked_inserts.
2356 */
2357
2358 static
delayed_get_table(THD * thd,MDL_request * grl_protection_request,TABLE_LIST * table_list)2359 bool delayed_get_table(THD *thd, MDL_request *grl_protection_request,
2360 TABLE_LIST *table_list)
2361 {
2362 int error;
2363 Delayed_insert *di;
2364 DBUG_ENTER("delayed_get_table");
2365
2366 /* Must be set in the parser */
2367 DBUG_ASSERT(table_list->db.str);
2368
2369 /* Find the thread which handles this table. */
2370 if (!(di= find_handler(thd, table_list)))
2371 {
2372 /*
2373 No match. Create a new thread to handle the table, but
2374 no more than max_insert_delayed_threads.
2375 */
2376 if (delayed_insert_threads >= thd->variables.max_insert_delayed_threads)
2377 DBUG_RETURN(0);
2378 THD_STAGE_INFO(thd, stage_creating_delayed_handler);
2379 mysql_mutex_lock(&LOCK_delayed_create);
2380 /*
2381 The first search above was done without LOCK_delayed_create.
2382 Another thread might have created the handler in between. Search again.
2383 */
2384 if (! (di= find_handler(thd, table_list)))
2385 {
2386 if (!(di= new Delayed_insert(thd->lex->current_select)))
2387 goto end_create;
2388
2389 /*
2390 Annotating delayed inserts is not supported.
2391 */
2392 di->thd.variables.binlog_annotate_row_events= 0;
2393
2394 di->thd.set_db(&table_list->db);
2395 di->thd.set_query(my_strndup(table_list->table_name.str,
2396 table_list->table_name.length,
2397 MYF(MY_WME | ME_FATALERROR)),
2398 table_list->table_name.length, system_charset_info);
2399 if (di->thd.db.str == NULL || di->thd.query() == NULL)
2400 {
2401 /* The error is reported */
2402 delete di;
2403 goto end_create;
2404 }
2405 di->table_list= *table_list; // Needed to open table
2406 /* Replace volatile strings with local copies */
2407 di->table_list.alias.str= di->table_list.table_name.str= di->thd.query();
2408 di->table_list.alias.length= di->table_list.table_name.length= di->thd.query_length();
2409 di->table_list.db= di->thd.db;
2410 /* We need the tickets so that they can be cloned in handle_delayed_insert */
2411 di->grl_protection.init(MDL_key::GLOBAL, "", "",
2412 MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
2413 di->grl_protection.ticket= grl_protection_request->ticket;
2414 init_mdl_requests(&di->table_list);
2415 di->table_list.mdl_request.ticket= table_list->mdl_request.ticket;
2416
2417 di->lock();
2418 mysql_mutex_lock(&di->mutex);
2419 if ((error= mysql_thread_create(key_thread_delayed_insert,
2420 &di->thd.real_id, &connection_attrib,
2421 handle_delayed_insert, (void*) di)))
2422 {
2423 DBUG_PRINT("error",
2424 ("Can't create thread to handle delayed insert (error %d)",
2425 error));
2426 mysql_mutex_unlock(&di->mutex);
2427 di->unlock();
2428 delete di;
2429 my_error(ER_CANT_CREATE_THREAD, MYF(ME_FATALERROR), error);
2430 goto end_create;
2431 }
2432
2433 /*
2434 Wait until table is open unless the handler thread or the connection
2435 thread has been killed. Note that we in all cases must wait until the
2436 handler thread has been properly initialized before exiting. Otherwise
2437 we risk doing clone_ticket() on a ticket that is no longer valid.
2438 */
2439 THD_STAGE_INFO(thd, stage_waiting_for_handler_open);
2440 while (!di->handler_thread_initialized ||
2441 (!di->thd.killed && !di->table && !thd->killed))
2442 {
2443 mysql_cond_wait(&di->cond_client, &di->mutex);
2444 }
2445 mysql_mutex_unlock(&di->mutex);
2446 THD_STAGE_INFO(thd, stage_got_old_table);
2447 if (thd->killed)
2448 {
2449 di->unlock();
2450 goto end_create;
2451 }
2452 if (di->thd.killed)
2453 {
2454 if (di->thd.is_error() && ! di->retry)
2455 {
2456 /*
2457 Copy the error message. Note that we don't treat fatal
2458 errors in the delayed thread as fatal errors in the
2459 main thread. If delayed thread was killed, we don't
2460 want to send "Server shutdown in progress" in the
2461 INSERT THREAD.
2462 */
2463 my_message(di->thd.get_stmt_da()->sql_errno(),
2464 di->thd.get_stmt_da()->message(),
2465 MYF(0));
2466 }
2467 di->unlock();
2468 goto end_create;
2469 }
2470 mysql_mutex_lock(&LOCK_delayed_insert);
2471 delayed_threads.append(di);
2472 mysql_mutex_unlock(&LOCK_delayed_insert);
2473 }
2474 mysql_mutex_unlock(&LOCK_delayed_create);
2475 }
2476
2477 mysql_mutex_lock(&di->mutex);
2478 table_list->table= di->get_local_table(thd);
2479 mysql_mutex_unlock(&di->mutex);
2480 if (table_list->table)
2481 {
2482 DBUG_ASSERT(! thd->is_error());
2483 thd->di= di;
2484 }
2485 /* Unlock the delayed insert object after its last access. */
2486 di->unlock();
2487 DBUG_RETURN((table_list->table == NULL));
2488
2489 end_create:
2490 mysql_mutex_unlock(&LOCK_delayed_create);
2491 DBUG_RETURN(thd->is_error());
2492 }
2493
/*
  Replace the Virtual_column_info pointer 'vcol' with a shallow copy made
  by (thd)->memdup() and reset the copy's 'expr' to NULL. Does nothing
  when 'vcol' is NULL.

  The body is wrapped in do { } while (0) so that the macro expands to a
  single statement and can be used safely in an unbraced if/else.

  Note that 'vcol' is evaluated several times, so it must not have side
  effects.

  NOTE(review): the result of memdup() is dereferenced without a check;
  confirm whether an allocation failure can return NULL here.
*/
#define memdup_vcol(thd, vcol)                                              \
  do                                                                        \
  {                                                                         \
    if (vcol)                                                               \
    {                                                                       \
      (vcol)= (Virtual_column_info*)(thd)->memdup((vcol), sizeof(*(vcol))); \
      (vcol)->expr= NULL;                                                   \
    }                                                                       \
  } while (0)
2500
2501 /**
2502 As we can't let many client threads modify the same TABLE
2503 structure of the dedicated delayed insert thread, we create an
2504 own structure for each client thread. This includes a row
2505 buffer to save the column values and new fields that point to
2506 the new row buffer. The memory is allocated in the client
2507 thread and is freed automatically.
2508
2509 @pre This function is called from the client thread. Delayed
2510 insert thread mutex must be acquired before invoking this
2511 function.
2512
2513 @return Not-NULL table object on success. NULL in case of an error,
2514 which is set in client_thd.
2515 */
2516
get_local_table(THD * client_thd)2517 TABLE *Delayed_insert::get_local_table(THD* client_thd)
2518 {
2519 my_ptrdiff_t adjust_ptrs;
2520 Field **field,**org_field, *found_next_number_field;
2521 TABLE *copy;
2522 TABLE_SHARE *share;
2523 uchar *bitmap;
2524 char *copy_tmp;
2525 uint bitmaps_used;
2526 DBUG_ENTER("Delayed_insert::get_local_table");
2527
2528 /* First request insert thread to get a lock */
2529 status=1;
2530 tables_in_use++;
2531 if (!thd.lock) // Table is not locked
2532 {
2533 THD_STAGE_INFO(client_thd, stage_waiting_for_handler_lock);
2534 mysql_cond_signal(&cond); // Tell handler to lock table
2535 while (!thd.killed && !thd.lock && ! client_thd->killed)
2536 {
2537 mysql_cond_wait(&cond_client, &mutex);
2538 }
2539 THD_STAGE_INFO(client_thd, stage_got_handler_lock);
2540 if (client_thd->killed)
2541 goto error;
2542 if (thd.killed)
2543 {
2544 /*
2545 Copy the error message. Note that we don't treat fatal
2546 errors in the delayed thread as fatal errors in the
2547 main thread. If delayed thread was killed, we don't
2548 want to send "Server shutdown in progress" in the
2549 INSERT THREAD.
2550
2551 The thread could be killed with an error message if
2552 di->handle_inserts() or di->open_and_lock_table() fails.
2553 The thread could be killed without an error message if
2554 killed using THD::notify_shared_lock() or
2555 kill_delayed_threads_for_table().
2556 */
2557 if (!thd.is_error())
2558 my_message(ER_QUERY_INTERRUPTED, ER_THD(&thd, ER_QUERY_INTERRUPTED),
2559 MYF(0));
2560 else
2561 my_message(thd.get_stmt_da()->sql_errno(),
2562 thd.get_stmt_da()->message(), MYF(0));
2563 goto error;
2564 }
2565 }
2566 share= table->s;
2567
2568 /*
2569 Allocate memory for the TABLE object, the field pointers array, and
2570 one record buffer of reclength size. Normally a table has three
2571 record buffers of rec_buff_length size, which includes alignment
2572 bytes. Since the table copy is used for creating one record only,
2573 the other record buffers and alignment are unnecessary.
2574 */
2575 THD_STAGE_INFO(client_thd, stage_allocating_local_table);
2576 copy_tmp= (char*) client_thd->alloc(sizeof(*copy)+
2577 (share->fields+1)*sizeof(Field**)+
2578 share->reclength +
2579 share->column_bitmap_size*4);
2580 if (!copy_tmp)
2581 goto error;
2582
2583 /* Copy the TABLE object. */
2584 copy= new (copy_tmp) TABLE;
2585 *copy= *table;
2586
2587 /* We don't need to change the file handler here */
2588 /* Assign the pointers for the field pointers array and the record. */
2589 field= copy->field= (Field**) (copy + 1);
2590 bitmap= (uchar*) (field + share->fields + 1);
2591 copy->record[0]= (bitmap + share->column_bitmap_size*4);
2592 memcpy((char*) copy->record[0], (char*) table->record[0], share->reclength);
2593 if (share->default_fields || share->default_expressions)
2594 {
2595 copy->default_field= (Field**)
2596 client_thd->alloc((share->default_fields +
2597 share->default_expressions + 1)*
2598 sizeof(Field*));
2599 if (!copy->default_field)
2600 goto error;
2601 }
2602
2603 if (share->virtual_fields)
2604 {
2605 copy->vfield= (Field **) client_thd->alloc((share->virtual_fields+1)*
2606 sizeof(Field*));
2607 if (!copy->vfield)
2608 goto error;
2609 }
2610 copy->expr_arena= NULL;
2611
2612 /* Ensure we don't use the table list of the original table */
2613 copy->pos_in_table_list= 0;
2614
2615 /*
2616 Make a copy of all fields.
2617 The copied fields need to point into the copied record. This is done
2618 by copying the field objects with their old pointer values and then
2619 "move" the pointers by the distance between the original and copied
2620 records. That way we preserve the relative positions in the records.
2621 */
2622 adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
2623 found_next_number_field= table->found_next_number_field;
2624 for (org_field= table->field; *org_field; org_field++, field++)
2625 {
2626 if (!(*field= (*org_field)->make_new_field(client_thd->mem_root, copy, 1)))
2627 goto error;
2628 (*field)->unireg_check= (*org_field)->unireg_check;
2629 (*field)->invisible= (*org_field)->invisible;
2630 (*field)->orig_table= copy; // Remove connection
2631 (*field)->move_field_offset(adjust_ptrs); // Point at copy->record[0]
2632 memdup_vcol(client_thd, (*field)->vcol_info);
2633 memdup_vcol(client_thd, (*field)->default_value);
2634 memdup_vcol(client_thd, (*field)->check_constraint);
2635 if (*org_field == found_next_number_field)
2636 (*field)->table->found_next_number_field= *field;
2637 }
2638 *field=0;
2639
2640 if (share->virtual_fields || share->default_expressions ||
2641 share->default_fields)
2642 {
2643 bool error_reported= FALSE;
2644 if (unlikely(!(copy->def_vcol_set=
2645 (MY_BITMAP*) alloc_root(client_thd->mem_root,
2646 sizeof(MY_BITMAP)))))
2647 goto error;
2648 if (unlikely(parse_vcol_defs(client_thd, client_thd->mem_root, copy,
2649 &error_reported,
2650 VCOL_INIT_DEPENDENCY_FAILURE_IS_WARNING)))
2651 goto error;
2652 }
2653
2654 switch_defaults_to_nullable_trigger_fields(copy);
2655
2656 /* Adjust in_use for pointing to client thread */
2657 copy->in_use= client_thd;
2658
2659 /* Adjust lock_count. This table object is not part of a lock. */
2660 copy->lock_count= 0;
2661
2662 /* Adjust bitmaps */
2663 copy->def_read_set.bitmap= (my_bitmap_map*) bitmap;
2664 copy->def_write_set.bitmap= ((my_bitmap_map*)
2665 (bitmap + share->column_bitmap_size));
2666 bitmaps_used= 2;
2667 if (share->virtual_fields)
2668 {
2669 my_bitmap_init(copy->def_vcol_set,
2670 (my_bitmap_map*) (bitmap +
2671 bitmaps_used*share->column_bitmap_size),
2672 share->fields, FALSE);
2673 bitmaps_used++;
2674 copy->vcol_set= copy->def_vcol_set;
2675 }
2676 if (share->default_fields || share->default_expressions)
2677 {
2678 my_bitmap_init(©->has_value_set,
2679 (my_bitmap_map*) (bitmap +
2680 bitmaps_used*share->column_bitmap_size),
2681 share->fields, FALSE);
2682 }
2683 copy->tmp_set.bitmap= 0; // To catch errors
2684 bzero((char*) bitmap, share->column_bitmap_size * bitmaps_used);
2685 copy->read_set= ©->def_read_set;
2686 copy->write_set= ©->def_write_set;
2687
2688 DBUG_RETURN(copy);
2689
2690 /* Got fatal error */
2691 error:
2692 tables_in_use--;
2693 mysql_cond_signal(&cond); // Inform thread about abort
2694 DBUG_RETURN(0);
2695 }
2696
2697
2698 /* Put a question in queue */
2699
2700 static
write_delayed(THD * thd,TABLE * table,enum_duplicates duplic,LEX_STRING query,bool ignore,bool log_on)2701 int write_delayed(THD *thd, TABLE *table, enum_duplicates duplic,
2702 LEX_STRING query, bool ignore, bool log_on)
2703 {
2704 delayed_row *row= 0;
2705 Delayed_insert *di=thd->di;
2706 const Discrete_interval *forced_auto_inc;
2707 size_t user_len, host_len, ip_length;
2708 DBUG_ENTER("write_delayed");
2709 DBUG_PRINT("enter", ("query = '%s' length %lu", query.str,
2710 (ulong) query.length));
2711
2712 THD_STAGE_INFO(thd, stage_waiting_for_handler_insert);
2713 mysql_mutex_lock(&di->mutex);
2714 while (di->stacked_inserts >= delayed_queue_size && !thd->killed)
2715 mysql_cond_wait(&di->cond_client, &di->mutex);
2716 THD_STAGE_INFO(thd, stage_storing_row_into_queue);
2717
2718 if (thd->killed)
2719 goto err;
2720
2721 /*
2722 Take a copy of the query string, if there is any. The string will
2723 be free'ed when the row is destroyed. If there is no query string,
2724 we don't do anything special.
2725 */
2726
2727 if (query.str)
2728 {
2729 char *str;
2730 if (!(str= my_strndup(query.str, query.length, MYF(MY_WME))))
2731 goto err;
2732 query.str= str;
2733 }
2734 row= new delayed_row(query, duplic, ignore, log_on);
2735 if (row == NULL)
2736 {
2737 my_free(query.str);
2738 goto err;
2739 }
2740
2741 user_len= host_len= ip_length= 0;
2742 row->user= row->host= row->ip= NULL;
2743 if (thd->security_ctx)
2744 {
2745 if (thd->security_ctx->user)
2746 user_len= strlen(thd->security_ctx->user) + 1;
2747 if (thd->security_ctx->host)
2748 host_len= strlen(thd->security_ctx->host) + 1;
2749 if (thd->security_ctx->ip)
2750 ip_length= strlen(thd->security_ctx->ip) + 1;
2751 }
2752 /* This can't be THREAD_SPECIFIC as it's freed in delayed thread */
2753 if (!(row->record= (char*) my_malloc(table->s->reclength +
2754 user_len + host_len + ip_length,
2755 MYF(MY_WME))))
2756 goto err;
2757 memcpy(row->record, table->record[0], table->s->reclength);
2758
2759 if (thd->security_ctx)
2760 {
2761 if (thd->security_ctx->user)
2762 {
2763 row->user= row->record + table->s->reclength;
2764 memcpy(row->user, thd->security_ctx->user, user_len);
2765 }
2766 if (thd->security_ctx->host)
2767 {
2768 row->host= row->record + table->s->reclength + user_len;
2769 memcpy(row->host, thd->security_ctx->host, host_len);
2770 }
2771 if (thd->security_ctx->ip)
2772 {
2773 row->ip= row->record + table->s->reclength + user_len + host_len;
2774 memcpy(row->ip, thd->security_ctx->ip, ip_length);
2775 }
2776 }
2777 row->query_id= thd->query_id;
2778 row->thread_id= thd->thread_id;
2779
2780 row->start_time= thd->start_time;
2781 row->start_time_sec_part= thd->start_time_sec_part;
2782 row->query_start_sec_part_used= thd->query_start_sec_part_used;
2783 /*
2784 those are for the binlog: LAST_INSERT_ID() has been evaluated at this
2785 time, so record does not need it, but statement-based binlogging of the
2786 INSERT will need when the row is actually inserted.
2787 As for SET INSERT_ID, DELAYED does not honour it (BUG#20830).
2788 */
2789 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt=
2790 thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
2791 row->first_successful_insert_id_in_prev_stmt=
2792 thd->first_successful_insert_id_in_prev_stmt;
2793
2794 /* Add session variable timezone
2795 Time_zone object will not be freed even the thread is ended.
2796 So we can get time_zone object from thread which handling delayed statement.
2797 See the comment of my_tz_find() for detail.
2798 */
2799 if (thd->time_zone_used)
2800 {
2801 row->time_zone = thd->variables.time_zone;
2802 }
2803 else
2804 {
2805 row->time_zone = NULL;
2806 }
2807 /* Copy session variables. */
2808 row->auto_increment_increment= thd->variables.auto_increment_increment;
2809 row->auto_increment_offset= thd->variables.auto_increment_offset;
2810 row->sql_mode= thd->variables.sql_mode;
2811 row->auto_increment_field_not_null= table->auto_increment_field_not_null;
2812
2813 /* Copy the next forced auto increment value, if any. */
2814 if ((forced_auto_inc= thd->auto_inc_intervals_forced.get_next()))
2815 {
2816 row->forced_insert_id= forced_auto_inc->minimum();
2817 DBUG_PRINT("delayed", ("transmitting auto_inc: %lu",
2818 (ulong) row->forced_insert_id));
2819 }
2820
2821 di->rows.push_back(row);
2822 di->stacked_inserts++;
2823 di->status=1;
2824 if (table->s->blob_fields)
2825 unlink_blobs(table);
2826 mysql_cond_signal(&di->cond);
2827
2828 thread_safe_increment(delayed_rows_in_use,&LOCK_delayed_status);
2829 mysql_mutex_unlock(&di->mutex);
2830 DBUG_RETURN(0);
2831
2832 err:
2833 delete row;
2834 mysql_mutex_unlock(&di->mutex);
2835 DBUG_RETURN(1);
2836 }
2837
2838 /**
2839 Signal the delayed insert thread that this user connection
2840 is finished using it for this statement.
2841 */
2842
end_delayed_insert(THD * thd)2843 static void end_delayed_insert(THD *thd)
2844 {
2845 DBUG_ENTER("end_delayed_insert");
2846 Delayed_insert *di=thd->di;
2847 mysql_mutex_lock(&di->mutex);
2848 DBUG_PRINT("info",("tables in use: %d",di->tables_in_use));
2849 if (!--di->tables_in_use || di->thd.killed)
2850 { // Unlock table
2851 di->status=1;
2852 mysql_cond_signal(&di->cond);
2853 }
2854 mysql_mutex_unlock(&di->mutex);
2855 DBUG_VOID_RETURN;
2856 }
2857
2858
2859 /* We kill all delayed threads when doing flush-tables */
2860
kill_delayed_threads(void)2861 void kill_delayed_threads(void)
2862 {
2863 DBUG_ENTER("kill_delayed_threads");
2864 mysql_mutex_lock(&LOCK_delayed_insert); // For unlink from list
2865
2866 I_List_iterator<Delayed_insert> it(delayed_threads);
2867 Delayed_insert *di;
2868 while ((di= it++))
2869 {
2870 mysql_mutex_lock(&di->thd.LOCK_thd_kill);
2871 if (di->thd.killed < KILL_CONNECTION)
2872 di->thd.set_killed_no_mutex(KILL_CONNECTION);
2873 if (di->thd.mysys_var)
2874 {
2875 mysql_mutex_lock(&di->thd.mysys_var->mutex);
2876 if (di->thd.mysys_var->current_cond)
2877 {
2878 /*
2879 We need the following test because the main mutex may be locked
2880 in handle_delayed_insert()
2881 */
2882 if (&di->mutex != di->thd.mysys_var->current_mutex)
2883 mysql_mutex_lock(di->thd.mysys_var->current_mutex);
2884 mysql_cond_broadcast(di->thd.mysys_var->current_cond);
2885 if (&di->mutex != di->thd.mysys_var->current_mutex)
2886 mysql_mutex_unlock(di->thd.mysys_var->current_mutex);
2887 }
2888 mysql_mutex_unlock(&di->thd.mysys_var->mutex);
2889 }
2890 mysql_mutex_unlock(&di->thd.LOCK_thd_kill);
2891 }
2892 mysql_mutex_unlock(&LOCK_delayed_insert); // For unlink from list
2893 DBUG_VOID_RETURN;
2894 }
2895
2896
2897 /**
2898 A strategy for the prelocking algorithm which prevents the
2899 delayed insert thread from opening tables with engines which
2900 do not support delayed inserts.
2901
2902 Particularly it allows to abort open_tables() as soon as we
2903 discover that we have opened a MERGE table, without acquiring
2904 metadata locks on underlying tables.
2905 */
2906
2907 class Delayed_prelocking_strategy : public Prelocking_strategy
2908 {
2909 public:
2910 virtual bool handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
2911 Sroutine_hash_entry *rt, sp_head *sp,
2912 bool *need_prelocking);
2913 virtual bool handle_table(THD *thd, Query_tables_list *prelocking_ctx,
2914 TABLE_LIST *table_list, bool *need_prelocking);
2915 virtual bool handle_view(THD *thd, Query_tables_list *prelocking_ctx,
2916 TABLE_LIST *table_list, bool *need_prelocking);
2917 };
2918
2919
2920 bool Delayed_prelocking_strategy::
handle_table(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)2921 handle_table(THD *thd, Query_tables_list *prelocking_ctx,
2922 TABLE_LIST *table_list, bool *need_prelocking)
2923 {
2924 DBUG_ASSERT(table_list->lock_type == TL_WRITE_DELAYED);
2925
2926 if (!(table_list->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
2927 {
2928 my_error(ER_DELAYED_NOT_SUPPORTED, MYF(0), table_list->table_name.str);
2929 return TRUE;
2930 }
2931 return FALSE;
2932 }
2933
2934
2935 bool Delayed_prelocking_strategy::
handle_routine(THD * thd,Query_tables_list * prelocking_ctx,Sroutine_hash_entry * rt,sp_head * sp,bool * need_prelocking)2936 handle_routine(THD *thd, Query_tables_list *prelocking_ctx,
2937 Sroutine_hash_entry *rt, sp_head *sp,
2938 bool *need_prelocking)
2939 {
2940 /* LEX used by the delayed insert thread has no routines. */
2941 DBUG_ASSERT(0);
2942 return FALSE;
2943 }
2944
2945
2946 bool Delayed_prelocking_strategy::
handle_view(THD * thd,Query_tables_list * prelocking_ctx,TABLE_LIST * table_list,bool * need_prelocking)2947 handle_view(THD *thd, Query_tables_list *prelocking_ctx,
2948 TABLE_LIST *table_list, bool *need_prelocking)
2949 {
2950 /* We don't open views in the delayed insert thread. */
2951 DBUG_ASSERT(0);
2952 return FALSE;
2953 }
2954
2955
2956 /**
2957 Open and lock table for use by delayed thread and check that
2958 this table is suitable for delayed inserts.
2959
2960 @retval FALSE - Success.
2961 @retval TRUE - Failure.
2962 */
2963
open_and_lock_table()2964 bool Delayed_insert::open_and_lock_table()
2965 {
2966 Delayed_prelocking_strategy prelocking_strategy;
2967
2968 /*
2969 Use special prelocking strategy to get ER_DELAYED_NOT_SUPPORTED
2970 error for tables with engines which don't support delayed inserts.
2971
2972 We can't do auto-repair in insert delayed thread, as it would hang
2973 when trying to an exclusive MDL_LOCK on the table during repair
2974 as the connection thread has a SHARED_WRITE lock.
2975 */
2976 if (!(table= open_n_lock_single_table(&thd, &table_list,
2977 TL_WRITE_DELAYED,
2978 MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
2979 MYSQL_OPEN_IGNORE_REPAIR,
2980 &prelocking_strategy)))
2981 {
2982 /* If table was crashed, then upper level should retry open+repair */
2983 retry= table_list.crashed;
2984 thd.fatal_error(); // Abort waiting inserts
2985 return TRUE;
2986 }
2987
2988 if (table->triggers || table->check_constraints)
2989 {
2990 /*
2991 Table has triggers or check constraints. This is not an error, but we do
2992 not support these with delayed insert. Terminate the delayed
2993 thread without an error and thus request lock upgrade.
2994 */
2995 return TRUE;
2996 }
2997 table->copy_blobs= 1;
2998 return FALSE;
2999 }
3000
3001
3002 /*
3003 * Create a new delayed insert thread
3004 */
3005
handle_delayed_insert(void * arg)3006 pthread_handler_t handle_delayed_insert(void *arg)
3007 {
3008 Delayed_insert *di=(Delayed_insert*) arg;
3009 THD *thd= &di->thd;
3010
3011 pthread_detach_this_thread();
3012 /* Add thread to THD list so that's it's visible in 'show processlist' */
3013 thd->set_start_time();
3014 add_to_active_threads(thd);
3015 if (abort_loop)
3016 thd->set_killed(KILL_CONNECTION);
3017 else
3018 thd->reset_killed();
3019
3020 mysql_thread_set_psi_id(thd->thread_id);
3021
3022 /*
3023 Wait until the client runs into mysql_cond_wait(),
3024 where we free it after the table is opened and di linked in the list.
3025 If we did not wait here, the client might detect the opened table
3026 before it is linked to the list. It would release LOCK_delayed_create
3027 and allow another thread to create another handler for the same table,
3028 since it does not find one in the list.
3029 */
3030 mysql_mutex_lock(&di->mutex);
3031 if (my_thread_init())
3032 {
3033 /* Can't use my_error since store_globals has not yet been called */
3034 thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
3035 di->handler_thread_initialized= TRUE;
3036 }
3037 else
3038 {
3039 DBUG_ENTER("handle_delayed_insert");
3040 thd->thread_stack= (char*) &thd;
3041 if (init_thr_lock() || thd->store_globals())
3042 {
3043 /* Can't use my_error since store_globals has perhaps failed */
3044 thd->get_stmt_da()->set_error_status(ER_OUT_OF_RESOURCES);
3045 di->handler_thread_initialized= TRUE;
3046 thd->fatal_error();
3047 goto err;
3048 }
3049
3050 thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
3051
3052 /*
3053 INSERT DELAYED has to go to row-based format because the time
3054 at which rows are inserted cannot be determined in mixed mode.
3055 */
3056 thd->set_current_stmt_binlog_format_row_if_mixed();
3057
3058 /*
3059 Clone tickets representing protection against GRL and the lock on
3060 the target table for the insert and add them to the list of granted
3061 metadata locks held by the handler thread. This is safe since the
3062 handler thread is not holding nor waiting on any metadata locks.
3063 */
3064 if (thd->mdl_context.clone_ticket(&di->grl_protection) ||
3065 thd->mdl_context.clone_ticket(&di->table_list.mdl_request))
3066 {
3067 thd->release_transactional_locks();
3068 di->handler_thread_initialized= TRUE;
3069 goto err;
3070 }
3071
3072 /*
3073 Now that the ticket has been cloned, it is safe for the connection
3074 thread to exit.
3075 */
3076 di->handler_thread_initialized= TRUE;
3077 di->table_list.mdl_request.ticket= NULL;
3078
3079 if (di->open_and_lock_table())
3080 goto err;
3081
3082 /*
3083 INSERT DELAYED generally expects thd->lex->current_select to be NULL,
3084 since this is not an attribute of the current thread. This can lead to
3085 problems if the thread that spawned the current one disconnects.
3086 current_select will then point to freed memory. But current_select is
3087 required to resolve the partition function. So, after fulfilling that
3088 requirement, we set the current_select to 0.
3089 */
3090 thd->lex->current_select= NULL;
3091
3092 /* Tell client that the thread is initialized */
3093 mysql_cond_signal(&di->cond_client);
3094
3095 /*
3096 Inform mdl that it needs to call mysql_lock_abort to abort locks
3097 for delayed insert.
3098 */
3099 thd->mdl_context.set_needs_thr_lock_abort(TRUE);
3100
3101 di->table->mark_columns_needed_for_insert();
3102 /* Mark all columns for write as we don't know which columns we get from user */
3103 bitmap_set_all(di->table->write_set);
3104
3105 /* Now wait until we get an insert or lock to handle */
3106 /* We will not abort as long as a client thread uses this thread */
3107
3108 for (;;)
3109 {
3110 if (thd->killed)
3111 {
3112 uint lock_count;
3113 DBUG_PRINT("delayed", ("Insert delayed killed"));
3114 /*
3115 Remove this from delay insert list so that no one can request a
3116 table from this
3117 */
3118 mysql_mutex_unlock(&di->mutex);
3119 mysql_mutex_lock(&LOCK_delayed_insert);
3120 di->unlink();
3121 lock_count=di->lock_count();
3122 mysql_mutex_unlock(&LOCK_delayed_insert);
3123 mysql_mutex_lock(&di->mutex);
3124 if (!lock_count && !di->tables_in_use && !di->stacked_inserts &&
3125 !thd->lock)
3126 break; // Time to die
3127 }
3128
3129 /* Shouldn't wait if killed or an insert is waiting. */
3130 DBUG_PRINT("delayed",
3131 ("thd->killed: %d di->status: %d di->stacked_inserts: %d",
3132 thd->killed, di->status, di->stacked_inserts));
3133 if (!thd->killed && !di->status && !di->stacked_inserts)
3134 {
3135 struct timespec abstime;
3136 set_timespec(abstime, delayed_insert_timeout);
3137
3138 /* Information for pthread_kill */
3139 mysql_mutex_unlock(&di->mutex);
3140 mysql_mutex_lock(&di->thd.mysys_var->mutex);
3141 di->thd.mysys_var->current_mutex= &di->mutex;
3142 di->thd.mysys_var->current_cond= &di->cond;
3143 mysql_mutex_unlock(&di->thd.mysys_var->mutex);
3144 mysql_mutex_lock(&di->mutex);
3145 THD_STAGE_INFO(&(di->thd), stage_waiting_for_insert);
3146
3147 DBUG_PRINT("info",("Waiting for someone to insert rows"));
3148 while (!thd->killed && !di->status)
3149 {
3150 int error;
3151 mysql_audit_release(thd);
3152 error= mysql_cond_timedwait(&di->cond, &di->mutex, &abstime);
3153 #ifdef EXTRA_DEBUG
3154 if (error && error != EINTR && error != ETIMEDOUT)
3155 {
3156 fprintf(stderr, "Got error %d from mysql_cond_timedwait\n", error);
3157 DBUG_PRINT("error", ("Got error %d from mysql_cond_timedwait",
3158 error));
3159 }
3160 #endif
3161 if (error == ETIMEDOUT || error == ETIME)
3162 thd->set_killed(KILL_CONNECTION);
3163 }
3164 /* We can't lock di->mutex and mysys_var->mutex at the same time */
3165 mysql_mutex_unlock(&di->mutex);
3166 mysql_mutex_lock(&di->thd.mysys_var->mutex);
3167 di->thd.mysys_var->current_mutex= 0;
3168 di->thd.mysys_var->current_cond= 0;
3169 mysql_mutex_unlock(&di->thd.mysys_var->mutex);
3170 mysql_mutex_lock(&di->mutex);
3171 }
3172 DBUG_PRINT("delayed",
3173 ("thd->killed: %d di->tables_in_use: %d thd->lock: %d",
3174 thd->killed, di->tables_in_use, thd->lock != 0));
3175
3176 if (di->tables_in_use && ! thd->lock && !thd->killed)
3177 {
3178 /*
3179 Request for new delayed insert.
3180 Lock the table, but avoid to be blocked by a global read lock.
3181 If we got here while a global read lock exists, then one or more
3182 inserts started before the lock was requested. These are allowed
3183 to complete their work before the server returns control to the
3184 client which requested the global read lock. The delayed insert
3185 handler will close the table and finish when the outstanding
3186 inserts are done.
3187 */
3188 if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1, 0)))
3189 {
3190 /* Fatal error */
3191 thd->set_killed(KILL_CONNECTION);
3192 }
3193 mysql_cond_broadcast(&di->cond_client);
3194 }
3195 if (di->stacked_inserts)
3196 {
3197 delayed_row *row;
3198 I_List_iterator<delayed_row> it(di->rows);
3199 my_thread_id cur_thd= di->thd.thread_id;
3200
3201 while ((row= it++))
3202 {
3203 if (cur_thd != row->thread_id)
3204 {
3205 mysql_audit_external_lock_ex(&di->thd, row->thread_id,
3206 row->user, row->host, row->ip, row->query_id,
3207 di->table->s, F_WRLCK);
3208 cur_thd= row->thread_id;
3209 }
3210 }
3211 if (di->handle_inserts())
3212 {
3213 /* Some fatal error */
3214 thd->set_killed(KILL_CONNECTION);
3215 }
3216 }
3217 di->status=0;
3218 if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
3219 {
3220 /*
3221 No one is doing a insert delayed
3222 Unlock table so that other threads can use it
3223 */
3224 MYSQL_LOCK *lock=thd->lock;
3225 thd->lock=0;
3226 mysql_mutex_unlock(&di->mutex);
3227 /*
3228 We need to release next_insert_id before unlocking. This is
3229 enforced by handler::ha_external_lock().
3230 */
3231 di->table->file->ha_release_auto_increment();
3232 mysql_unlock_tables(thd, lock);
3233 trans_commit_stmt(thd);
3234 di->group_count=0;
3235 mysql_audit_release(thd);
3236 mysql_mutex_lock(&di->mutex);
3237 }
3238 if (di->tables_in_use)
3239 mysql_cond_broadcast(&di->cond_client); // If waiting clients
3240 }
3241
3242 err:
3243 DBUG_LEAVE;
3244 }
3245
3246 {
3247 DBUG_ENTER("handle_delayed_insert-cleanup");
3248 di->table=0;
3249 mysql_mutex_unlock(&di->mutex);
3250
3251 /*
3252 Protect against mdl_locks trying to access open tables
3253 We use KILL_CONNECTION_HARD here to ensure that
3254 THD::notify_shared_lock() dosn't try to access open tables after
3255 this.
3256 */
3257 mysql_mutex_lock(&thd->LOCK_thd_data);
3258 thd->mdl_context.set_needs_thr_lock_abort(0);
3259 mysql_mutex_unlock(&thd->LOCK_thd_data);
3260 thd->set_killed(KILL_CONNECTION_HARD); // If error
3261
3262 close_thread_tables(thd); // Free the table
3263 thd->release_transactional_locks();
3264 mysql_cond_broadcast(&di->cond_client); // Safety
3265
3266 mysql_mutex_lock(&LOCK_delayed_create); // Because of delayed_get_table
3267 mysql_mutex_lock(&LOCK_delayed_insert);
3268 /*
3269 di should be unlinked from the thread handler list and have no active
3270 clients
3271 */
3272 delete di;
3273 mysql_mutex_unlock(&LOCK_delayed_insert);
3274 mysql_mutex_unlock(&LOCK_delayed_create);
3275
3276 DBUG_LEAVE;
3277 }
3278 my_thread_end();
3279 pthread_exit(0);
3280
3281 return 0;
3282 }
3283
3284
3285 /* Remove all pointers to data for blob fields so that original table doesn't try to free them */
3286
unlink_blobs(TABLE * table)3287 static void unlink_blobs(TABLE *table)
3288 {
3289 for (Field **ptr=table->field ; *ptr ; ptr++)
3290 {
3291 if ((*ptr)->flags & BLOB_FLAG)
3292 ((Field_blob *) (*ptr))->clear_temporary();
3293 }
3294 }
3295
3296 /* Free blobs stored in current row */
3297
free_delayed_insert_blobs(TABLE * table)3298 static void free_delayed_insert_blobs(TABLE *table)
3299 {
3300 for (Field **ptr=table->field ; *ptr ; ptr++)
3301 {
3302 if ((*ptr)->flags & BLOB_FLAG)
3303 ((Field_blob *) *ptr)->free();
3304 }
3305 }
3306
3307
3308 /* set value field for blobs to point to data in record */
3309
set_delayed_insert_blobs(TABLE * table)3310 static void set_delayed_insert_blobs(TABLE *table)
3311 {
3312 for (Field **ptr=table->field ; *ptr ; ptr++)
3313 {
3314 if ((*ptr)->flags & BLOB_FLAG)
3315 {
3316 Field_blob *blob= ((Field_blob *) *ptr);
3317 uchar *data= blob->get_ptr();
3318 if (data)
3319 blob->set_value(data); // Set value.ptr() to point to data
3320 }
3321 }
3322 }
3323
3324
handle_inserts(void)3325 bool Delayed_insert::handle_inserts(void)
3326 {
3327 int error;
3328 ulong max_rows;
3329 bool has_trans = TRUE;
3330 bool using_ignore= 0, using_opt_replace= 0,
3331 using_bin_log= mysql_bin_log.is_open();
3332 delayed_row *row;
3333 DBUG_ENTER("handle_inserts");
3334
3335 /* Allow client to insert new rows */
3336 mysql_mutex_unlock(&mutex);
3337
3338 table->next_number_field=table->found_next_number_field;
3339 table->use_all_columns();
3340
3341 THD_STAGE_INFO(&thd, stage_upgrading_lock);
3342 if (thr_upgrade_write_delay_lock(*thd.lock->locks, delayed_lock,
3343 thd.variables.lock_wait_timeout))
3344 {
3345 /*
3346 This can happen if thread is killed either by a shutdown
3347 or if another thread is removing the current table definition
3348 from the table cache.
3349 */
3350 my_error(ER_DELAYED_CANT_CHANGE_LOCK, MYF(ME_FATALERROR | ME_NOREFRESH),
3351 table->s->table_name.str);
3352 goto err;
3353 }
3354
3355 THD_STAGE_INFO(&thd, stage_insert);
3356 max_rows= delayed_insert_limit;
3357 if (thd.killed || table->s->tdc->flushed)
3358 {
3359 thd.set_killed(KILL_SYSTEM_THREAD);
3360 max_rows= ULONG_MAX; // Do as much as possible
3361 }
3362
3363 if (table->file->ha_rnd_init_with_error(0))
3364 goto err;
3365
3366 /*
3367 We can't use row caching when using the binary log because if
3368 we get a crash, then binary log will contain rows that are not yet
3369 written to disk, which will cause problems in replication.
3370 */
3371 if (!using_bin_log)
3372 table->file->extra(HA_EXTRA_WRITE_CACHE);
3373 mysql_mutex_lock(&mutex);
3374
3375 while ((row=rows.get()))
3376 {
3377 int tmp_error;
3378 stacked_inserts--;
3379 mysql_mutex_unlock(&mutex);
3380 memcpy(table->record[0],row->record,table->s->reclength);
3381 if (table->s->blob_fields)
3382 set_delayed_insert_blobs(table);
3383
3384 thd.start_time=row->start_time;
3385 thd.start_time_sec_part=row->start_time_sec_part;
3386 thd.query_start_sec_part_used=row->query_start_sec_part_used;
3387 /*
3388 To get the exact auto_inc interval to store in the binlog we must not
3389 use values from the previous interval (of the previous rows).
3390 */
3391 bool log_query= (row->log_query && row->query.str != NULL);
3392 DBUG_PRINT("delayed", ("query: '%s' length: %lu", row->query.str ?
3393 row->query.str : "[NULL]",
3394 (ulong) row->query.length));
3395 if (log_query)
3396 {
3397 /*
3398 Guaranteed that the INSERT DELAYED STMT will not be here
3399 in SBR when mysql binlog is enabled.
3400 */
3401 DBUG_ASSERT(!(mysql_bin_log.is_open() &&
3402 !thd.is_current_stmt_binlog_format_row()));
3403
3404 /*
3405 This is the first value of an INSERT statement.
3406 It is the right place to clear a forced insert_id.
3407 This is usually done after the last value of an INSERT statement,
3408 but we won't know this in the insert delayed thread. But before
3409 the first value is sufficiently equivalent to after the last
3410 value of the previous statement.
3411 */
3412 table->file->ha_release_auto_increment();
3413 thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty();
3414 }
3415 thd.first_successful_insert_id_in_prev_stmt=
3416 row->first_successful_insert_id_in_prev_stmt;
3417 thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt=
3418 row->stmt_depends_on_first_successful_insert_id_in_prev_stmt;
3419 table->auto_increment_field_not_null= row->auto_increment_field_not_null;
3420
3421 /* Copy the session variables. */
3422 thd.variables.auto_increment_increment= row->auto_increment_increment;
3423 thd.variables.auto_increment_offset= row->auto_increment_offset;
3424 thd.variables.sql_mode= row->sql_mode;
3425
3426 /* Copy a forced insert_id, if any. */
3427 if (row->forced_insert_id)
3428 {
3429 DBUG_PRINT("delayed", ("received auto_inc: %lu",
3430 (ulong) row->forced_insert_id));
3431 thd.force_one_auto_inc_interval(row->forced_insert_id);
3432 }
3433
3434 info.ignore= row->ignore;
3435 info.handle_duplicates= row->dup;
3436 if (info.ignore ||
3437 info.handle_duplicates != DUP_ERROR)
3438 {
3439 table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
3440 using_ignore=1;
3441 }
3442 if (info.handle_duplicates == DUP_REPLACE &&
3443 (!table->triggers ||
3444 !table->triggers->has_delete_triggers()))
3445 {
3446 table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
3447 using_opt_replace= 1;
3448 }
3449 if (info.handle_duplicates == DUP_UPDATE)
3450 table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
3451 thd.clear_error(); // reset error for binlog
3452
3453 tmp_error= 0;
3454 if (unlikely(table->vfield))
3455 {
3456 /*
3457 Virtual fields where not calculated by caller as the temporary
3458 TABLE object used had vcol_set empty. Better to calculate them
3459 here to make the caller faster.
3460 */
3461 tmp_error= table->update_virtual_fields(table->file,
3462 VCOL_UPDATE_FOR_WRITE);
3463 }
3464
3465 if (unlikely(tmp_error) || unlikely(write_record(&thd, table, &info)))
3466 {
3467 info.error_count++; // Ignore errors
3468 thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
3469 row->log_query = 0;
3470 }
3471
3472 if (using_ignore)
3473 {
3474 using_ignore=0;
3475 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
3476 }
3477 if (using_opt_replace)
3478 {
3479 using_opt_replace= 0;
3480 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
3481 }
3482
3483 if (table->s->blob_fields)
3484 free_delayed_insert_blobs(table);
3485 thread_safe_decrement(delayed_rows_in_use,&LOCK_delayed_status);
3486 thread_safe_increment(delayed_insert_writes,&LOCK_delayed_status);
3487 mysql_mutex_lock(&mutex);
3488
3489 /*
3490 Reset the table->auto_increment_field_not_null as it is valid for
3491 only one row.
3492 */
3493 table->auto_increment_field_not_null= FALSE;
3494
3495 delete row;
3496 /*
3497 Let READ clients do something once in a while
3498 We should however not break in the middle of a multi-line insert
3499 if we have binary logging enabled as we don't want other commands
3500 on this table until all entries has been processed
3501 */
3502 if (group_count++ >= max_rows && (row= rows.head()) &&
3503 (!(row->log_query & using_bin_log)))
3504 {
3505 group_count=0;
3506 if (stacked_inserts || tables_in_use) // Let these wait a while
3507 {
3508 if (tables_in_use)
3509 mysql_cond_broadcast(&cond_client); // If waiting clients
3510 THD_STAGE_INFO(&thd, stage_reschedule);
3511 mysql_mutex_unlock(&mutex);
3512 if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
3513 {
3514 /* This should never happen */
3515 table->file->print_error(error,MYF(0));
3516 sql_print_error("%s", thd.get_stmt_da()->message());
3517 DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed in loop"));
3518 goto err;
3519 }
3520 query_cache_invalidate3(&thd, table, 1);
3521 if (thr_reschedule_write_lock(*thd.lock->locks,
3522 thd.variables.lock_wait_timeout))
3523 {
3524 /* This is not known to happen. */
3525 my_error(ER_DELAYED_CANT_CHANGE_LOCK,
3526 MYF(ME_FATALERROR | ME_NOREFRESH),
3527 table->s->table_name.str);
3528 goto err;
3529 }
3530 if (!using_bin_log)
3531 table->file->extra(HA_EXTRA_WRITE_CACHE);
3532 mysql_mutex_lock(&mutex);
3533 THD_STAGE_INFO(&thd, stage_insert);
3534 }
3535 if (tables_in_use)
3536 mysql_cond_broadcast(&cond_client); // If waiting clients
3537 }
3538 }
3539
3540 table->file->ha_rnd_end();
3541
3542 if (WSREP((&thd)))
3543 thd_proc_info(&thd, "Insert done");
3544 else
3545 thd_proc_info(&thd, 0);
3546 mysql_mutex_unlock(&mutex);
3547
3548 /*
3549 We need to flush the pending event when using row-based
3550 replication since the flushing normally done in binlog_query() is
3551 not done last in the statement: for delayed inserts, the insert
3552 statement is logged *before* all rows are inserted.
3553
3554 We can flush the pending event without checking the thd->lock
3555 since the delayed insert *thread* is not inside a stored function
3556 or trigger.
3557
3558 TODO: Move the logging to last in the sequence of rows.
3559 */
3560 has_trans= thd.lex->sql_command == SQLCOM_CREATE_TABLE ||
3561 table->file->has_transactions();
3562 if (thd.is_current_stmt_binlog_format_row() &&
3563 thd.binlog_flush_pending_rows_event(TRUE, has_trans))
3564 goto err;
3565
3566 if (unlikely((error=table->file->extra(HA_EXTRA_NO_CACHE))))
3567 { // This shouldn't happen
3568 table->file->print_error(error,MYF(0));
3569 sql_print_error("%s", thd.get_stmt_da()->message());
3570 DBUG_PRINT("error", ("HA_EXTRA_NO_CACHE failed after loop"));
3571 goto err;
3572 }
3573 query_cache_invalidate3(&thd, table, 1);
3574 mysql_mutex_lock(&mutex);
3575 DBUG_RETURN(0);
3576
3577 err:
3578 #ifndef DBUG_OFF
3579 max_rows= 0; // For DBUG output
3580 #endif
3581 /* Remove all not used rows */
3582 mysql_mutex_lock(&mutex);
3583 while ((row=rows.get()))
3584 {
3585 if (table->s->blob_fields)
3586 {
3587 memcpy(table->record[0],row->record,table->s->reclength);
3588 set_delayed_insert_blobs(table);
3589 free_delayed_insert_blobs(table);
3590 }
3591 delete row;
3592 thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
3593 stacked_inserts--;
3594 #ifndef DBUG_OFF
3595 max_rows++;
3596 #endif
3597 }
3598 DBUG_PRINT("error", ("dropped %lu rows after an error", max_rows));
3599 thread_safe_increment(delayed_insert_errors, &LOCK_delayed_status);
3600 DBUG_RETURN(1);
3601 }
3602 #endif /* EMBEDDED_LIBRARY */
3603
3604 /***************************************************************************
3605 Store records in INSERT ... SELECT *
3606 ***************************************************************************/
3607
3608
3609 /*
3610 make insert specific preparation and checks after opening tables
3611
3612 SYNOPSIS
3613 mysql_insert_select_prepare()
3614 thd thread handler
3615
3616 RETURN
3617 FALSE OK
3618 TRUE Error
3619 */
3620
mysql_insert_select_prepare(THD * thd)3621 bool mysql_insert_select_prepare(THD *thd)
3622 {
3623 LEX *lex= thd->lex;
3624 SELECT_LEX *select_lex= &lex->select_lex;
3625 DBUG_ENTER("mysql_insert_select_prepare");
3626
3627
3628 /*
3629 SELECT_LEX do not belong to INSERT statement, so we can't add WHERE
3630 clause if table is VIEW
3631 */
3632
3633 if (mysql_prepare_insert(thd, lex->query_tables,
3634 lex->query_tables->table, lex->field_list, 0,
3635 lex->update_list, lex->value_list, lex->duplicates,
3636 &select_lex->where, TRUE))
3637 DBUG_RETURN(TRUE);
3638
3639 DBUG_ASSERT(select_lex->leaf_tables.elements != 0);
3640 List_iterator<TABLE_LIST> ti(select_lex->leaf_tables);
3641 TABLE_LIST *table;
3642 uint insert_tables;
3643
3644 if (select_lex->first_cond_optimization)
3645 {
3646 /* Back up leaf_tables list. */
3647 Query_arena *arena= thd->stmt_arena, backup;
3648 arena= thd->activate_stmt_arena_if_needed(&backup); // For easier test
3649
3650 insert_tables= select_lex->insert_tables;
3651 while ((table= ti++) && insert_tables--)
3652 {
3653 select_lex->leaf_tables_exec.push_back(table);
3654 table->tablenr_exec= table->table->tablenr;
3655 table->map_exec= table->table->map;
3656 table->maybe_null_exec= table->table->maybe_null;
3657 }
3658 if (arena)
3659 thd->restore_active_arena(arena, &backup);
3660 }
3661 ti.rewind();
3662 /*
3663 exclude first table from leaf tables list, because it belong to
3664 INSERT
3665 */
3666 /* skip all leaf tables belonged to view where we are insert */
3667 insert_tables= select_lex->insert_tables;
3668 while ((table= ti++) && insert_tables--)
3669 ti.remove();
3670
3671 DBUG_RETURN(FALSE);
3672 }
3673
3674
select_insert(THD * thd_arg,TABLE_LIST * table_list_par,TABLE * table_par,List<Item> * fields_par,List<Item> * update_fields,List<Item> * update_values,enum_duplicates duplic,bool ignore_check_option_errors)3675 select_insert::select_insert(THD *thd_arg, TABLE_LIST *table_list_par,
3676 TABLE *table_par,
3677 List<Item> *fields_par,
3678 List<Item> *update_fields,
3679 List<Item> *update_values,
3680 enum_duplicates duplic,
3681 bool ignore_check_option_errors):
3682 select_result_interceptor(thd_arg),
3683 table_list(table_list_par), table(table_par), fields(fields_par),
3684 autoinc_value_of_last_inserted_row(0),
3685 insert_into_view(table_list_par && table_list_par->view != 0)
3686 {
3687 bzero((char*) &info,sizeof(info));
3688 info.handle_duplicates= duplic;
3689 info.ignore= ignore_check_option_errors;
3690 info.update_fields= update_fields;
3691 info.update_values= update_values;
3692 info.view= (table_list_par->view ? table_list_par : 0);
3693 info.table_list= table_list_par;
3694 }
3695
3696
/**
  Prepare the INSERT part of INSERT ... SELECT.

  Resolves the items of the select list and checks them against the
  insert field list, prepares the ON DUPLICATE KEY UPDATE fields and
  values when that mode is used, and then sets up the target table
  (default record, auto-increment field, duplicate-handling handler
  flags, WHERE and CHECK OPTION of a view, triggers and column marking).

  @param values  Items of the select list.
  @param u       The SELECT_LEX_UNIT; stored in this->unit.

  @return 0 on success, non-zero on error.
*/
int
select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
{
  LEX *lex= thd->lex;
  int res;
  table_map map= 0;
  SELECT_LEX *lex_current_select_save= lex->current_select;
  DBUG_ENTER("select_insert::prepare");

  unit= u;

  /*
    Since table in which we are going to insert is added to the first
    select, LEX::current_select should point to the first select while
    we are fixing fields from insert list.
  */
  lex->current_select= &lex->select_lex;

  res= (setup_fields(thd, Ref_ptr_array(),
                     values, MARK_COLUMNS_READ, 0, NULL, 0) ||
        check_insert_fields(thd, table_list, *fields, values,
                            !insert_into_view, 1, &map));

  if (!res && fields->elements)
  {
    /*
      Run the check with abort_on_warning set as for a strict-mode,
      non-IGNORE statement, then restore the previous value.
    */
    bool saved_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
    res= check_that_all_fields_are_given_values(thd, table_list->table, table_list);
    thd->abort_on_warning= saved_abort_on_warning;
  }

  if (info.handle_duplicates == DUP_UPDATE && !res)
  {
    Name_resolution_context *context= &lex->select_lex.context;
    Name_resolution_context_state ctx_state;

    /* Save the state of the current name resolution context. */
    ctx_state.save_state(context, table_list);

    /* Perform name resolution only in the first table - 'table_list'. */
    table_list->next_local= 0;
    context->resolve_in_table_list_only(table_list);

    lex->select_lex.no_wrap_view_item= TRUE;
    res= res ||
      check_update_fields(thd, context->table_list,
                          *info.update_fields, *info.update_values,
                          /*
                            In INSERT SELECT ON DUPLICATE KEY UPDATE col=x
                            'x' can legally refer to a non-inserted table.
                            'x' is not even resolved yet.
                           */
                          true,
                          &map);
    lex->select_lex.no_wrap_view_item= FALSE;
    /*
      When we are not using GROUP BY and there are no ungrouped aggregate functions
      we can refer to other tables in the ON DUPLICATE KEY part.
      We use next_name_resolution_table destructively, so check it first (views?)
    */
    DBUG_ASSERT (!table_list->next_name_resolution_table);
    if (lex->select_lex.group_list.elements == 0 &&
        !lex->select_lex.with_sum_func)
      /*
        We must make a single context out of the two separate name resolution contexts :
        the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
        To do that we must concatenate the two lists
      */
      table_list->next_name_resolution_table=
        ctx_state.get_first_name_resolution_table();

    res= res || setup_fields(thd, Ref_ptr_array(), *info.update_values,
                             MARK_COLUMNS_READ, 0, NULL, 0);
    if (!res)
    {
      /*
        Traverse the update values list and substitute fields from the
        select for references (Item_ref objects) to them. This is done in
        order to get correct values from those fields when the select
        employs a temporary table.
      */
      List_iterator<Item> li(*info.update_values);
      Item *item;

      while ((item= li++))
      {
        item->transform(thd, &Item::update_value_transformer,
                        (uchar*)lex->current_select);
      }
    }

    /* Restore the current context. */
    ctx_state.restore_state(context, table_list);
  }

  lex->current_select= lex_current_select_save;
  if (res)
    DBUG_RETURN(1);
  /*
    if it is INSERT into join view then check_insert_fields already found
    real table for insert
  */
  table= table_list->table;

  /*
    Is table which we are changing used somewhere in other parts of
    query
  */
  if (unique_table(thd, table_list, table_list->next_global, 0))
  {
    /* Using same table for INSERT and SELECT */
    lex->current_select->options|= OPTION_BUFFER_RESULT;
    lex->current_select->join->select_options|= OPTION_BUFFER_RESULT;
  }
  else if (!(lex->current_select->options & OPTION_BUFFER_RESULT) &&
           thd->locked_tables_mode <= LTM_LOCK_TABLES)
  {
    /*
      We must not yet prepare the result table if it is the same as one of the
      source tables (INSERT SELECT). The preparation may disable
      indexes on the result table, which may be used during the select, if it
      is the same table (Bug #6034). Do the preparation after the select phase
      in select_insert::prepare2().
      We won't start bulk inserts at all if this statement uses functions or
      should invoke triggers since they may access to the same table too.
    */
    table->file->ha_start_bulk_insert((ha_rows) 0);
  }
  restore_record(table,s->default_values);		// Get empty record
  table->reset_default_fields();
  table->next_number_field=table->found_next_number_field;

#ifdef HAVE_REPLICATION
  if (thd->rgi_slave &&
      (info.handle_duplicates == DUP_UPDATE) &&
      (table->next_number_field != NULL) &&
      rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
    DBUG_RETURN(1);
#endif

  thd->cuted_fields=0;
  /* Configure the handler for the requested duplicate-key handling */
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
  {
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS &&
        table->file->ha_rnd_init_with_error(0))
      DBUG_RETURN(1);
  }
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  /* NOTE(review): abort_on_warning is not restored in this function */
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
  res= (table_list->prepare_where(thd, 0, TRUE) ||
        table_list->prepare_check_option(thd));

  if (!res)
  {
     table->prepare_triggers_for_insert_stmt_or_event();
     table->mark_columns_needed_for_insert();
  }

  DBUG_RETURN(res);
}
3862
3863
3864 /*
3865 Finish the preparation of the result table.
3866
3867 SYNOPSIS
3868 select_insert::prepare2()
3869 void
3870
3871 DESCRIPTION
    If the result table is the same as one of the source tables (INSERT SELECT),
    the result table is not finally prepared at the join prepare phase.
    Do the final preparation now.

  RETURN
    0   OK
    1   Error (validation of default values of unset fields failed)
3878 */
3879
prepare2(JOIN *)3880 int select_insert::prepare2(JOIN *)
3881 {
3882 DBUG_ENTER("select_insert::prepare2");
3883 if (thd->lex->current_select->options & OPTION_BUFFER_RESULT &&
3884 thd->locked_tables_mode <= LTM_LOCK_TABLES &&
3885 !thd->lex->describe)
3886 table->file->ha_start_bulk_insert((ha_rows) 0);
3887 if (table->validate_default_values_of_unset_fields(thd))
3888 DBUG_RETURN(1);
3889 DBUG_RETURN(0);
3890 }
3891
3892
cleanup()3893 void select_insert::cleanup()
3894 {
3895 /* select_insert/select_create are never re-used in prepared statement */
3896 DBUG_ASSERT(0);
3897 }
3898
~select_insert()3899 select_insert::~select_insert()
3900 {
3901 DBUG_ENTER("~select_insert");
3902 if (table && table->is_created())
3903 {
3904 table->next_number_field=0;
3905 table->auto_increment_field_not_null= FALSE;
3906 table->file->ha_reset();
3907 }
3908 thd->count_cuted_fields= CHECK_FIELD_IGNORE;
3909 thd->abort_on_warning= 0;
3910 DBUG_VOID_RETURN;
3911 }
3912
3913
send_data(List<Item> & values)3914 int select_insert::send_data(List<Item> &values)
3915 {
3916 DBUG_ENTER("select_insert::send_data");
3917 bool error=0;
3918
3919 if (unit->offset_limit_cnt)
3920 { // using limit offset,count
3921 unit->offset_limit_cnt--;
3922 DBUG_RETURN(0);
3923 }
3924 if (unlikely(thd->killed == ABORT_QUERY))
3925 DBUG_RETURN(0);
3926
3927 thd->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
3928 store_values(values);
3929 if (table->default_field &&
3930 unlikely(table->update_default_fields(info.ignore)))
3931 DBUG_RETURN(1);
3932 thd->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
3933 if (unlikely(thd->is_error()))
3934 {
3935 table->auto_increment_field_not_null= FALSE;
3936 DBUG_RETURN(1);
3937 }
3938
3939 table->vers_write= table->versioned();
3940 if (table_list) // Not CREATE ... SELECT
3941 {
3942 switch (table_list->view_check_option(thd, info.ignore)) {
3943 case VIEW_CHECK_SKIP:
3944 DBUG_RETURN(0);
3945 case VIEW_CHECK_ERROR:
3946 DBUG_RETURN(1);
3947 }
3948 }
3949
3950 error= write_record(thd, table, &info);
3951 table->vers_write= table->versioned();
3952 table->auto_increment_field_not_null= FALSE;
3953
3954 if (likely(!error))
3955 {
3956 if (table->triggers || info.handle_duplicates == DUP_UPDATE)
3957 {
3958 /*
3959 Restore fields of the record since it is possible that they were
3960 changed by ON DUPLICATE KEY UPDATE clause.
3961
3962 If triggers exist then whey can modify some fields which were not
3963 originally touched by INSERT ... SELECT, so we have to restore
3964 their original values for the next row.
3965 */
3966 restore_record(table, s->default_values);
3967 }
3968 if (table->next_number_field)
3969 {
3970 /*
3971 If no value has been autogenerated so far, we need to remember the
3972 value we just saw, we may need to send it to client in the end.
3973 */
3974 if (thd->first_successful_insert_id_in_cur_stmt == 0) // optimization
3975 autoinc_value_of_last_inserted_row=
3976 table->next_number_field->val_int();
3977 /*
3978 Clear auto-increment field for the next record, if triggers are used
3979 we will clear it twice, but this should be cheap.
3980 */
3981 table->next_number_field->reset();
3982 }
3983 }
3984 DBUG_RETURN(error);
3985 }
3986
3987
store_values(List<Item> & values)3988 void select_insert::store_values(List<Item> &values)
3989 {
3990 DBUG_ENTER("select_insert::store_values");
3991
3992 if (fields->elements)
3993 fill_record_n_invoke_before_triggers(thd, table, *fields, values, 1,
3994 TRG_EVENT_INSERT);
3995 else
3996 fill_record_n_invoke_before_triggers(thd, table, table->field_to_fill(),
3997 values, 1, TRG_EVENT_INSERT);
3998
3999 DBUG_VOID_RETURN;
4000 }
4001
/*
  Finish the insert phase after the last row: end the bulk insert, reset
  the handler's duplicate-handling modes, invalidate the query cache if
  rows were changed and write the statement to the binary log.

  Does not send anything to the client.

  Returns false on success. Returns true if ending the bulk insert (or an
  error already set in the THD) reports failure, or if writing to the
  binary log fails.
*/
bool select_insert::prepare_eof()
{
  int error;
  bool const trans_table= table->file->has_transactions();
  bool changed;
  bool binary_logged= 0;
  /* Sampled before any of the calls below. */
  killed_state killed_status= thd->killed;

  DBUG_ENTER("select_insert::prepare_eof");
  DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
                       trans_table, table->file->table_type()));

  /*
    The bulk insert is only ended when locked_tables_mode is at most
    LTM_LOCK_TABLES. The IF_WSREP() part yields -1 for the wsrep conflict
    states MUST_ABORT and CERT_FAILURE (presumably only in WSREP builds;
    verify against the macro definition).
  */
  error= (IF_WSREP((thd->wsrep_conflict_state == MUST_ABORT ||
                    thd->wsrep_conflict_state == CERT_FAILURE) ? -1 :, )
          (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
           table->file->ha_end_bulk_insert() : 0));

  /* An error recorded in the THD counts even if the handler reported none. */
  if (likely(!error) && unlikely(thd->is_error()))
    error= thd->get_stmt_da()->sql_errno();

  /*
    Counterpart of the ha_rnd_init_with_error() call made during prepare
    under the same conditions. Only ha_rnd_end() is conditional; the two
    extra() calls below are always executed.
  */
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
      if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
        table->file->ha_rnd_end();
  table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
  table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);

  if (likely((changed= (info.copied || info.deleted || info.updated))))
  {
    /*
      We must invalidate the table in the query cache before binlog writing
      and ha_autocommit_or_rollback.
    */
    query_cache_invalidate3(thd, table, 1);
  }

  /* Propagate statement-level flags to the whole transaction. */
  if (thd->transaction.stmt.modified_non_trans_table)
    thd->transaction.all.modified_non_trans_table= TRUE;
  thd->transaction.all.m_unsafe_rollback_flags|=
    (thd->transaction.stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);

  DBUG_ASSERT(trans_table || !changed ||
              thd->transaction.stmt.modified_non_trans_table);

  /*
    Write to binlog before committing transaction.  No statement will
    be written by the binlog_query() below in RBR mode.  All the
    events are in the transaction cache and will be written when
    ha_autocommit_or_rollback() is issued below.
  */
  if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
      (likely(!error) || thd->transaction.stmt.modified_non_trans_table))
  {
    int errcode= 0;
    int res;
    if (likely(!error))
      thd->clear_error();
    else
      errcode= query_error_code(thd, killed_status == NOT_KILLED);
    res= thd->binlog_query(THD::ROW_QUERY_TYPE,
                           thd->query(), thd->query_length(),
                           trans_table, FALSE, FALSE, errcode);
    if (res > 0)
    {
      /* Binlog write failed: release auto-increment values and give up. */
      table->file->ha_release_auto_increment();
      DBUG_RETURN(true);
    }
    binary_logged= res == 0 || !table->s->tmp_table;
  }
  table->s->table_creation_was_logged|= binary_logged;
  table->file->ha_release_auto_increment();

  if (unlikely(error))
  {
    table->file->print_error(error,MYF(0));
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}
4081
send_ok_packet()4082 bool select_insert::send_ok_packet() {
4083 char message[160]; /* status message */
4084 ulonglong row_count; /* rows affected */
4085 ulonglong id; /* last insert-id */
4086
4087 DBUG_ENTER("select_insert::send_ok_packet");
4088
4089 if (info.ignore)
4090 my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4091 (ulong) info.records, (ulong) (info.records - info.copied),
4092 (long) thd->get_stmt_da()->current_statement_warn_count());
4093 else
4094 my_snprintf(message, sizeof(message), ER(ER_INSERT_INFO),
4095 (ulong) info.records, (ulong) (info.deleted + info.updated),
4096 (long) thd->get_stmt_da()->current_statement_warn_count());
4097
4098 row_count= info.copied + info.deleted +
4099 ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
4100 info.touched : info.updated);
4101
4102 id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
4103 thd->first_successful_insert_id_in_cur_stmt :
4104 (thd->arg_of_last_insert_id_function ?
4105 thd->first_successful_insert_id_in_prev_stmt :
4106 (info.copied ? autoinc_value_of_last_inserted_row : 0));
4107
4108 ::my_ok(thd, row_count, id, message);
4109
4110 DBUG_RETURN(false);
4111 }
4112
send_eof()4113 bool select_insert::send_eof()
4114 {
4115 bool res;
4116 DBUG_ENTER("select_insert::send_eof");
4117 res= (prepare_eof() || (!suppress_my_ok && send_ok_packet()));
4118 DBUG_RETURN(res);
4119 }
4120
/*
  Clean up after a failed INSERT ... SELECT: end the bulk insert, reset
  the handler's duplicate-handling modes and, if non-transactional changes
  were made (or the statement is marked to be logged), write the statement
  with its error code to the binary log.

  Does nothing when there is no table or the handler has no table attached.
*/
void select_insert::abort_result_set()
{
  bool binary_logged= 0;
  DBUG_ENTER("select_insert::abort_result_set");
  /*
    If the creation of the table failed (due to a syntax error, for
    example), no table will have been opened and therefore 'table'
    will be NULL. In that case, we still need to execute the rollback
    and the end of the function.

    If it fails due to inability to insert in a multi-table view, for
    example, table will be assigned with the view table structure, but that
    table will not be opened really (it is a dummy to check field types & Co).
   */
  if (table && table->file->get_table())
  {
    bool changed, transactional_table;
    /*
      If we are not in prelocked mode, we end the bulk insert started
      before.
    */
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
      table->file->ha_end_bulk_insert();

    /* End a scan that is still initialized on the handler. */
    if (table->file->inited)
      table->file->ha_rnd_end();
    table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);

    /*
      If at least one row has been inserted/modified and will stay in
      the table (the table doesn't have transactions) we must write to
      the binlog (and the error code will make the slave stop).

      For many errors (example: we got a duplicate key error while
      inserting into a MyISAM table), no row will be added to the table,
      so passing the error to the slave will not help since there will
      be an error code mismatch (the inserts will succeed on the slave
      with no error).

      If table creation failed, the number of rows modified will also be
      zero, so no check for that is made.
    */
    changed= (info.copied || info.deleted || info.updated);
    transactional_table= table->file->has_transactions();
    if (thd->transaction.stmt.modified_non_trans_table ||
        thd->log_current_statement)
    {
      if (!can_rollback_data())
        thd->transaction.all.modified_non_trans_table= TRUE;

      if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
      {
        int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
        int res;
        /* error of writing binary log is ignored */
        res= thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
                               thd->query_length(),
                               transactional_table, FALSE, FALSE, errcode);
        binary_logged= res == 0 || !table->s->tmp_table;
      }
      /* The query cache is only invalidated inside this branch. */
      if (changed)
	query_cache_invalidate3(thd, table, 1);
    }
    DBUG_ASSERT(transactional_table || !changed ||
		thd->transaction.stmt.modified_non_trans_table);

    table->s->table_creation_was_logged|= binary_logged;
    table->file->ha_release_auto_increment();
  }

  DBUG_VOID_RETURN;
}
4194
4195
4196 /***************************************************************************
4197 CREATE TABLE (SELECT) ...
4198 ***************************************************************************/
4199
create_field_for_create_select(TABLE * table)4200 Field *Item::create_field_for_create_select(TABLE *table)
4201 {
4202 Field *def_field, *tmp_field;
4203 return ::create_tmp_field(table->in_use, table, this, type(),
4204 (Item ***) 0, &tmp_field, &def_field, 0, 0, 0, 0);
4205 }
4206
4207
4208 /**
4209 Create table from lists of fields and items (or just return TABLE
4210 object for pre-opened existing table).
4211
4212 @param thd [in] Thread object
4213 @param create_info [in] Create information (like MAX_ROWS, ENGINE or
4214 temporary table flag)
4215 @param create_table [in] Pointer to TABLE_LIST object providing database
4216 and name for table to be created or to be open
4217 @param alter_info [in/out] Initial list of columns and indexes for the
4218 table to be created
4219 @param items [in] List of items which should be used to produce
4220 rest of fields for the table (corresponding
4221 fields will be added to the end of
4222 alter_info->create_list)
4223 @param lock [out] Pointer to the MYSQL_LOCK object for table
4224 created will be returned in this parameter.
4225 Since this table is not included in THD::lock
4226 caller is responsible for explicitly unlocking
4227 this table.
4228 @param hooks [in] Hooks to be invoked before and after obtaining
4229 table lock on the table being created.
4230
4231 @note
4232 This function assumes that either table exists and was pre-opened and
4233 locked at open_and_lock_tables() stage (and in this case we just emit
4234 error or warning and return pre-opened TABLE object) or an exclusive
4235 metadata lock was acquired on table so we can safely create, open and
4236 lock table in it (we don't acquire metadata lock if this create is
4237 for temporary table).
4238
4239 @note
4240 Since this function contains some logic specific to CREATE TABLE ...
4241 SELECT it should be changed before it can be used in other contexts.
4242
4243 @retval non-zero Pointer to TABLE object for table created or opened
4244 @retval 0 Error
4245 */
4246
TABLE *select_create::create_table_from_items(THD *thd,
                                              List<Item> *items,
                                              MYSQL_LOCK **lock,
                                              TABLEOP_HOOKS *hooks)
{
  TABLE tmp_table;		// Used during 'Create_field()'
  TABLE_SHARE share;
  TABLE *table= 0;
  /* Passed on below to the versioning check and to the table creation call */
  uint select_field_count= items->elements;
  /* Add selected items to field list */
  List_iterator_fast<Item> it(*items);
  Item *item;
  bool save_table_creation_was_logged;
  DBUG_ENTER("select_create::create_table_from_items");

  /*
    Set up a minimal local TABLE/TABLE_SHARE pair; it is only handed to
    create_field_for_create_select() in the loop below.
  */
  tmp_table.s= &share;
  init_tmp_table_share(thd, &share, "", 0, "", "");

  tmp_table.s->db_create_options=0;
  tmp_table.null_row= 0;
  tmp_table.maybe_null= 0;
  tmp_table.in_use= thd;

  if (!opt_explicit_defaults_for_timestamp)
    promote_first_timestamp_column(&alter_info->create_list);

  if (create_info->vers_fix_system_fields(thd, alter_info, *create_table))
    DBUG_RETURN(NULL);

  /* Append one Create_field per selected item to alter_info->create_list. */
  while ((item=it++))
  {
    Field *tmp_field= item->create_field_for_create_select(&tmp_table);

    if (!tmp_field)
      DBUG_RETURN(NULL);

    Field *table_field;

    switch (item->type())
    {
    /*
      We have to take into account both the real table's fields and
      pseudo-fields used in trigger's body. These fields are used
      to copy defaults values later inside constructor of
      the class Create_field.
    */
    case Item::FIELD_ITEM:
    case Item::TRIGGER_FIELD_ITEM:
      table_field= ((Item_field *) item)->field;
      break;
    default:
      table_field= NULL;
    }

    Create_field *cr_field= new (thd->mem_root)
                                  Create_field(thd, tmp_field, table_field);

    if (!cr_field)
      DBUG_RETURN(NULL);

    /* A nullable item must produce a nullable column. */
    if (item->maybe_null)
      cr_field->flags &= ~NOT_NULL_FLAG;
    alter_info->create_list.push_back(cr_field, thd->mem_root);
  }

  if (create_info->vers_check_system_fields(thd, alter_info,
                                            create_table->table_name,
                                            create_table->db,
                                            select_field_count))
    DBUG_RETURN(NULL);

  DEBUG_SYNC(thd,"create_table_select_before_create");

  /* Check if LOCK TABLES + CREATE OR REPLACE of existing normal table*/
  if (thd->locked_tables_mode && create_table->table &&
      !create_info->tmp_table())
  {
    /* Remember information about the locked table */
    create_info->pos_in_locked_tables=
      create_table->table->pos_in_locked_tables;
    create_info->mdl_ticket= create_table->table->mdl_ticket;
  }

  /*
    Create and lock table.

    Note that we either creating (or opening existing) temporary table or
    creating base table on which name we have exclusive lock. So code below
    should not cause deadlocks or races.

    We don't log the statement, it will be logged later.

    If this is a HEAP table, the automatic DELETE FROM which is written to the
    binlog when a HEAP table is opened for the first time since startup, must
    not be written: 1) it would be wrong (imagine we're in CREATE SELECT: we
    don't want to delete from it) 2) it would be written before the CREATE
    TABLE, which is a wrong order. So we keep binary logging disabled when we
    open_table().
  */

  if (!mysql_create_table_no_lock(thd, &create_table->db,
                                  &create_table->table_name,
                                  create_info, alter_info, NULL,
                                  select_field_count, create_table))
  {
    DEBUG_SYNC(thd,"create_table_select_before_open");

    /*
      If we had a temporary table or a table used with LOCK TABLES,
      it was closed by mysql_create()
    */
    create_table->table= 0;

    if (!create_info->tmp_table())
    {
      Open_table_context ot_ctx(thd, MYSQL_OPEN_REOPEN);
      TABLE_LIST::enum_open_strategy save_open_strategy;

      /* Force the newly created table to be opened */
      save_open_strategy= create_table->open_strategy;
      create_table->open_strategy= TABLE_LIST::OPEN_NORMAL;
      /*
        Here we open the destination table, on which we already have
        an exclusive metadata lock.
      */
      if (open_table(thd, create_table, &ot_ctx))
      {
        /* Opening failed: remove the table that was just created. */
        quick_rm_table(thd, create_info->db_type, &create_table->db,
                       table_case_name(create_info, &create_table->table_name),
                       0);
      }
      /* Restore */
      create_table->open_strategy= save_open_strategy;
    }
    else
    {
      /*
        The pointer to the newly created temporary table has been stored in
        create_info->table.
      */
      create_table->table= create_info->table;
      if (!create_table->table)
      {
        /*
          This shouldn't happen as creation of temporary table should make
          it preparable for open. Anyway we can't drop temporary table if
          we are unable to find it.
        */
        DBUG_ASSERT(0);
      }
    }
  }
  else
    create_table->table= 0;                     // Create failed

  if (unlikely(!(table= create_table->table)))
  {
    if (likely(!thd->is_error()))             // CREATE ... IF NOT EXISTS
      my_ok(thd);                             //   succeed, but did nothing
    DBUG_RETURN(NULL);
  }

  DEBUG_SYNC(thd,"create_table_select_before_lock");

  table->reginfo.lock_type=TL_WRITE;
  hooks->prelock(&table, 1);                    // Call prelock hooks

  /*
    Ensure that decide_logging_format(), called by mysql_lock_tables(), works
    with temporary tables that will be logged later if needed.
  */
  save_table_creation_was_logged= table->s->table_creation_was_logged;
  table->s->table_creation_was_logged= 1;

  /*
    mysql_lock_tables() below should never fail with request to reopen table
    since it won't wait for the table lock (we have exclusive metadata lock on
    the table) and thus can't get aborted.
  */
  if (unlikely(!((*lock)= mysql_lock_tables(thd, &table, 1, 0)) ||
               hooks->postlock(&table, 1)))
  {
    /* purecov: begin tested */
    /*
      This can happen in innodb when you get a deadlock when using same table
      in insert and select or when you run out of memory.
      It can also happen if there was a conflict in
      THD::decide_logging_format()
    */
    if (!thd->is_error())
      my_error(ER_CANT_LOCK, MYF(0), my_errno);
    /* The lock may have been taken before the postlock hook failed. */
    if (*lock)
    {
      mysql_unlock_tables(thd, *lock);
      *lock= 0;
    }
    drop_open_table(thd, table, &create_table->db, &create_table->table_name);
    DBUG_RETURN(0);
    /* purecov: end */
  }
  /* Undo the temporary override made before mysql_lock_tables(). */
  table->s->table_creation_was_logged= save_table_creation_was_logged;
  DBUG_RETURN(table);
}
4450
4451
/*
  Prepare CREATE ... SELECT: create, open and lock the target table from
  the select list, then set the table up for receiving rows.

  Returns 0 on success. Returns -1 if the table could not be created or
  the select list has more values than the table has fields. Returns 1 if
  initializing the handler scan fails or
  check_that_all_fields_are_given_values() fails.
*/
int
select_create::prepare(List<Item> &_values, SELECT_LEX_UNIT *u)
{
  /* Work on a copy of the value list made on thd->mem_root */
  List<Item> values(_values, thd->mem_root);
  MYSQL_LOCK *extra_lock= NULL;
  DBUG_ENTER("select_create::prepare");

  TABLEOP_HOOKS *hook_ptr= NULL;
  /*
    For row-based replication, the CREATE-SELECT statement is written
    in two pieces: the first one contain the CREATE TABLE statement
    necessary to create the table and the second part contain the rows
    that should go into the table.

    For non-temporary tables, the start of the CREATE-SELECT
    implicitly commits the previous transaction, and all events
    forming the statement will be stored the transaction cache. At end
    of the statement, the entire statement is committed as a
    transaction, and all events are written to the binary log.

    On the master, the table is locked for the duration of the
    statement, but since the CREATE part is replicated as a simple
    statement, there is no way to lock the table for accesses on the
    slave.  Hence, we have to hold on to the CREATE part of the
    statement until the statement has finished.
   */
  class MY_HOOKS : public TABLEOP_HOOKS {
  public:
    MY_HOOKS(select_create *x, TABLE_LIST *create_table_arg,
             TABLE_LIST *select_tables_arg)
      : ptr(x),
        create_table(create_table_arg),
        select_tables(select_tables_arg)
      {
      }

  private:
    /*
      Decide the binlog format with the created table linked in front of
      the select tables, then, for row format and a non-temporary table,
      log the CREATE TABLE statement. Returns 0 on success, otherwise the
      error of the failing call.
    */
    virtual int do_postlock(TABLE **tables, uint count)
    {
      int error;
      THD *thd= const_cast<THD*>(ptr->get_thd());
      TABLE_LIST *save_next_global= create_table->next_global;

      /* Temporarily chain the select tables after the created table. */
      create_table->next_global= select_tables;

      error= thd->decide_logging_format(create_table);

      create_table->next_global= save_next_global;

      if (unlikely(error))
        return error;

      TABLE const *const table = *tables;
      if (thd->is_current_stmt_binlog_format_row() &&
          !table->s->tmp_table)
      {
        /* NOTE(review): this declaration shadows the outer 'error'. */
        int error;
        if (unlikely((error= ptr->binlog_show_create_table(tables, count))))
          return error;
      }
      return 0;
    }
    select_create *ptr;
    TABLE_LIST *create_table;
    TABLE_LIST *select_tables;
  };

  MY_HOOKS hooks(this, create_table, select_tables);
  hook_ptr= &hooks;

  unit= u;

  /*
    Start a statement transaction before the create if the target is not
    a temporary table, row-based replication is used for the statement
    and the binary log is open.
  */
  if (!thd->lex->tmp_table()  &&
      thd->is_current_stmt_binlog_format_row() &&
      mysql_bin_log.is_open())
  {
    thd->binlog_start_trans_and_stmt();
  }

  DEBUG_SYNC(thd,"create_table_select_before_check_if_exists");

  if (!(table= create_table_from_items(thd, &values, &extra_lock, hook_ptr)))
  {
    if (create_info->or_replace())
    {
      /* Original table was deleted. We have to log it */
      log_drop_table(thd, &create_table->db, &create_table->table_name,
                     thd->lex->tmp_table());
    }

    /* abort() deletes table */
    DBUG_RETURN(-1);
  }

  if (create_info->tmp_table())
  {
    /*
      When the temporary table was created & opened in create_table_impl(),
      the table's TABLE_SHARE (and thus TABLE) object was also linked to THD
      temporary tables lists. So, we must temporarily remove it from the
      list to keep them inaccessible from inner statements.
      e.g. CREATE TEMPORARY TABLE `t1` AS SELECT * FROM `t1`;
    */
    saved_tmp_table_share= thd->save_tmp_table_share(create_table->table);
  }

  /*
    Keep the lock returned by create_table_from_items(): in m_lock for a
    temporary table, in thd->extra_lock otherwise. m_plock remembers which.
  */
  if (extra_lock)
  {
    DBUG_ASSERT(m_plock == NULL);

    if (create_info->tmp_table())
      m_plock= &m_lock;
    else
      m_plock= &thd->extra_lock;

    *m_plock= extra_lock;
  }

  if (table->s->fields < values.elements)
  {
    my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
    DBUG_RETURN(-1);
  }

  /* First field to copy */
  field= table->field+table->s->fields;

  /*
    Mark all fields that are given values. The field array is walked
    backwards from its end; fields whose invisibility is at least
    INVISIBLE_SYSTEM are stepped over without consuming a value. After the
    loop 'field' points to the first field to be filled.
  */
  for (uint n= values.elements; n; )
  {
    if ((*--field)->invisible >= INVISIBLE_SYSTEM)
      continue;
    n--;
    bitmap_set_bit(table->write_set, (*field)->field_index);
  }

  table->next_number_field=table->found_next_number_field;

  restore_record(table,s->default_values);      // Get empty record
  thd->cuted_fields=0;
  if (info.ignore || info.handle_duplicates != DUP_ERROR)
  {
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    if (table->file->ha_table_flags() & HA_DUPLICATE_POS &&
        table->file->ha_rnd_init_with_error(0))
      DBUG_RETURN(1);
  }
  if (info.handle_duplicates == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  if (info.handle_duplicates == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
    table->file->ha_start_bulk_insert((ha_rows) 0);
  thd->abort_on_warning= !info.ignore && thd->is_strict_mode();
  if (check_that_all_fields_are_given_values(thd, table, table_list))
    DBUG_RETURN(1);
  table->mark_columns_needed_for_insert();
  table->file->extra(HA_EXTRA_WRITE_CACHE);
  // Mark table as used
  table->query_id= thd->query_id;
  DBUG_RETURN(0);
}
4620
4621 int
binlog_show_create_table(TABLE ** tables,uint count)4622 select_create::binlog_show_create_table(TABLE **tables, uint count)
4623 {
4624 /*
4625 Note 1: In RBR mode, we generate a CREATE TABLE statement for the
4626 created table by calling show_create_table(). In the event of an error,
4627 nothing should be written to the binary log, even if the table is
4628 non-transactional; therefore we pretend that the generated CREATE TABLE
4629 statement is for a transactional table. The event will then be put in the
4630 transaction cache, and any subsequent events (e.g., table-map events and
4631 binrow events) will also be put there. We can then use
4632 ha_autocommit_or_rollback() to either throw away the entire kaboodle of
4633 events, or write them to the binary log.
4634
4635 We write the CREATE TABLE statement here and not in prepare()
4636 since there potentially are sub-selects or accesses to information
4637 schema that will do a close_thread_tables(), destroying the
4638 statement transaction cache.
4639 */
4640 DBUG_ASSERT(thd->is_current_stmt_binlog_format_row());
4641 DBUG_ASSERT(tables && *tables && count > 0);
4642
4643 StringBuffer<2048> query(system_charset_info);
4644 int result;
4645 TABLE_LIST tmp_table_list;
4646
4647 tmp_table_list.reset();
4648 tmp_table_list.table = *tables;
4649
4650 result= show_create_table(thd, &tmp_table_list, &query,
4651 create_info, WITH_DB_NAME);
4652 DBUG_ASSERT(result == 0); /* show_create_table() always return 0 */
4653
4654 if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
4655 {
4656 int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
4657 result= thd->binlog_query(THD::STMT_QUERY_TYPE,
4658 query.ptr(), query.length(),
4659 /* is_trans */ TRUE,
4660 /* direct */ FALSE,
4661 /* suppress_use */ FALSE,
4662 errcode) > 0;
4663 }
4664
4665 ha_fake_trx_id(thd);
4666
4667 return result;
4668 }
4669
store_values(List<Item> & values)4670 void select_create::store_values(List<Item> &values)
4671 {
4672 fill_record_n_invoke_before_triggers(thd, table, field, values, 1,
4673 TRG_EVENT_INSERT);
4674 }
4675
4676
/*
  End of the result set for CREATE ... SELECT: finish the insert, make a
  temporary table visible again, commit for a non-temporary table, send the
  OK packet and release (or hand back) the table lock.

  Returns false on success. On failure abort_result_set() is called and
  true is returned.
*/
bool select_create::send_eof()
{
  DBUG_ENTER("select_create::send_eof");

  /*
    The routine that writes the statement in the binary log
    is in select_insert::prepare_eof(). For that reason, we
    mark the flag at this point.
  */
  if (table->s->tmp_table)
    thd->transaction.stmt.mark_created_temp_table();

  if (prepare_eof())
  {
    abort_result_set();
    DBUG_RETURN(true);
  }

  if (table->s->tmp_table)
  {
    /*
      Now is good time to add the new table to THD temporary tables list.
      But, before that we need to check if same table got created by the sub-
      statement.
    */
    if (thd->find_tmp_table_share(table->s->table_cache_key.str,
                                  table->s->table_cache_key.length))
    {
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table->alias.c_ptr());
      abort_result_set();
      DBUG_RETURN(true);
    }
    else
    {
      /* Put back the share that prepare() saved for a temporary table. */
      DBUG_ASSERT(saved_tmp_table_share);
      thd->restore_tmp_table_share(saved_tmp_table_share);
    }
  }

  /*
    Do an implicit commit at end of statement for non-temporary
    tables.  This can fail, but we should unlock the table
    nevertheless.
  */
  if (!table->s->tmp_table)
  {
#ifdef WITH_WSREP
    if (WSREP_ON)
    {
      /*
         append table level exclusive key for CTAS
      */
      wsrep_key_arr_t key_arr= {0, 0};
      wsrep_prepare_keys_for_isolation(thd,
                                       create_table->db.str,
                                       create_table->table_name.str,
                                       table_list,
                                       &key_arr);
      int rcode = wsrep->append_key(
                                wsrep,
                                &thd->wsrep_ws_handle,
                                key_arr.keys, //&wkey,
                                key_arr.keys_len,
                                WSREP_KEY_EXCLUSIVE,
                                false);
      /* The key array is freed whether or not append_key() succeeded. */
      wsrep_keys_free(&key_arr);
      if (rcode) {
        DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
        WSREP_ERROR("Appending table key for CTAS failed: %s, %d",
                    (wsrep_thd_query(thd)) ?
                    wsrep_thd_query(thd) : "void", rcode);
        abort_result_set();
        DBUG_RETURN(true);
      }
      /* If commit fails, we should be able to reset the OK status. */
      thd->get_stmt_da()->set_overwrite_status(TRUE);
    }
#endif /* WITH_WSREP */
    /* The return values of the commit calls are not checked here. */
    trans_commit_stmt(thd);
    if (!(thd->variables.option_bits & OPTION_GTID_BEGIN))
      trans_commit_implicit(thd);
#ifdef WITH_WSREP
    if (WSREP_ON)
    {
      thd->get_stmt_da()->set_overwrite_status(FALSE);
      /* The conflict state is read with LOCK_thd_data held. */
      mysql_mutex_lock(&thd->LOCK_thd_data);
      if (thd->wsrep_conflict_state != NO_CONFLICT)
      {
        WSREP_DEBUG("select_create commit failed, thd: %lld  err: %d %s",
                    (longlong) thd->thread_id, thd->wsrep_conflict_state,
                    thd->query());
        mysql_mutex_unlock(&thd->LOCK_thd_data);
        abort_result_set();
        DBUG_RETURN(true);
      }
      mysql_mutex_unlock(&thd->LOCK_thd_data);
    }
#endif /* WITH_WSREP */
  }

  /*
    exit_done must only be set after last potential call to
    abort_result_set().
  */
  exit_done= 1;                                 // Avoid double calls

  /* The return value of send_ok_packet() is not checked. */
  send_ok_packet();

  if (m_plock)
  {
    /* Take the lock out of its slot before deciding what to do with it. */
    MYSQL_LOCK *lock= *m_plock;
    *m_plock= NULL;
    m_plock= NULL;

    if (create_info->pos_in_locked_tables)
    {
      /*
        If we are under lock tables, we have created a table that was
        originally locked. We should add back the lock to ensure that
        all tables in the thd->open_list are locked!
      */
      table->mdl_ticket= create_info->mdl_ticket;

      /* The following should never fail, except if out of memory */
      if (!thd->locked_tables_list.restore_lock(thd,
                                                create_info->
                                                pos_in_locked_tables,
                                                table, lock))
        DBUG_RETURN(false);                     // ok
      /* Fail. Continue without locking the table */
    }
    mysql_unlock_tables(thd, lock);
  }
  DBUG_RETURN(false);
}
4812
4813
abort_result_set()4814 void select_create::abort_result_set()
4815 {
4816 ulonglong save_option_bits;
4817 DBUG_ENTER("select_create::abort_result_set");
4818
4819 /* Avoid double calls, could happen in case of out of memory on cleanup */
4820 if (exit_done)
4821 DBUG_VOID_RETURN;
4822 exit_done= 1;
4823
4824 /*
4825 In select_insert::abort_result_set() we roll back the statement, including
4826 truncating the transaction cache of the binary log. To do this, we
4827 pretend that the statement is transactional, even though it might
4828 be the case that it was not.
4829
4830 We roll back the statement prior to deleting the table and prior
4831 to releasing the lock on the table, since there might be potential
4832 for failure if the rollback is executed after the drop or after
4833 unlocking the table.
4834
4835 We also roll back the statement regardless of whether the creation
4836 of the table succeeded or not, since we need to reset the binary
4837 log state.
4838
4839 However if there was an original table that was deleted, as part of
4840 create or replace table, then we must log the statement.
4841 */
4842
4843 save_option_bits= thd->variables.option_bits;
4844 thd->variables.option_bits&= ~OPTION_BIN_LOG;
4845 select_insert::abort_result_set();
4846 thd->transaction.stmt.modified_non_trans_table= FALSE;
4847 thd->variables.option_bits= save_option_bits;
4848
4849 /* possible error of writing binary log is ignored deliberately */
4850 (void) thd->binlog_flush_pending_rows_event(TRUE, TRUE);
4851
4852 if (table)
4853 {
4854 bool tmp_table= table->s->tmp_table;
4855 bool table_creation_was_logged= (!tmp_table ||
4856 table->s->table_creation_was_logged);
4857 if (tmp_table)
4858 {
4859 DBUG_ASSERT(saved_tmp_table_share);
4860 thd->restore_tmp_table_share(saved_tmp_table_share);
4861 }
4862
4863 if (table->file->inited &&
4864 (info.ignore || info.handle_duplicates != DUP_ERROR) &&
4865 (table->file->ha_table_flags() & HA_DUPLICATE_POS))
4866 table->file->ha_rnd_end();
4867 table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
4868 table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
4869 table->auto_increment_field_not_null= FALSE;
4870
4871 if (m_plock)
4872 {
4873 mysql_unlock_tables(thd, *m_plock);
4874 *m_plock= NULL;
4875 m_plock= NULL;
4876 }
4877
4878 drop_open_table(thd, table, &create_table->db, &create_table->table_name);
4879 table=0; // Safety
4880 if (thd->log_current_statement && mysql_bin_log.is_open())
4881 {
4882 /* Remove logging of drop, create + insert rows */
4883 binlog_reset_cache(thd);
4884 /* Original table was deleted. We have to log it */
4885 if (table_creation_was_logged)
4886 log_drop_table(thd, &create_table->db, &create_table->table_name,
4887 tmp_table);
4888 }
4889 }
4890
4891 if (create_info->table_was_deleted)
4892 {
4893 /* Unlock locked table that was dropped by CREATE. */
4894 (void) trans_rollback_stmt(thd);
4895 thd->locked_tables_list.unlock_locked_table(thd, create_info->mdl_ticket);
4896 }
4897
4898 DBUG_VOID_RETURN;
4899 }
4900