1 /* Copyright (c) 2000, 2016, Oracle and/or its affiliates.
2 Copyright (c) 2009, 2019, MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software Foundation,
15 Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 /** @file handler.cc
18
19 @brief
20 Handler-calling-functions
21 */
22
23 #include "mariadb.h"
24 #include <inttypes.h>
25 #include "sql_priv.h"
26 #include "unireg.h"
27 #include "rpl_rli.h"
28 #include "sql_cache.h" // query_cache, query_cache_*
29 #include "sql_connect.h" // global_table_stats
30 #include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp
31 #include "sql_table.h" // build_table_filename
32 #include "sql_parse.h" // check_stack_overrun
33 #include "sql_acl.h" // SUPER_ACL
34 #include "sql_base.h" // TDC_element
35 #include "discover.h" // extension_based_table_discovery, etc
36 #include "log_event.h" // *_rows_log_event
37 #include "create_options.h"
38 #include <myisampack.h>
39 #include "transaction.h"
40 #include "myisam.h"
41 #include "probes_mysql.h"
42 #include <mysql/psi/mysql_table.h>
43 #include "debug_sync.h" // DEBUG_SYNC
44 #include "sql_audit.h"
45 #include "ha_sequence.h"
46
47 #ifdef WITH_PARTITION_STORAGE_ENGINE
48 #include "ha_partition.h"
49 #endif
50
51 #ifdef WITH_ARIA_STORAGE_ENGINE
52 #include "../storage/maria/ha_maria.h"
53 #endif
54 #include "semisync_master.h"
55
56 #include "wsrep_mysqld.h"
57 #include "wsrep.h"
58 #include "wsrep_xid.h"
59
/*
  While we have legacy_db_type, we have this array to
  check for dups and to find handlerton from legacy_db_type.
  Remove when legacy_db_type is finally gone
*/
st_plugin_int *hton2plugin[MAX_HA];

/* Installed engines indexed by their legacy_db_type code. */
static handlerton *installed_htons[128];

#define BITMAP_STACKBUF_SIZE (128/8)

/* Default attributes applied to a key created without explicit options. */
KEY_CREATE_INFO default_key_create_info=
{ HA_KEY_ALG_UNDEF, 0, 0, {NullS, 0}, {NullS, 0}, true };

/* number of entries in handlertons[] */
ulong total_ha= 0;
/* number of storage engines (from handlertons[]) that support 2pc */
ulong total_ha_2pc= 0;
#ifdef DBUG_ASSERT_EXISTS
/*
  Number of non-mandatory 2pc handlertons whose initialization failed,
  used to estimate what total_ha_2pc would have been had the failures
  not occurred.
*/
ulong failed_ha_2pc= 0;
#endif
/* size of savepoint storage area (see ha_init) */
ulong savepoint_alloc_size= 0;

/*
  Historical engine-name aliases, stored as (alias, real name) pairs
  and terminated by a {NullS, 0} sentinel.  See ha_resolve_by_name().
*/
static const LEX_CSTRING sys_table_aliases[]=
{
  { STRING_WITH_LEN("INNOBASE") }, { STRING_WITH_LEN("INNODB") },
  { STRING_WITH_LEN("HEAP") }, { STRING_WITH_LEN("MEMORY") },
  { STRING_WITH_LEN("MERGE") }, { STRING_WITH_LEN("MRG_MYISAM") },
  { STRING_WITH_LEN("Maria") }, { STRING_WITH_LEN("Aria") },
  {NullS, 0}
};

/* Names for ROW_FORMAT values, indexed by enum row_type. */
const char *ha_row_type[] = {
  "", "FIXED", "DYNAMIC", "COMPRESSED", "REDUNDANT", "COMPACT", "PAGE"
};

const char *tx_isolation_names[] =
{ "READ-UNCOMMITTED", "READ-COMMITTED", "REPEATABLE-READ", "SERIALIZABLE",
  NullS};
TYPELIB tx_isolation_typelib= {array_elements(tx_isolation_names)-1,"",
                               tx_isolation_names, NULL};

static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
uint known_extensions_id= 0;

/* Defined later in this file; used by the transaction commit path. */
static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans,
                              bool is_real_trans);
114
ha_default_plugin(THD * thd)115 static plugin_ref ha_default_plugin(THD *thd)
116 {
117 if (thd->variables.table_plugin)
118 return thd->variables.table_plugin;
119 return my_plugin_lock(thd, global_system_variables.table_plugin);
120 }
121
ha_default_tmp_plugin(THD * thd)122 static plugin_ref ha_default_tmp_plugin(THD *thd)
123 {
124 if (thd->variables.tmp_table_plugin)
125 return thd->variables.tmp_table_plugin;
126 if (global_system_variables.tmp_table_plugin)
127 return my_plugin_lock(thd, global_system_variables.tmp_table_plugin);
128 return ha_default_plugin(thd);
129 }
130
131
132 /** @brief
133 Return the default storage engine handlerton for thread
134
135 SYNOPSIS
136 ha_default_handlerton(thd)
137 thd current thread
138
139 RETURN
140 pointer to handlerton
141 */
ha_default_handlerton(THD * thd)142 handlerton *ha_default_handlerton(THD *thd)
143 {
144 plugin_ref plugin= ha_default_plugin(thd);
145 DBUG_ASSERT(plugin);
146 handlerton *hton= plugin_hton(plugin);
147 DBUG_ASSERT(hton);
148 return hton;
149 }
150
151
ha_default_tmp_handlerton(THD * thd)152 handlerton *ha_default_tmp_handlerton(THD *thd)
153 {
154 plugin_ref plugin= ha_default_tmp_plugin(thd);
155 DBUG_ASSERT(plugin);
156 handlerton *hton= plugin_hton(plugin);
157 DBUG_ASSERT(hton);
158 return hton;
159 }
160
161
/** @brief
  Return the storage engine handlerton for the supplied name

  SYNOPSIS
    ha_resolve_by_name(thd, name, tmp_table)
    thd                   current thread (may be NULL; then "DEFAULT" is
                          not resolved)
    name                  name of storage engine
    tmp_table             true if resolving the engine for a temporary
                          table (affects what "DEFAULT" resolves to)

  RETURN
    pointer to storage engine plugin handle, or NULL if no user-selectable
    engine with that name (or a historical alias of it) is installed
*/
plugin_ref ha_resolve_by_name(THD *thd, const LEX_CSTRING *name,
                              bool tmp_table)
{
  const LEX_CSTRING *table_alias;
  plugin_ref plugin;

redo:
  /* my_strnncoll is a macro and gcc doesn't do early expansion of macro */
  if (thd && !my_charset_latin1.coll->strnncoll(&my_charset_latin1,
                           (const uchar *)name->str, name->length,
                           (const uchar *)STRING_WITH_LEN("DEFAULT"), 0))
    return tmp_table ?  ha_default_tmp_plugin(thd) : ha_default_plugin(thd);

  if ((plugin= my_plugin_lock_by_name(thd, name, MYSQL_STORAGE_ENGINE_PLUGIN)))
  {
    handlerton *hton= plugin_hton(plugin);
    /* Engines marked HTON_NOT_USER_SELECTABLE cannot be picked by name. */
    if (hton && !(hton->flags & HTON_NOT_USER_SELECTABLE))
      return plugin;

    /*
      unlocking plugin immediately after locking is relatively low cost.
    */
    plugin_unlock(thd, plugin);
  }

  /*
    We check for the historical aliases.  sys_table_aliases holds
    (alias, real name) pairs, hence the stride of 2 and the +1 below.
  */
  for (table_alias= sys_table_aliases; table_alias->str; table_alias+= 2)
  {
    if (!my_strnncoll(&my_charset_latin1,
                      (const uchar *)name->str, name->length,
                      (const uchar *)table_alias->str, table_alias->length))
    {
      /* Retry the lookup with the real engine name. */
      name= table_alias + 1;
      goto redo;
    }
  }

  return NULL;
}
214
215
216 bool
resolve_storage_engine_with_error(THD * thd,handlerton ** ha,bool tmp_table)217 Storage_engine_name::resolve_storage_engine_with_error(THD *thd,
218 handlerton **ha,
219 bool tmp_table)
220 {
221 if (plugin_ref plugin= ha_resolve_by_name(thd, &m_storage_engine_name,
222 tmp_table))
223 {
224 *ha= plugin_hton(plugin);
225 return false;
226 }
227
228 *ha= NULL;
229 if (thd->variables.sql_mode & MODE_NO_ENGINE_SUBSTITUTION)
230 {
231 my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), m_storage_engine_name.str);
232 return true;
233 }
234 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
235 ER_UNKNOWN_STORAGE_ENGINE,
236 ER_THD(thd, ER_UNKNOWN_STORAGE_ENGINE),
237 m_storage_engine_name.str);
238 return false;
239 }
240
241
ha_lock_engine(THD * thd,const handlerton * hton)242 plugin_ref ha_lock_engine(THD *thd, const handlerton *hton)
243 {
244 if (hton)
245 {
246 st_plugin_int *plugin= hton2plugin[hton->slot];
247 return my_plugin_lock(thd, plugin_int_to_ref(plugin));
248 }
249 return NULL;
250 }
251
252
ha_resolve_by_legacy_type(THD * thd,enum legacy_db_type db_type)253 handlerton *ha_resolve_by_legacy_type(THD *thd, enum legacy_db_type db_type)
254 {
255 plugin_ref plugin;
256 switch (db_type) {
257 case DB_TYPE_DEFAULT:
258 return ha_default_handlerton(thd);
259 default:
260 if (db_type > DB_TYPE_UNKNOWN && db_type < DB_TYPE_DEFAULT &&
261 (plugin= ha_lock_engine(thd, installed_htons[db_type])))
262 return plugin_hton(plugin);
263 /* fall through */
264 case DB_TYPE_UNKNOWN:
265 return NULL;
266 }
267 }
268
269
270 /**
271 Use other database handler if databasehandler is not compiled in.
272 */
ha_checktype(THD * thd,handlerton * hton,bool no_substitute)273 handlerton *ha_checktype(THD *thd, handlerton *hton, bool no_substitute)
274 {
275 if (ha_storage_engine_is_enabled(hton))
276 return hton;
277
278 if (no_substitute)
279 return NULL;
280
281 return ha_default_handlerton(thd);
282 } /* ha_checktype */
283
284
get_new_handler(TABLE_SHARE * share,MEM_ROOT * alloc,handlerton * db_type)285 handler *get_new_handler(TABLE_SHARE *share, MEM_ROOT *alloc,
286 handlerton *db_type)
287 {
288 handler *file;
289 DBUG_ENTER("get_new_handler");
290 DBUG_PRINT("enter", ("alloc: %p", alloc));
291
292 if (db_type && db_type->state == SHOW_OPTION_YES && db_type->create)
293 {
294 if ((file= db_type->create(db_type, share, alloc)))
295 file->init();
296 DBUG_RETURN(file);
297 }
298 /*
299 Try the default table type
300 Here the call to current_thd() is ok as we call this function a lot of
301 times but we enter this branch very seldom.
302 */
303 file= get_new_handler(share, alloc, ha_default_handlerton(current_thd));
304 DBUG_RETURN(file);
305 }
306
307
#ifdef WITH_PARTITION_STORAGE_ENGINE
/**
  Create and initialize an ha_partition handler for @a part_info.

  @return initialized handler, or NULL on failure (ER_OUTOFMEMORY is
          raised if the allocation itself failed)
*/
handler *get_ha_partition(partition_info *part_info)
{
  ha_partition *part_handler;
  DBUG_ENTER("get_ha_partition");

  part_handler= new ha_partition(partition_hton, part_info);
  if (!part_handler)
  {
    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
             static_cast<int>(sizeof(ha_partition)));
  }
  else if (part_handler->initialize_partition(current_thd->mem_root))
  {
    /* Partition-specific setup failed; discard the half-built handler. */
    delete part_handler;
    part_handler= 0;
  }
  else
    part_handler->init();

  DBUG_RETURN((handler*) part_handler);
}
#endif
331
/* Handler error strings, indexed by (error code - HA_ERR_FIRST). */
static const char **handler_errmsgs;

C_MODE_START
/* my_error_register() callback: hand back the handler message array. */
static const char **get_handler_errmsgs(int nr)
{
  return handler_errmsgs;
}
C_MODE_END
340
341
/**
  Register handler error messages for use with my_error().

  Allocates the handler_errmsgs array (freed by ha_finish_errors())
  and registers it for the HA_ERR_FIRST..HA_ERR_LAST range.

  @retval
    0           OK
  @retval
    !=0         Error (allocation failed)
*/

int ha_init_errors(void)
{
#define SETMSG(nr, msg) handler_errmsgs[(nr) - HA_ERR_FIRST]= (msg)

  /* Allocate a pointer array for the error message strings. */
  /* Zerofill it to avoid uninitialized gaps. */
  if (! (handler_errmsgs= (const char**) my_malloc(HA_ERR_ERRORS * sizeof(char*),
                                                   MYF(MY_WME | MY_ZEROFILL))))
    return 1;

  /* Set the dedicated error messages. */
  SETMSG(HA_ERR_KEY_NOT_FOUND,          ER_DEFAULT(ER_KEY_NOT_FOUND));
  SETMSG(HA_ERR_FOUND_DUPP_KEY,         ER_DEFAULT(ER_DUP_KEY));
  SETMSG(HA_ERR_RECORD_CHANGED,         "Update which is recoverable");
  SETMSG(HA_ERR_WRONG_INDEX,            "Wrong index given to function");
  SETMSG(HA_ERR_CRASHED,                ER_DEFAULT(ER_NOT_KEYFILE));
  SETMSG(HA_ERR_WRONG_IN_RECORD,        ER_DEFAULT(ER_CRASHED_ON_USAGE));
  SETMSG(HA_ERR_OUT_OF_MEM,             "Table handler out of memory");
  SETMSG(HA_ERR_NOT_A_TABLE,            "Incorrect file format '%.64s'");
  SETMSG(HA_ERR_WRONG_COMMAND,          "Command not supported");
  SETMSG(HA_ERR_OLD_FILE,               ER_DEFAULT(ER_OLD_KEYFILE));
  SETMSG(HA_ERR_NO_ACTIVE_RECORD,       "No record read in update");
  SETMSG(HA_ERR_RECORD_DELETED,         "Intern record deleted");
  SETMSG(HA_ERR_RECORD_FILE_FULL,       ER_DEFAULT(ER_RECORD_FILE_FULL));
  SETMSG(HA_ERR_INDEX_FILE_FULL,        "No more room in index file '%.64s'");
  SETMSG(HA_ERR_END_OF_FILE,            "End in next/prev/first/last");
  SETMSG(HA_ERR_UNSUPPORTED,            ER_DEFAULT(ER_ILLEGAL_HA));
  SETMSG(HA_ERR_TO_BIG_ROW,             "Too big row");
  SETMSG(HA_WRONG_CREATE_OPTION,        "Wrong create option");
  SETMSG(HA_ERR_FOUND_DUPP_UNIQUE,      ER_DEFAULT(ER_DUP_UNIQUE));
  SETMSG(HA_ERR_UNKNOWN_CHARSET,        "Can't open charset");
  SETMSG(HA_ERR_WRONG_MRG_TABLE_DEF,    ER_DEFAULT(ER_WRONG_MRG_TABLE));
  SETMSG(HA_ERR_CRASHED_ON_REPAIR,      ER_DEFAULT(ER_CRASHED_ON_REPAIR));
  SETMSG(HA_ERR_CRASHED_ON_USAGE,       ER_DEFAULT(ER_CRASHED_ON_USAGE));
  SETMSG(HA_ERR_LOCK_WAIT_TIMEOUT,      ER_DEFAULT(ER_LOCK_WAIT_TIMEOUT));
  SETMSG(HA_ERR_LOCK_TABLE_FULL,        ER_DEFAULT(ER_LOCK_TABLE_FULL));
  SETMSG(HA_ERR_READ_ONLY_TRANSACTION,  ER_DEFAULT(ER_READ_ONLY_TRANSACTION));
  SETMSG(HA_ERR_LOCK_DEADLOCK,          ER_DEFAULT(ER_LOCK_DEADLOCK));
  SETMSG(HA_ERR_CANNOT_ADD_FOREIGN,     ER_DEFAULT(ER_CANNOT_ADD_FOREIGN));
  SETMSG(HA_ERR_NO_REFERENCED_ROW,      ER_DEFAULT(ER_NO_REFERENCED_ROW_2));
  SETMSG(HA_ERR_ROW_IS_REFERENCED,      ER_DEFAULT(ER_ROW_IS_REFERENCED_2));
  SETMSG(HA_ERR_NO_SAVEPOINT,           "No savepoint with that name");
  SETMSG(HA_ERR_NON_UNIQUE_BLOCK_SIZE,  "Non unique key block size");
  SETMSG(HA_ERR_NO_SUCH_TABLE,          "No such table: '%.64s'");
  SETMSG(HA_ERR_TABLE_EXIST,            ER_DEFAULT(ER_TABLE_EXISTS_ERROR));
  SETMSG(HA_ERR_NO_CONNECTION,          "Could not connect to storage engine");
  SETMSG(HA_ERR_TABLE_DEF_CHANGED,      ER_DEFAULT(ER_TABLE_DEF_CHANGED));
  SETMSG(HA_ERR_FOREIGN_DUPLICATE_KEY,  "FK constraint would lead to duplicate key");
  SETMSG(HA_ERR_TABLE_NEEDS_UPGRADE,    ER_DEFAULT(ER_TABLE_NEEDS_UPGRADE));
  SETMSG(HA_ERR_TABLE_READONLY,         ER_DEFAULT(ER_OPEN_AS_READONLY));
  SETMSG(HA_ERR_AUTOINC_READ_FAILED,    ER_DEFAULT(ER_AUTOINC_READ_FAILED));
  SETMSG(HA_ERR_AUTOINC_ERANGE,         ER_DEFAULT(ER_WARN_DATA_OUT_OF_RANGE));
  SETMSG(HA_ERR_TOO_MANY_CONCURRENT_TRXS, ER_DEFAULT(ER_TOO_MANY_CONCURRENT_TRXS));
  SETMSG(HA_ERR_INDEX_COL_TOO_LONG,     ER_DEFAULT(ER_INDEX_COLUMN_TOO_LONG));
  SETMSG(HA_ERR_INDEX_CORRUPT,          ER_DEFAULT(ER_INDEX_CORRUPT));
  SETMSG(HA_FTS_INVALID_DOCID,          "Invalid InnoDB FTS Doc ID");
  SETMSG(HA_ERR_TABLE_IN_FK_CHECK,      ER_DEFAULT(ER_TABLE_IN_FK_CHECK));
  SETMSG(HA_ERR_DISK_FULL,              ER_DEFAULT(ER_DISK_FULL));
  SETMSG(HA_ERR_FTS_TOO_MANY_WORDS_IN_PHRASE, "Too many words in a FTS phrase or proximity search");
  SETMSG(HA_ERR_FK_DEPTH_EXCEEDED,      "Foreign key cascade delete/update exceeds");
  SETMSG(HA_ERR_TABLESPACE_MISSING,     ER_DEFAULT(ER_TABLESPACE_MISSING));

  /* Register the error messages for use with my_error(). */
  return my_error_register(get_handler_errmsgs, HA_ERR_FIRST, HA_ERR_LAST);
}
416
417
418 /**
419 Unregister handler error messages.
420
421 @retval
422 0 OK
423 @retval
424 !=0 Error
425 */
ha_finish_errors(void)426 static int ha_finish_errors(void)
427 {
428 /* Allocate a pointer array for the error message strings. */
429 my_error_unregister(HA_ERR_FIRST, HA_ERR_LAST);
430 my_free(handler_errmsgs);
431 handler_errmsgs= 0;
432 return 0;
433 }
434
/*
  Global counters maintained by update_discovery_counters(); they let
  hot paths skip discovery work when no installed engine needs it.
*/
static volatile int32 need_full_discover_for_existence= 0;
static volatile int32 engines_with_discover_file_names= 0;
static volatile int32 engines_with_discover= 0;

/*
  Marker stubs installed as default discover_table_existence hooks.
  They are compared by address (see update_discovery_counters()), so
  their identity matters, not their bodies.
*/
static int full_discover_for_existence(handlerton *, const char *, const char *)
{ return 0; }

static int ext_based_existence(handlerton *, const char *, const char *)
{ return 0; }
444
hton_ext_based_table_discovery(handlerton * hton,LEX_CSTRING * db,MY_DIR * dir,handlerton::discovered_list * result)445 static int hton_ext_based_table_discovery(handlerton *hton, LEX_CSTRING *db,
446 MY_DIR *dir, handlerton::discovered_list *result)
447 {
448 /*
449 tablefile_extensions[0] is the metadata file, see
450 the comment above tablefile_extensions declaration
451 */
452 return extension_based_table_discovery(dir, hton->tablefile_extensions[0],
453 result);
454 }
455
update_discovery_counters(handlerton * hton,int val)456 static void update_discovery_counters(handlerton *hton, int val)
457 {
458 if (hton->discover_table_existence == full_discover_for_existence)
459 my_atomic_add32(&need_full_discover_for_existence, val);
460
461 if (hton->discover_table_names && hton->tablefile_extensions[0])
462 my_atomic_add32(&engines_with_discover_file_names, val);
463
464 if (hton->discover_table)
465 my_atomic_add32(&engines_with_discover, val);
466 }
467
/**
  Finalize a storage engine plugin: release its legacy db_type slot,
  run the engine's panic/deinit hooks, and free the handlerton.

  @param plugin  storage engine plugin being uninstalled

  @return 0 always (a failing deinit hook is only logged)
*/
int ha_finalize_handlerton(st_plugin_int *plugin)
{
  handlerton *hton= (handlerton *)plugin->data;
  DBUG_ENTER("ha_finalize_handlerton");

  /* hton can be NULL here, if ha_initialize_handlerton() failed. */
  if (!hton)
    goto end;

  switch (hton->state) {
  case SHOW_OPTION_NO:
  case SHOW_OPTION_DISABLED:
    break;
  case SHOW_OPTION_YES:
    /* Release the legacy db_type slot only if this engine owns it. */
    if (installed_htons[hton->db_type] == hton)
      installed_htons[hton->db_type]= NULL;
    break;
  };

  /* Let the engine flush/close everything before deinit. */
  if (hton->panic)
    hton->panic(hton, HA_PANIC_CLOSE);

  if (plugin->plugin->deinit)
  {
    /*
      Today we have no defined/special behavior for uninstalling
      engine plugins.
    */
    DBUG_PRINT("info", ("Deinitializing plugin: '%s'", plugin->name.str));
    if (plugin->plugin->deinit(NULL))
    {
      DBUG_PRINT("warning", ("Plugin '%s' deinit function returned error.",
                             plugin->name.str));
    }
  }

  free_sysvar_table_options(hton);
  update_discovery_counters(hton, -1);

  /*
    In case a plugin is uninstalled and re-installed later, it should
    reuse an array slot. Otherwise the number of uninstall/install
    cycles would be limited.
  */
  if (hton->slot != HA_SLOT_UNDEF)
  {
    /* Make sure we are not unplugging another plugin */
    DBUG_ASSERT(hton2plugin[hton->slot] == plugin);
    DBUG_ASSERT(hton->slot < MAX_HA);
    hton2plugin[hton->slot]= NULL;
  }

  my_free(hton);

 end:
  DBUG_RETURN(0);
}
525
526
/**
  Initialize a storage engine plugin: allocate its handlerton, run the
  plugin init hook, assign a legacy db_type code and a hton2plugin slot,
  and wire up discovery/2pc bookkeeping.

  @param plugin  storage engine plugin being installed

  @retval 0  success (plugin->data points to the new handlerton)
  @retval 1  failure (plugin->data reset to NULL, everything rolled back)
*/
int ha_initialize_handlerton(st_plugin_int *plugin)
{
  handlerton *hton;
  static const char *no_exts[]= { 0 };
  DBUG_ENTER("ha_initialize_handlerton");
  DBUG_PRINT("plugin", ("initialize plugin: '%s'", plugin->name.str));

  hton= (handlerton *)my_malloc(sizeof(handlerton),
                                MYF(MY_WME | MY_ZEROFILL));
  if (hton == NULL)
  {
    sql_print_error("Unable to allocate memory for plugin '%s' handlerton.",
                    plugin->name.str);
    goto err_no_hton_memory;
  }

  /* Defaults the plugin init hook may override. */
  hton->tablefile_extensions= no_exts;
  hton->discover_table_names= hton_ext_based_table_discovery;

  hton->slot= HA_SLOT_UNDEF;
  /* Historical Requirement */
  plugin->data= hton; // shortcut for the future
  if (plugin->plugin->init && plugin->plugin->init(hton))
  {
    sql_print_error("Plugin '%s' init function returned error.",
                    plugin->name.str);
    goto err;
  }

  // hton_ext_based_table_discovery() works only when discovery
  // is supported and the engine is file-based.
  if (hton->discover_table_names == hton_ext_based_table_discovery &&
      (!hton->discover_table || !hton->tablefile_extensions[0]))
    hton->discover_table_names= NULL;

  // default discover_table_existence implementation
  if (!hton->discover_table_existence && hton->discover_table)
  {
    if (hton->tablefile_extensions[0])
      hton->discover_table_existence= ext_based_existence;
    else
      hton->discover_table_existence= full_discover_for_existence;
  }

  switch (hton->state) {
  case SHOW_OPTION_NO:
    break;
  case SHOW_OPTION_YES:
    {
      uint tmp;
      ulong fslot;

      DBUG_EXECUTE_IF("unstable_db_type", {
                        static int i= (int) DB_TYPE_FIRST_DYNAMIC;
                        hton->db_type= (enum legacy_db_type)++i;
                      });

      /* now check the db_type for conflict */
      if (hton->db_type <= DB_TYPE_UNKNOWN ||
          hton->db_type >= DB_TYPE_DEFAULT ||
          installed_htons[hton->db_type])
      {
        /* The requested code is invalid or taken: find a free dynamic one. */
        int idx= (int) DB_TYPE_FIRST_DYNAMIC;

        while (idx < (int) DB_TYPE_DEFAULT && installed_htons[idx])
          idx++;

        if (idx == (int) DB_TYPE_DEFAULT)
        {
          sql_print_warning("Too many storage engines!");
          goto err_deinit;
        }
        if (hton->db_type != DB_TYPE_UNKNOWN)
          sql_print_warning("Storage engine '%s' has conflicting typecode. "
                            "Assigning value %d.", plugin->plugin->name, idx);
        hton->db_type= (enum legacy_db_type) idx;
      }

      /*
        In case a plugin is uninstalled and re-installed later, it should
        reuse an array slot. Otherwise the number of uninstall/install
        cycles would be limited. So look for a free slot.
      */
      DBUG_PRINT("plugin", ("total_ha: %lu", total_ha));
      for (fslot= 0; fslot < total_ha; fslot++)
      {
        if (!hton2plugin[fslot])
          break;
      }
      if (fslot < total_ha)
        hton->slot= fslot;
      else
      {
        if (total_ha >= MAX_HA)
        {
          sql_print_error("Too many plugins loaded. Limit is %lu. "
                          "Failed on '%s'", (ulong) MAX_HA, plugin->name.str);
          goto err_deinit;
        }
        hton->slot= total_ha++;
      }
      installed_htons[hton->db_type]= hton;
      /* Give the engine its range inside the per-THD savepoint area. */
      tmp= hton->savepoint_offset;
      hton->savepoint_offset= savepoint_alloc_size;
      savepoint_alloc_size+= tmp;
      hton2plugin[hton->slot]=plugin;
      if (hton->prepare)
      {
        total_ha_2pc++;
        /* 2pc needs the tc-log chosen at startup; can't switch at run-time. */
        if (tc_log && tc_log != get_tc_log_implementation())
        {
          total_ha_2pc--;
          hton->prepare= 0;
          push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                              ER_UNKNOWN_ERROR,
                              "Cannot enable tc-log at run-time. "
                              "XA features of %s are disabled",
                              plugin->name.str);
        }
      }
      break;
    }
    /* fall through */
  default:
    hton->state= SHOW_OPTION_DISABLED;
    break;
  }

  /*
    This is entirely for legacy. We will create a new "disk based" hton and a
    "memory" hton which will be configurable longterm. We should be able to
    remove partition.
  */
  switch (hton->db_type) {
  case DB_TYPE_HEAP:
    heap_hton= hton;
    break;
  case DB_TYPE_MYISAM:
    myisam_hton= hton;
    break;
  case DB_TYPE_PARTITION_DB:
    partition_hton= hton;
    break;
  case DB_TYPE_SEQUENCE:
    sql_sequence_hton= hton;
    break;
  default:
    break;
  };

  resolve_sysvar_table_options(hton);
  update_discovery_counters(hton, 1);

  DBUG_RETURN(0);

err_deinit:
  /*
    Let plugin do its inner deinitialization as plugin->init()
    was successfully called before.
  */
  if (plugin->plugin->deinit)
    (void) plugin->plugin->deinit(NULL);

err:
#ifdef DBUG_ASSERT_EXISTS
  if (hton->prepare && hton->state == SHOW_OPTION_YES)
    failed_ha_2pc++;
#endif
  my_free(hton);
err_no_hton_memory:
  plugin->data= NULL;
  DBUG_RETURN(1);
}
700
ha_init()701 int ha_init()
702 {
703 int error= 0;
704 DBUG_ENTER("ha_init");
705
706 DBUG_ASSERT(total_ha < MAX_HA);
707 /*
708 Check if there is a transaction-capable storage engine besides the
709 binary log (which is considered a transaction-capable storage engine in
710 counting total_ha)
711 */
712 opt_using_transactions= total_ha>(ulong)opt_bin_log;
713 savepoint_alloc_size+= sizeof(SAVEPOINT);
714 DBUG_RETURN(error);
715 }
716
ha_end()717 int ha_end()
718 {
719 int error= 0;
720 DBUG_ENTER("ha_end");
721
722
723 /*
724 This should be eventually based on the graceful shutdown flag.
725 So if flag is equal to HA_PANIC_CLOSE, the deallocate
726 the errors.
727 */
728 if (unlikely(ha_finish_errors()))
729 error= 1;
730
731 DBUG_RETURN(error);
732 }
733
dropdb_handlerton(THD * unused1,plugin_ref plugin,void * path)734 static my_bool dropdb_handlerton(THD *unused1, plugin_ref plugin,
735 void *path)
736 {
737 handlerton *hton= plugin_hton(plugin);
738 if (hton->state == SHOW_OPTION_YES && hton->drop_database)
739 hton->drop_database(hton, (char *)path);
740 return FALSE;
741 }
742
743
ha_drop_database(char * path)744 void ha_drop_database(char* path)
745 {
746 plugin_foreach(NULL, dropdb_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, path);
747 }
748
749
checkpoint_state_handlerton(THD * unused1,plugin_ref plugin,void * disable)750 static my_bool checkpoint_state_handlerton(THD *unused1, plugin_ref plugin,
751 void *disable)
752 {
753 handlerton *hton= plugin_hton(plugin);
754 if (hton->state == SHOW_OPTION_YES && hton->checkpoint_state)
755 hton->checkpoint_state(hton, (int) *(bool*) disable);
756 return FALSE;
757 }
758
759
ha_checkpoint_state(bool disable)760 void ha_checkpoint_state(bool disable)
761 {
762 plugin_foreach(NULL, checkpoint_state_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &disable);
763 }
764
765
766 struct st_commit_checkpoint_request {
767 void *cookie;
768 void (*pre_hook)(void *);
769 };
770
commit_checkpoint_request_handlerton(THD * unused1,plugin_ref plugin,void * data)771 static my_bool commit_checkpoint_request_handlerton(THD *unused1, plugin_ref plugin,
772 void *data)
773 {
774 st_commit_checkpoint_request *st= (st_commit_checkpoint_request *)data;
775 handlerton *hton= plugin_hton(plugin);
776 if (hton->state == SHOW_OPTION_YES && hton->commit_checkpoint_request)
777 {
778 void *cookie= st->cookie;
779 if (st->pre_hook)
780 (*st->pre_hook)(cookie);
781 (*hton->commit_checkpoint_request)(hton, cookie);
782 }
783 return FALSE;
784 }
785
786
787 /*
788 Invoke commit_checkpoint_request() in all storage engines that implement it.
789
790 If pre_hook is non-NULL, the hook will be called prior to each invocation.
791 */
792 void
ha_commit_checkpoint_request(void * cookie,void (* pre_hook)(void *))793 ha_commit_checkpoint_request(void *cookie, void (*pre_hook)(void *))
794 {
795 st_commit_checkpoint_request st;
796 st.cookie= cookie;
797 st.pre_hook= pre_hook;
798 plugin_foreach(NULL, commit_checkpoint_request_handlerton,
799 MYSQL_STORAGE_ENGINE_PLUGIN, &st);
800 }
801
802
803
closecon_handlerton(THD * thd,plugin_ref plugin,void * unused)804 static my_bool closecon_handlerton(THD *thd, plugin_ref plugin,
805 void *unused)
806 {
807 handlerton *hton= plugin_hton(plugin);
808 /*
809 there's no need to rollback here as all transactions must
810 be rolled back already
811 */
812 if (hton->state == SHOW_OPTION_YES && thd_get_ha_data(thd, hton))
813 {
814 if (hton->close_connection)
815 hton->close_connection(hton, thd);
816 /* make sure ha_data is reset and ha_data_lock is released */
817 thd_set_ha_data(thd, hton, NULL);
818 }
819 return FALSE;
820 }
821
822 /**
823 @note
824 don't bother to rollback here, it's done already
825 */
ha_close_connection(THD * thd)826 void ha_close_connection(THD* thd)
827 {
828 plugin_foreach_with_mask(thd, closecon_handlerton,
829 MYSQL_STORAGE_ENGINE_PLUGIN,
830 PLUGIN_IS_DELETED|PLUGIN_IS_READY, 0);
831 }
832
kill_handlerton(THD * thd,plugin_ref plugin,void * level)833 static my_bool kill_handlerton(THD *thd, plugin_ref plugin,
834 void *level)
835 {
836 handlerton *hton= plugin_hton(plugin);
837
838 if (hton->state == SHOW_OPTION_YES && hton->kill_query &&
839 thd_get_ha_data(thd, hton))
840 hton->kill_query(hton, thd, *(enum thd_kill_levels *) level);
841 return FALSE;
842 }
843
ha_kill_query(THD * thd,enum thd_kill_levels level)844 void ha_kill_query(THD* thd, enum thd_kill_levels level)
845 {
846 DBUG_ENTER("ha_kill_query");
847 plugin_foreach(thd, kill_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &level);
848 DBUG_VOID_RETURN;
849 }
850
851
852 /* ========================================================================
853 ======================= TRANSACTIONS ===================================*/
854
855 /**
856 Transaction handling in the server
857 ==================================
858
859 In each client connection, MySQL maintains two transactional
860 states:
861 - a statement transaction,
862 - a standard, also called normal transaction.
863
864 Historical note
865 ---------------
866 "Statement transaction" is a non-standard term that comes
867 from the times when MySQL supported BerkeleyDB storage engine.
868
869 First of all, it should be said that in BerkeleyDB auto-commit
870 mode auto-commits operations that are atomic to the storage
871 engine itself, such as a write of a record, and are too
872 high-granular to be atomic from the application perspective
873 (MySQL). One SQL statement could involve many BerkeleyDB
874 auto-committed operations and thus BerkeleyDB auto-commit was of
875 little use to MySQL.
876
877 Secondly, instead of SQL standard savepoints, BerkeleyDB
878 provided the concept of "nested transactions". In a nutshell,
879 transactions could be arbitrarily nested, but when the parent
880 transaction was committed or aborted, all its child (nested)
881 transactions were handled committed or aborted as well.
882 Commit of a nested transaction, in turn, made its changes
883 visible, but not durable: it destroyed the nested transaction,
884 all its changes would become available to the parent and
885 currently active nested transactions of this parent.
886
887 So the mechanism of nested transactions was employed to
888 provide "all or nothing" guarantee of SQL statements
889 required by the standard.
890 A nested transaction would be created at start of each SQL
891 statement, and destroyed (committed or aborted) at statement
892 end. Such nested transaction was internally referred to as
893 a "statement transaction" and gave birth to the term.
894
895 (Historical note ends)
896
897 Since then a statement transaction is started for each statement
898 that accesses transactional tables or uses the binary log. If
899 the statement succeeds, the statement transaction is committed.
900 If the statement fails, the transaction is rolled back. Commits
901 of statement transactions are not durable -- each such
902 transaction is nested in the normal transaction, and if the
903 normal transaction is rolled back, the effects of all enclosed
904 statement transactions are undone as well. Technically,
905 a statement transaction can be viewed as a savepoint which is
906 maintained automatically in order to make effects of one
907 statement atomic.
908
909 The normal transaction is started by the user and is ended
910 usually upon a user request as well. The normal transaction
911 encloses transactions of all statements issued between
912 its beginning and its end.
913 In autocommit mode, the normal transaction is equivalent
914 to the statement transaction.
915
916 Since MySQL supports PSEA (pluggable storage engine
917 architecture), more than one transactional engine can be
918 active at a time. Hence transactions, from the server
919 point of view, are always distributed. In particular,
920 transactional state is maintained independently for each
921 engine. In order to commit a transaction the two phase
922 commit protocol is employed.
923
924 Not all statements are executed in context of a transaction.
925 Administrative and status information statements do not modify
926 engine data, and thus do not start a statement transaction and
927 also have no effect on the normal transaction. Examples of such
928 statements are SHOW STATUS and RESET SLAVE.
929
930 Similarly DDL statements are not transactional,
931 and therefore a transaction is [almost] never started for a DDL
932 statement. The difference between a DDL statement and a purely
933 administrative statement though is that a DDL statement always
934 commits the current transaction before proceeding, if there is
935 any.
936
937 At last, SQL statements that work with non-transactional
938 engines also have no effect on the transaction state of the
939 connection. Even though they are written to the binary log,
940 and the binary log is, overall, transactional, the writes
941 are done in "write-through" mode, directly to the binlog
942 file, followed with a OS cache sync, in other words,
943 bypassing the binlog undo log (translog).
944 They do not commit the current normal transaction.
  A failure of a statement that uses non-transactional tables
  would cause a rollback of the statement transaction, but
  in case no non-transactional tables are used,
  no statement transaction is started.
949
950 Data layout
951 -----------
952
953 The server stores its transaction-related data in
954 thd->transaction. This structure has two members of type
955 THD_TRANS. These members correspond to the statement and
956 normal transactions respectively:
957
958 - thd->transaction.stmt contains a list of engines
959 that are participating in the given statement
960 - thd->transaction.all contains a list of engines that
961 have participated in any of the statement transactions started
962 within the context of the normal transaction.
963 Each element of the list contains a pointer to the storage
964 engine, engine-specific transactional data, and engine-specific
965 transaction flags.
966
967 In autocommit mode thd->transaction.all is empty.
968 Instead, data of thd->transaction.stmt is
969 used to commit/rollback the normal transaction.
970
971 The list of registered engines has a few important properties:
972 - no engine is registered in the list twice
- engines are present in the list in reverse temporal order --
new participants are always added to the beginning of the list.
975
976 Transaction life cycle
977 ----------------------
978
979 When a new connection is established, thd->transaction
980 members are initialized to an empty state.
981 If a statement uses any tables, all affected engines
982 are registered in the statement engine list. In
983 non-autocommit mode, the same engines are registered in
984 the normal transaction list.
985 At the end of the statement, the server issues a commit
986 or a roll back for all engines in the statement list.
987 At this point transaction flags of an engine, if any, are
988 propagated from the statement list to the list of the normal
989 transaction.
990 When commit/rollback is finished, the statement list is
991 cleared. It will be filled in again by the next statement,
992 and emptied again at the next statement's end.
993
994 The normal transaction is committed in a similar way
995 (by going over all engines in thd->transaction.all list)
996 but at different times:
997 - upon COMMIT SQL statement is issued by the user
998 - implicitly, by the server, at the beginning of a DDL statement
999 or SET AUTOCOMMIT={0|1} statement.
1000
1001 The normal transaction can be rolled back as well:
1002 - if the user has requested so, by issuing ROLLBACK SQL
1003 statement
1004 - if one of the storage engines requested a rollback
1005 by setting thd->transaction_rollback_request. This may
1006 happen in case, e.g., when the transaction in the engine was
1007 chosen a victim of the internal deadlock resolution algorithm
1008 and rolled back internally. When such a situation happens, there
1009 is little the server can do and the only option is to rollback
1010 transactions in all other participating engines. In this case
1011 the rollback is accompanied by an error sent to the user.
1012
1013 As follows from the use cases above, the normal transaction
1014 is never committed when there is an outstanding statement
1015 transaction. In most cases there is no conflict, since
1016 commits of the normal transaction are issued by a stand-alone
1017 administrative or DDL statement, thus no outstanding statement
1018 transaction of the previous statement exists. Besides,
1019 all statements that manipulate with the normal transaction
1020 are prohibited in stored functions and triggers, therefore
1021 no conflicting situation can occur in a sub-statement either.
1022 The remaining rare cases when the server explicitly has
1023 to commit the statement transaction prior to committing the normal
1024 one cover error-handling scenarios (see for example
1025 SQLCOM_LOCK_TABLES).
1026
1027 When committing a statement or a normal transaction, the server
1028 either uses the two-phase commit protocol, or issues a commit
1029 in each engine independently. The two-phase commit protocol
1030 is used only if:
1031 - all participating engines support two-phase commit (provide
1032 handlerton::prepare PSEA API call) and
1033 - transactions in at least two engines modify data (i.e. are
1034 not read-only).
1035
1036 Note that the two phase commit is used for
1037 statement transactions, even though they are not durable anyway.
1038 This is done to ensure logical consistency of data in a multiple-
1039 engine transaction.
1040 For example, imagine that some day MySQL supports unique
1041 constraint checks deferred till the end of statement. In such
1042 case a commit in one of the engines may yield ER_DUP_KEY,
1043 and MySQL should be able to gracefully abort statement
1044 transactions of other participants.
1045
1046 After the normal transaction has been committed,
1047 thd->transaction.all list is cleared.
1048
1049 When a connection is closed, the current normal transaction, if
1050 any, is rolled back.
1051
1052 Roles and responsibilities
1053 --------------------------
1054
1055 The server has no way to know that an engine participates in
1056 the statement and a transaction has been started
1057 in it unless the engine says so. Thus, in order to be
1058 a part of a transaction, the engine must "register" itself.
1059 This is done by invoking trans_register_ha() server call.
1060 Normally the engine registers itself whenever handler::external_lock()
1061 is called. trans_register_ha() can be invoked many times: if
1062 an engine is already registered, the call does nothing.
1063 In case autocommit is not set, the engine must register itself
1064 twice -- both in the statement list and in the normal transaction
1065 list.
1066 In which list to register is a parameter of trans_register_ha().
1067
1068 Note, that although the registration interface in itself is
1069 fairly clear, the current usage practice often leads to undesired
1070 effects. E.g. since a call to trans_register_ha() in most engines
1071 is embedded into implementation of handler::external_lock(), some
1072 DDL statements start a transaction (at least from the server
1073 point of view) even though they are not expected to. E.g.
1074 CREATE TABLE does not start a transaction, since
1075 handler::external_lock() is never called during CREATE TABLE. But
1076 CREATE TABLE ... SELECT does, since handler::external_lock() is
1077 called for the table that is being selected from. This has no
1078 practical effects currently, but must be kept in mind
1079 nevertheless.
1080
1081 Once an engine is registered, the server will do the rest
1082 of the work.
1083
1084 During statement execution, whenever any of data-modifying
1085 PSEA API methods is used, e.g. handler::write_row() or
1086 handler::update_row(), the read-write flag is raised in the
1087 statement transaction for the involved engine.
Currently all PSEA calls are "traced", and the data can not be
changed in a way other than issuing a PSEA call. Important:
1090 unless this invariant is preserved the server will not know that
1091 a transaction in a given engine is read-write and will not
1092 involve the two-phase commit protocol!
1093
At the end of a statement, the server call trans_commit_stmt()
is invoked. This call in turn invokes handlerton::prepare()
for every involved engine. Prepare is followed by a call
to handlerton::commit_one_phase(). If a one-phase commit
will suffice, handlerton::prepare() is not invoked and
the server only calls handlerton::commit_one_phase().
1100 At statement commit, the statement-related read-write
1101 engine flag is propagated to the corresponding flag in the
1102 normal transaction. When the commit is complete, the list
1103 of registered engines is cleared.
1104
1105 Rollback is handled in a similar fashion.
1106
1107 Additional notes on DDL and the normal transaction.
1108 ---------------------------------------------------
1109
1110 DDLs and operations with non-transactional engines
1111 do not "register" in thd->transaction lists, and thus do not
1112 modify the transaction state. Besides, each DDL in
1113 MySQL is prefixed with an implicit normal transaction commit
1114 (a call to trans_commit_implicit()), and thus leaves nothing
1115 to modify.
1116 However, as it has been pointed out with CREATE TABLE .. SELECT,
1117 some DDL statements can start a *new* transaction.
1118
1119 Behaviour of the server in this case is currently badly
1120 defined.
1121 DDL statements use a form of "semantic" logging
1122 to maintain atomicity: if CREATE TABLE .. SELECT failed,
1123 the newly created table is deleted.
1124 In addition, some DDL statements issue interim transaction
1125 commits: e.g. ALTER TABLE issues a commit after data is copied
1126 from the original table to the internal temporary table. Other
1127 statements, e.g. CREATE TABLE ... SELECT do not always commit
1128 after itself.
1129 And finally there is a group of DDL statements such as
1130 RENAME/DROP TABLE that doesn't start a new transaction
1131 and doesn't commit.
1132
1133 This diversity makes it hard to say what will happen if
1134 by chance a stored function is invoked during a DDL --
1135 whether any modifications it makes will be committed or not
is not clear. Fortunately, the SQL grammar of only a few DDLs
allows invocation of a stored function.
1138
1139 A consistent behaviour is perhaps to always commit the normal
1140 transaction after all DDLs, just like the statement transaction
1141 is always committed at the end of all statements.
1142 */
1143
1144 /**
1145 Register a storage engine for a transaction.
1146
1147 Every storage engine MUST call this function when it starts
1148 a transaction or a statement (that is it must be called both for the
1149 "beginning of transaction" and "beginning of statement").
1150 Only storage engines registered for the transaction/statement
1151 will know when to commit/rollback it.
1152
1153 @note
1154 trans_register_ha is idempotent - storage engine may register many
1155 times per transaction.
1156
1157 */
trans_register_ha(THD * thd,bool all,handlerton * ht_arg)1158 void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
1159 {
1160 THD_TRANS *trans;
1161 Ha_trx_info *ha_info;
1162 DBUG_ENTER("trans_register_ha");
1163 DBUG_PRINT("enter",("%s", all ? "all" : "stmt"));
1164
1165 if (all)
1166 {
1167 trans= &thd->transaction.all;
1168 thd->server_status|= SERVER_STATUS_IN_TRANS;
1169 if (thd->tx_read_only)
1170 thd->server_status|= SERVER_STATUS_IN_TRANS_READONLY;
1171 DBUG_PRINT("info", ("setting SERVER_STATUS_IN_TRANS"));
1172 }
1173 else
1174 trans= &thd->transaction.stmt;
1175
1176 ha_info= thd->ha_data[ht_arg->slot].ha_info + (all ? 1 : 0);
1177
1178 if (ha_info->is_started())
1179 DBUG_VOID_RETURN; /* already registered, return */
1180
1181 ha_info->register_ha(trans, ht_arg);
1182
1183 trans->no_2pc|=(ht_arg->prepare==0);
1184 if (thd->transaction.xid_state.xid.is_null())
1185 thd->transaction.xid_state.xid.set(thd->query_id);
1186 DBUG_VOID_RETURN;
1187 }
1188
1189
/**
  Run handlerton::prepare() for one engine and report a failure to the
  client.

  @param ht   storage engine to prepare (must have a non-NULL prepare hook)
  @param thd  current session
  @param all  TRUE  - prepare the normal transaction
              FALSE - prepare the statement transaction

  @return the error code returned by the engine's prepare() (0 on success)
*/
static int prepare_or_error(handlerton *ht, THD *thd, bool all)
{
  int err= ht->prepare(ht, thd, all);
  /* NOTE: the prepare counter is incremented even when prepare fails. */
  status_var_increment(thd->status_var.ha_prepare_count);
  if (err)
  {
    /* avoid sending error, if we're going to replay the transaction */
#ifdef WITH_WSREP
    if (ht != wsrep_hton ||
        err == EMSGSIZE || thd->wsrep_conflict_state != MUST_REPLAY)
#endif
    my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
  }
  return err;
}
1205
1206
1207 /**
1208 @retval
1209 0 ok
1210 @retval
1211 1 error, transaction was rolled back
1212 */
ha_prepare(THD * thd)1213 int ha_prepare(THD *thd)
1214 {
1215 int error=0, all=1;
1216 THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
1217 Ha_trx_info *ha_info= trans->ha_list;
1218 DBUG_ENTER("ha_prepare");
1219
1220 if (ha_info)
1221 {
1222 for (; ha_info; ha_info= ha_info->next())
1223 {
1224 handlerton *ht= ha_info->ht();
1225 if (ht->prepare)
1226 {
1227 if (unlikely(prepare_or_error(ht, thd, all)))
1228 {
1229 ha_rollback_trans(thd, all);
1230 error=1;
1231 break;
1232 }
1233 }
1234 else
1235 {
1236 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
1237 ER_GET_ERRNO, ER_THD(thd, ER_GET_ERRNO),
1238 HA_ERR_WRONG_COMMAND,
1239 ha_resolve_storage_engine_name(ht));
1240
1241 }
1242 }
1243 }
1244
1245 DBUG_RETURN(error);
1246 }
1247
1248 /**
1249 Check if we can skip the two-phase commit.
1250
1251 A helper function to evaluate if two-phase commit is mandatory.
1252 As a side effect, propagates the read-only/read-write flags
1253 of the statement transaction to its enclosing normal transaction.
1254
1255 If we have at least two engines with read-write changes we must
1256 run a two-phase commit. Otherwise we can run several independent
1257 commits as the only transactional engine has read-write changes
1258 and others are read-only.
1259
1260 @retval 0 All engines are read-only.
1261 @retval 1 We have the only engine with read-write changes.
1262 @retval >1 More than one engine have read-write changes.
1263 Note: return value might NOT be the exact number of
1264 engines with read-write changes.
1265 */
1266
static
uint
ha_check_and_coalesce_trx_read_only(THD *thd, Ha_trx_info *ha_list,
                                    bool all)
{
  /* The number of storage engines that have actual changes. */
  unsigned rw_ha_count= 0;
  Ha_trx_info *ha_info;

  for (ha_info= ha_list; ha_info; ha_info= ha_info->next())
  {
    if (ha_info->is_trx_read_write())
      ++rw_ha_count;

    if (! all)
    {
      /* ha_info[1] is this engine's registration in the normal trans. */
      Ha_trx_info *ha_info_all= &thd->ha_data[ha_info->ht()->slot].ha_info[1];
      DBUG_ASSERT(ha_info != ha_info_all);
      /*
        Merge read-only/read-write information about statement
        transaction to its enclosing normal transaction. Do this
        only if in a real transaction -- that is, if we know
        that ha_info_all is registered in thd->transaction.all.
        Since otherwise we only clutter the normal transaction flags.
      */
      if (ha_info_all->is_started()) /* FALSE if autocommit. */
        ha_info_all->coalesce_trx_with(ha_info);
    }
    else if (rw_ha_count > 1)
    {
      /*
        It is a normal transaction, so we don't need to merge read/write
        information up, and the need for two-phase commit has been
        already established. Break the loop prematurely.
      */
      break;
    }
  }
  /* May under-count once >1 is reached (see early break above). */
  return rw_ha_count;
}
1307
1308
1309 /**
1310 @retval
1311 0 ok
1312 @retval
1313 1 transaction was rolled back
1314 @retval
1315 2 error during commit, data may be inconsistent
1316
1317 @todo
1318 Since we don't support nested statement transactions in 5.0,
1319 we can't commit or rollback stmt transactions while we are inside
1320 stored functions or triggers. So we simply do nothing now.
1321 TODO: This should be fixed in later ( >= 5.1) releases.
1322 */
int ha_commit_trans(THD *thd, bool all)
{
  /* cookie: opaque token from tc_log->log_and_order(), passed to unlog(). */
  int error= 0, cookie;
  /*
    'all' means that this is either an explicit commit issued by
    user, or an implicit commit issued by a DDL.
  */
  THD_TRANS *trans= all ? &thd->transaction.all : &thd->transaction.stmt;
  /*
    "real" is a nick name for a transaction for which a commit will
    make persistent changes. E.g. a 'stmt' transaction inside an 'all'
    transaction is not 'real': even though it's possible to commit it,
    the changes are not durable as they might be rolled back if the
    enclosing 'all' transaction is rolled back.
  */
  bool is_real_trans= ((all || thd->transaction.all.ha_list == 0) &&
                       !(thd->variables.option_bits & OPTION_GTID_BEGIN));
  Ha_trx_info *ha_info= trans->ha_list;
  bool need_prepare_ordered, need_commit_ordered;
  my_xid xid;
  DBUG_ENTER("ha_commit_trans");
  DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
                     thd, (ulong) thd->variables.option_bits, all));

  /* Just a random warning to test warnings pushed during autocommit. */
  DBUG_EXECUTE_IF("warn_during_ha_commit_trans",
    push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
                 ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK)););

  DBUG_PRINT("info",
             ("all: %d thd->in_sub_stmt: %d ha_info: %p is_real_trans: %d",
              all, thd->in_sub_stmt, ha_info, is_real_trans));
  /*
    We must not commit the normal transaction if a statement
    transaction is pending. Otherwise statement transaction
    flags will not get propagated to its normal transaction's
    counterpart.
  */
  DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL ||
              trans == &thd->transaction.stmt);

  if (thd->in_sub_stmt)
  {
    DBUG_ASSERT(0);
    /*
      Since we don't support nested statement transactions in 5.0,
      we can't commit or rollback stmt transactions while we are inside
      stored functions or triggers. So we simply do nothing now.
      TODO: This should be fixed in later ( >= 5.1) releases.
    */
    if (!all)
      DBUG_RETURN(0);
    /*
      We assume that all statements which commit or rollback main transaction
      are prohibited inside of stored functions or triggers. So they should
      bail out with error even before ha_commit_trans() call. To be 100% safe
      let us throw error in non-debug builds.
    */
    my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));
    DBUG_RETURN(2);
  }

#ifdef WITH_ARIA_STORAGE_ENGINE
  ha_maria::implicit_commit(thd, TRUE);
#endif

  if (!ha_info)
  {
    /*
      Free resources and perform other cleanup even for 'empty' transactions.
    */
    if (is_real_trans)
      thd->transaction.cleanup();
    DBUG_RETURN(0);
  }

  DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE(););

  /* Close all cursors that can not survive COMMIT */
  if (is_real_trans)                          /* not a statement commit */
    thd->stmt_map.close_transient_cursors();

  uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
  /* rw_trans is TRUE when we in a transaction changing data */
  bool rw_trans= is_real_trans &&
                 (rw_ha_count > (thd->is_current_stmt_binlog_disabled()?0U:1U));
  MDL_request mdl_request;
  DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
                      is_real_trans, rw_trans, rw_ha_count));

  if (rw_trans)
  {
    /*
      Acquire a metadata lock which will ensure that COMMIT is blocked
      by an active FLUSH TABLES WITH READ LOCK (and vice versa:
      COMMIT in progress blocks FTWRL).

      We allow the owner of FTWRL to COMMIT; we assume that it knows
      what it does.
    */
    mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
                     MDL_EXPLICIT);

    if (!WSREP(thd) &&
        thd->mdl_context.acquire_lock(&mdl_request,
                                      thd->variables.lock_wait_timeout))
    {
      /* Could not get the COMMIT blocker lock: roll back and bail out. */
      ha_rollback_trans(thd, all);
      DBUG_RETURN(1);
    }

    DEBUG_SYNC(thd, "ha_commit_trans_after_acquire_commit_lock");
  }

  /* Refuse read-write commits under --read-only (SUPER and slaves exempt). */
  if (rw_trans &&
      opt_readonly &&
      !(thd->security_ctx->master_access & SUPER_ACL) &&
      !thd->slave_thread)
  {
    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
    goto err;
  }

#if 1 // FIXME: This should be done in ha_prepare().
  /* System-versioned tables: record trx ids in the transaction registry. */
  if (rw_trans || (thd->lex->sql_command == SQLCOM_ALTER_TABLE &&
                   thd->lex->alter_info.flags & ALTER_ADD_SYSTEM_VERSIONING &&
                   is_real_trans))
  {
    ulonglong trx_start_id= 0, trx_end_id= 0;
    for (Ha_trx_info *ha_info= trans->ha_list; ha_info; ha_info= ha_info->next())
    {
      if (ha_info->ht()->prepare_commit_versioned)
      {
        trx_end_id= ha_info->ht()->prepare_commit_versioned(thd, &trx_start_id);
        if (trx_end_id)
          break; // FIXME: use a common ID for cross-engine transactions
      }
    }

    if (trx_end_id)
    {
      if (!TR_table::use_transaction_registry)
      {
        my_error(ER_VERS_TRT_IS_DISABLED, MYF(0));
        goto err;
      }
      DBUG_ASSERT(trx_start_id);
      TR_table trt(thd, true);
      if (trt.update(trx_start_id, trx_end_id))
        goto err;
      // Here, the call will not commit inside InnoDB. It is only working
      // around closing thd->transaction.stmt open by TR_table::open().
      if (all)
        commit_one_phase_2(thd, false, &thd->transaction.stmt, false);
    }
  }
#endif

  /* Single-engine (or no-2pc) case: skip the two-phase protocol. */
  if (trans->no_2pc || (rw_ha_count <= 1))
  {
    error= ha_commit_one_phase(thd, all);
    goto done;
  }

  need_prepare_ordered= FALSE;
  need_commit_ordered= FALSE;
  xid= thd->transaction.xid_state.xid.get_my_xid();

  /* Phase one: prepare every read-write participant. */
  for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
  {
    handlerton *ht= hi->ht();
    /*
      Do not call two-phase commit if this particular
      transaction is read-only. This allows for simpler
      implementation in engines that are always read-only.
    */
    if (! hi->is_trx_read_write())
      continue;
    /*
      Sic: we know that prepare() is not NULL since otherwise
      trans->no_2pc would have been set.
    */
    if (unlikely(prepare_or_error(ht, thd, all)))
      goto err;

    need_prepare_ordered|= (ht->prepare_ordered != NULL);
    need_commit_ordered|= (ht->commit_ordered != NULL);
  }
  DEBUG_SYNC(thd, "ha_commit_trans_after_prepare");
  DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););

#ifdef WITH_WSREP
  if (!error && WSREP_ON && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
  {
    // xid was rewritten by wsrep
    xid= wsrep_xid_seqno(thd->transaction.xid_state.xid);
  }
#endif /* WITH_WSREP */

  if (!is_real_trans)
  {
    /* Statement commit inside a wider transaction: no TC logging needed. */
    error= commit_one_phase_2(thd, all, trans, is_real_trans);
    goto done;
  }

  DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
  cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
                                need_commit_ordered);
  if (!cookie)
    goto err;

  DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order");
  DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););

  /* Phase two: the commit proper. A failure here leaves data inconsistent. */
  error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;

  DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
  if (tc_log->unlog(cookie, xid))
  {
    error= 2;                                /* Error during commit */
    goto end;
  }

done:
  DBUG_EXECUTE_IF("crash_commit_after", DBUG_SUICIDE(););

  mysql_mutex_assert_not_owner(&LOCK_prepare_ordered);
  mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
  mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
  mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION
  repl_semisync_master.wait_after_commit(thd, all);
  DEBUG_SYNC(thd, "after_group_after_commit");
#endif
  goto end;

  /* Come here if error and we need to rollback. */
err:
  error= 1;                                  /* Transaction was rolled back */
  /*
    In parallel replication, rollback is delayed, as there is extra replication
    book-keeping to be done before rolling back and allowing a conflicting
    transaction to continue (MDEV-7458).
  */
  if (!(thd->rgi_slave && thd->rgi_slave->is_parallel_exec))
    ha_rollback_trans(thd, all);

end:
  if (rw_trans && mdl_request.ticket)
  {
    /*
      We do not always immediately release transactional locks
      after ha_commit_trans() (see uses of ha_enable_transaction()),
      thus we release the commit blocker lock as soon as it's
      not needed.
    */
    thd->mdl_context.release_lock(mdl_request.ticket);
  }
  DBUG_RETURN(error);
}
1584
1585 /**
1586 @note
1587 This function does not care about global read lock. A caller should.
1588
1589 @param[in] all Is set in case of explicit commit
1590 (COMMIT statement), or implicit commit
1591 issued by DDL. Is not set when called
1592 at the end of statement, even if
1593 autocommit=1.
1594 */
1595
ha_commit_one_phase(THD * thd,bool all)1596 int ha_commit_one_phase(THD *thd, bool all)
1597 {
1598 THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
1599 /*
1600 "real" is a nick name for a transaction for which a commit will
1601 make persistent changes. E.g. a 'stmt' transaction inside a 'all'
1602 transaction is not 'real': even though it's possible to commit it,
1603 the changes are not durable as they might be rolled back if the
1604 enclosing 'all' transaction is rolled back.
1605 We establish the value of 'is_real_trans' by checking
1606 if it's an explicit COMMIT/BEGIN statement, or implicit
1607 commit issued by DDL (all == TRUE), or if we're running
1608 in autocommit mode (it's only in the autocommit mode
1609 ha_commit_one_phase() can be called with an empty
1610 transaction.all.ha_list, see why in trans_register_ha()).
1611 */
1612 bool is_real_trans= ((all || thd->transaction.all.ha_list == 0) &&
1613 !(thd->variables.option_bits & OPTION_GTID_BEGIN));
1614 int res;
1615 DBUG_ENTER("ha_commit_one_phase");
1616 if (is_real_trans)
1617 {
1618 DEBUG_SYNC(thd, "ha_commit_one_phase");
1619 if ((res= thd->wait_for_prior_commit()))
1620 DBUG_RETURN(res);
1621 }
1622 res= commit_one_phase_2(thd, all, trans, is_real_trans);
1623 DBUG_RETURN(res);
1624 }
1625
1626
/**
  Commit each registered engine's transaction and clear the engine list.

  @param thd            current session
  @param all            TRUE - commit the normal transaction,
                        FALSE - commit the statement transaction
  @param trans          the transaction whose ha_list is to be committed
  @param is_real_trans  TRUE when the commit makes persistent changes
                        (triggers full cleanup and multi-engine accounting)

  @return 0 on success, 1 if any engine's commit() failed
*/
static int
commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
{
  int error= 0;
  uint count= 0;  /* number of non-binlog read-write engines committed */
  Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
  DBUG_ENTER("commit_one_phase_2");
  if (is_real_trans)
    DEBUG_SYNC(thd, "commit_one_phase_2");
  if (ha_info)
  {
    /* ha_info->reset() unlinks the element, so save the next pointer first. */
    for (; ha_info; ha_info= ha_info_next)
    {
      int err;
      handlerton *ht= ha_info->ht();
      if ((err= ht->commit(ht, thd, all)))
      {
        /* Commit failed, but we keep going so every engine gets its call. */
        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
        error=1;
      }
      /* Should this be done only if is_real_trans is set ? */
      status_var_increment(thd->status_var.ha_commit_count);
      if (is_real_trans && ht != binlog_hton && ha_info->is_trx_read_write())
        ++count;
      ha_info_next= ha_info->next();
      ha_info->reset(); /* keep it conveniently zero-filled */
    }
    trans->ha_list= 0;
    trans->no_2pc=0;
    if (all)
    {
#ifdef HAVE_QUERY_CACHE
      /* Changed tables are only visible to others after the full commit. */
      if (thd->transaction.changed_tables)
        query_cache.invalidate(thd, thd->transaction.changed_tables);
#endif
    }
  }
  /* Free resources and perform other cleanup even for 'empty' transactions. */
  if (is_real_trans)
  {
    thd->has_waiter= false;
    thd->transaction.cleanup();
    if (count >= 2)
      statistic_increment(transactions_multi_engine, LOCK_status);
  }

  DBUG_RETURN(error);
}
1675
1676
int ha_rollback_trans(THD *thd, bool all)
{
  int error=0;
  THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
  Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
  /*
    "real" is a nick name for a transaction for which a commit will
    make persistent changes. E.g. a 'stmt' transaction inside a 'all'
    transaction is not 'real': even though it's possible to commit it,
    the changes are not durable as they might be rolled back if the
    enclosing 'all' transaction is rolled back.
    We establish the value of 'is_real_trans' by checking
    if it's an explicit COMMIT or BEGIN statement, or implicit
    commit issued by DDL (in these cases all == TRUE),
    or if we're running in autocommit mode (it's only in the autocommit mode
    ha_commit_one_phase() is called with an empty
    transaction.all.ha_list, see why in trans_register_ha()).
  */
  bool is_real_trans=all || thd->transaction.all.ha_list == 0;
  DBUG_ENTER("ha_rollback_trans");

  /*
    We must not rollback the normal transaction if a statement
    transaction is pending.
  */
  DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL ||
              trans == &thd->transaction.stmt);

#ifdef HAVE_REPLICATION
  if (is_real_trans)
  {
    /*
      In parallel replication, if we need to rollback during commit, we must
      first inform following transactions that we are going to abort our commit
      attempt. Otherwise those following transactions can run too early, and
      possibly cause replication to fail. See comments in retry_event_group().

      There were several bugs with this in the past that were very hard to
      track down (MDEV-7458, MDEV-8302). So we add here an assertion for
      rollback without signalling following transactions. And in release
      builds, we explicitly do the signalling before rolling back.
    */
    DBUG_ASSERT(!(thd->rgi_slave && thd->rgi_slave->did_mark_start_commit));
    if (thd->rgi_slave && thd->rgi_slave->did_mark_start_commit)
      thd->rgi_slave->unmark_start_commit();
  }
#endif

  if (thd->in_sub_stmt)
  {
    DBUG_ASSERT(0);
    /*
      If we are inside stored function or trigger we should not commit or
      rollback current statement transaction. See comment in ha_commit_trans()
      call for more information.
    */
    if (!all)
      DBUG_RETURN(0);
    my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));
    DBUG_RETURN(1);
  }

  if (ha_info)
  {
    /* Close all cursors that can not survive ROLLBACK */
    if (is_real_trans)                          /* not a statement commit */
      thd->stmt_map.close_transient_cursors();

    /* ha_info->reset() unlinks the element, so save the next pointer first. */
    for (; ha_info; ha_info= ha_info_next)
    {
      int err;
      handlerton *ht= ha_info->ht();
      if ((err= ht->rollback(ht, thd, all)))
      { // cannot happen
        my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
        error=1;
#ifdef WITH_WSREP
        WSREP_WARN("handlerton rollback failed, thd %llu %lld conf %d SQL %s",
                   thd->thread_id, thd->query_id, thd->wsrep_conflict_state,
                   thd->query());
#endif /* WITH_WSREP */
      }
      /* Counted per engine, even when the engine's rollback failed. */
      status_var_increment(thd->status_var.ha_rollback_count);
      ha_info_next= ha_info->next();
      ha_info->reset(); /* keep it conveniently zero-filled */
    }
    trans->ha_list= 0;
    trans->no_2pc=0;
  }

  /*
    Thanks to possibility of MDL deadlock rollback request can come even if
    transaction hasn't been started in any transactional storage engine.
  */
  if (is_real_trans && thd->transaction_rollback_request &&
      thd->transaction.xid_state.xa_state != XA_NOTR)
    thd->transaction.xid_state.rm_error= thd->get_stmt_da()->sql_errno();

  /* Always cleanup. Even if nht==0. There may be savepoints. */
  if (is_real_trans)
  {
    thd->has_waiter= false;
    thd->transaction.cleanup();
  }
  if (all)
    thd->transaction_rollback_request= FALSE;

  /*
    If a non-transactional table was updated, warn; don't warn if this is a
    slave thread (because when a slave thread executes a ROLLBACK, it has
    been read from the binary log, so it's 100% sure and normal to produce
    error ER_WARNING_NOT_COMPLETE_ROLLBACK. If we sent the warning to the
    slave SQL thread, it would not stop the thread but just be printed in
    the error log; but we don't want users to wonder why they have this
    message in the error log, so we don't send it.

    We don't have to test for thd->killed == KILL_SYSTEM_THREAD as
    it doesn't matter if a warning is pushed to a system thread or not:
    No one will see it...
  */
  if (is_real_trans && thd->transaction.all.modified_non_trans_table &&
      !thd->slave_thread && thd->killed < KILL_CONNECTION)
    push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
                 ER_WARNING_NOT_COMPLETE_ROLLBACK,
                 ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
#ifdef HAVE_REPLICATION
  repl_semisync_master.wait_after_rollback(thd, all);
#endif
  DBUG_RETURN(error);
}
1807
1808
/**
  Argument bundle passed through plugin_foreach() to the per-engine
  XA commit/rollback callbacks below.
*/
struct xahton_st {
  XID *xid;     /* XID of the prepared transaction to commit or roll back */
  int result;   /* set to 0 by a callback once some engine handled the XID */
};
1813
xacommit_handlerton(THD * unused1,plugin_ref plugin,void * arg)1814 static my_bool xacommit_handlerton(THD *unused1, plugin_ref plugin,
1815 void *arg)
1816 {
1817 handlerton *hton= plugin_hton(plugin);
1818 if (hton->state == SHOW_OPTION_YES && hton->recover)
1819 {
1820 hton->commit_by_xid(hton, ((struct xahton_st *)arg)->xid);
1821 ((struct xahton_st *)arg)->result= 0;
1822 }
1823 return FALSE;
1824 }
1825
xarollback_handlerton(THD * unused1,plugin_ref plugin,void * arg)1826 static my_bool xarollback_handlerton(THD *unused1, plugin_ref plugin,
1827 void *arg)
1828 {
1829 handlerton *hton= plugin_hton(plugin);
1830 if (hton->state == SHOW_OPTION_YES && hton->recover)
1831 {
1832 hton->rollback_by_xid(hton, ((struct xahton_st *)arg)->xid);
1833 ((struct xahton_st *)arg)->result= 0;
1834 }
1835 return FALSE;
1836 }
1837
1838
ha_commit_or_rollback_by_xid(XID * xid,bool commit)1839 int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
1840 {
1841 struct xahton_st xaop;
1842 xaop.xid= xid;
1843 xaop.result= 1;
1844
1845 plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
1846 MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
1847
1848 return xaop.result;
1849 }
1850
1851
1852 #ifndef DBUG_OFF
1853 /** Converts XID to string.
1854
1855 @param[out] buf output buffer
1856 @param[in] xid XID to convert
1857
1858 @return pointer to converted string
1859
1860 @note This does not need to be multi-byte safe or anything */
xid_to_str(char * buf,const XID & xid)1861 char *xid_to_str(char *buf, const XID &xid)
1862 {
1863 int i;
1864 char *s=buf;
1865 *s++='\'';
1866 for (i= 0; i < xid.gtrid_length + xid.bqual_length; i++)
1867 {
1868 uchar c= (uchar) xid.data[i];
1869 /* is_next_dig is set if next character is a number */
1870 bool is_next_dig= FALSE;
1871 if (i < XIDDATASIZE)
1872 {
1873 char ch= xid.data[i + 1];
1874 is_next_dig= (ch >= '0' && ch <='9');
1875 }
1876 if (i == xid.gtrid_length)
1877 {
1878 *s++='\'';
1879 if (xid.bqual_length)
1880 {
1881 *s++='.';
1882 *s++='\'';
1883 }
1884 }
1885 if (c < 32 || c > 126)
1886 {
1887 *s++='\\';
1888 /*
1889 If next character is a number, write current character with
1890 3 octal numbers to ensure that the next number is not seen
1891 as part of the octal number
1892 */
1893 if (c > 077 || is_next_dig)
1894 *s++=_dig_vec_lower[c >> 6];
1895 if (c > 007 || is_next_dig)
1896 *s++=_dig_vec_lower[(c >> 3) & 7];
1897 *s++=_dig_vec_lower[c & 7];
1898 }
1899 else
1900 {
1901 if (c == '\'' || c == '\\')
1902 *s++='\\';
1903 *s++=c;
1904 }
1905 }
1906 *s++='\'';
1907 *s=0;
1908 return buf;
1909 }
1910 #endif
1911
#ifdef WITH_WSREP
/**
  Sort recovered XIDs and find the last wsrep seqno that forms a
  continuous range starting right after the stored SE checkpoint.

  XIDs in that continuous range have already been ordered and certified
  by the cluster, so the caller may commit them safely.

  @param list  array of recovered XIDs (sorted in place)
  @param len   number of entries in 'list'

  @return last continuous seqno, or 0 on checkpoint read error or when
          nothing is recoverable.
*/
static my_xid wsrep_order_and_check_continuity(XID *list, int len)
{
  wsrep_uuid_t uuid;
  wsrep_seqno_t seqno;

  wsrep_sort_xid_array(list, len);
  if (wsrep_get_SE_checkpoint(uuid, seqno))
  {
    WSREP_ERROR("Could not read wsrep SE checkpoint for recovery");
    return 0;
  }
  long long cur_seqno= seqno;
  for (int idx= 0; idx < len; ++idx)
  {
    XID *xid= list + idx;
    if (!wsrep_is_wsrep_xid(xid) || wsrep_xid_seqno(*xid) != cur_seqno + 1)
    {
      WSREP_WARN("Discovered discontinuity in recovered wsrep "
                 "transaction XIDs. Truncating the recovery list to "
                 "%d entries", idx);
      break;
    }
    ++cur_seqno;
  }
  WSREP_INFO("Last wsrep seqno to be recovered %lld", cur_seqno);
  return (cur_seqno < 0 ? 0 : cur_seqno);
}
#endif /* WITH_WSREP */
1940
/**
  recover() step of xa.

  @note
  there are three modes of operation:
  - automatic recover after a crash
    in this case commit_list != 0, tc_heuristic_recover==0
    all xids from commit_list are committed, others are rolled back
  - manual (heuristic) recover
    in this case commit_list==0, tc_heuristic_recover != 0
    DBA has explicitly specified that all prepared transactions should
    be committed (or rolled back).
  - no recovery (MySQL did not detect a crash)
    in this case commit_list==0, tc_heuristic_recover == 0
    there should be no prepared transactions in this case.
*/
struct xarecover_st
{
  int len, found_foreign_xids, found_my_xids; // batch size; XID counters
  XID *list;          // buffer that engines fill with prepared XIDs
  HASH *commit_list;  // XIDs known committed (from binlog), or NULL
  bool dry_run;       // true: only count prepared XIDs, act on none
};
1964
/**
  plugin_foreach() callback for ha_recover(): run XA recovery in one
  storage engine.

  Repeatedly asks the engine for batches of up to info->len prepared
  XIDs and, depending on the mode recorded in xarecover_st, counts
  them (dry run) or commits / rolls back each one.

  @return FALSE always, so iteration continues over all engines.
*/
static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
                                    void *arg)
{
  handlerton *hton= plugin_hton(plugin);
  struct xarecover_st *info= (struct xarecover_st *) arg;
  int got;

  if (hton->state == SHOW_OPTION_YES && hton->recover)
  {
    while ((got= hton->recover(hton, info->list, info->len)) > 0 )
    {
      sql_print_information("Found %d prepared transaction(s) in %s",
                            got, hton_name(hton)->str);
#ifdef WITH_WSREP
      /* If wsrep_on=ON, XIDs are first ordered and then the range of
         recovered XIDs is checked for continuity. All the XIDs which
         are in continuous range can be safely committed if binlog
         is off since they have already ordered and certified in the
         cluster. */
      my_xid wsrep_limit= 0;
      if (WSREP_ON)
      {
        wsrep_limit= wsrep_order_and_check_continuity(info->list, got);
      }
#endif /* WITH_WSREP */

      for (int i=0; i < got; i ++)
      {
        /* For wsrep XIDs the transaction id is the wsrep seqno. */
        my_xid x= IF_WSREP(WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ?
                           wsrep_xid_seqno(info->list[i]) :
                           info->list[i].get_my_xid(),
                           info->list[i].get_my_xid());
        if (!x) // not "mine" - that is generated by external TM
        {
#ifndef DBUG_OFF
          char buf[XIDDATASIZE*4+6]; // see xid_to_str
          DBUG_PRINT("info",
                     ("ignore xid %s", xid_to_str(buf, info->list[i])));
#endif
          /* Keep the foreign XID around for a later XA COMMIT/ROLLBACK. */
          xid_cache_insert(info->list+i, XA_PREPARED);
          info->found_foreign_xids++;
          continue;
        }
        /* In dry-run mode only count our own XIDs (wsrep XIDs within the
           continuous range are still acted upon when emulating binlog). */
        if (IF_WSREP(!(wsrep_emulate_bin_log &&
                       wsrep_is_wsrep_xid(info->list + i) &&
                       x <= wsrep_limit) && info->dry_run,
                     info->dry_run))
        {
          info->found_my_xids++;
          continue;
        }
        // recovery mode
        if (IF_WSREP((wsrep_emulate_bin_log &&
                      wsrep_is_wsrep_xid(info->list + i) &&
                      x <= wsrep_limit), false) ||
            (info->commit_list ?
             my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
             tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT))
        {
#ifndef DBUG_OFF
          int rc=
#endif
            hton->commit_by_xid(hton, info->list+i);
#ifndef DBUG_OFF
          if (rc == 0)
          {
            char buf[XIDDATASIZE*4+6]; // see xid_to_str
            DBUG_PRINT("info",
                       ("commit xid %s", xid_to_str(buf, info->list[i])));
          }
#endif
        }
        else
        {
#ifndef DBUG_OFF
          int rc=
#endif
            hton->rollback_by_xid(hton, info->list+i);
#ifndef DBUG_OFF
          if (rc == 0)
          {
            char buf[XIDDATASIZE*4+6]; // see xid_to_str
            DBUG_PRINT("info",
                       ("rollback xid %s", xid_to_str(buf, info->list[i])));
          }
#endif
        }
      }
      /* A partial batch means the engine has no more prepared XIDs. */
      if (got < info->len)
        break;
    }
  }
  return FALSE;
}
2059
/**
  Recover prepared XA transactions in all storage engines.

  @param commit_list  hash of XIDs known to be committed (taken from the
                      binlog during crash recovery), or NULL for manual
                      (tc_heuristic_recover) or dry-run modes.

  @return 0 on success, 1 on out-of-memory or when unresolved prepared
          transactions are found in dry-run mode.
*/
int ha_recover(HASH *commit_list)
{
  struct xarecover_st info;
  DBUG_ENTER("ha_recover");
  info.found_foreign_xids= info.found_my_xids= 0;
  info.commit_list= commit_list;
  info.dry_run= (info.commit_list==0 && tc_heuristic_recover==0);
  info.list= NULL;

  /* commit_list and tc_heuristic_recover cannot be set both */
  DBUG_ASSERT(info.commit_list==0 || tc_heuristic_recover==0);
  /* if either is set, total_ha_2pc must be set too */
  DBUG_ASSERT(info.dry_run ||
              (failed_ha_2pc + total_ha_2pc) > (ulong)opt_bin_log);

  /* Nothing to recover when no 2pc-capable engine besides binlog exists. */
  if (total_ha_2pc <= (ulong)opt_bin_log)
    DBUG_RETURN(0);

  if (info.commit_list)
    sql_print_information("Starting crash recovery...");

  /* Try progressively smaller XID list buffers until allocation succeeds. */
  for (info.len= MAX_XID_LIST_SIZE ;
       info.list==0 && info.len > MIN_XID_LIST_SIZE; info.len/=2)
  {
    DBUG_EXECUTE_IF("min_xa_len", info.len = 16;);
    info.list=(XID *)my_malloc(info.len*sizeof(XID), MYF(0));
  }
  if (!info.list)
  {
    sql_print_error(ER(ER_OUTOFMEMORY),
                    static_cast<int>(info.len*sizeof(XID)));
    DBUG_RETURN(1);
  }

  /* Walk every storage engine and let it recover its prepared XIDs. */
  plugin_foreach(NULL, xarecover_handlerton,
                 MYSQL_STORAGE_ENGINE_PLUGIN, &info);

  my_free(info.list);
  if (info.found_foreign_xids)
    sql_print_warning("Found %d prepared XA transactions",
                      info.found_foreign_xids);
  /* In dry-run mode our own prepared transactions cannot be resolved;
     refuse to start and tell the DBA how to proceed. */
  if (info.dry_run && info.found_my_xids)
  {
    sql_print_error("Found %d prepared transactions! It means that mysqld was "
                    "not shut down properly last time and critical recovery "
                    "information (last binlog or %s file) was manually deleted "
                    "after a crash. You have to start mysqld with "
                    "--tc-heuristic-recover switch to commit or rollback "
                    "pending transactions.",
                    info.found_my_xids, opt_tc_log_file);
    DBUG_RETURN(1);
  }
  if (info.commit_list)
    sql_print_information("Crash recovery finished.");
  DBUG_RETURN(0);
}
2116
2117 /**
2118 return the XID as it appears in the SQL function's arguments.
2119 So this string can be passed to XA START, XA PREPARE etc...
2120
2121 @note
2122 the 'buf' has to have space for at least SQL_XIDSIZE bytes.
2123 */
2124
2125
/*
  'a'..'z' 'A'..'Z', '0'..'9'
  and '-' '_' ' ' symbols don't have to be
  converted.
*/
/*
  Lookup table indexed by 7-bit character code: non-zero means the byte
  must be hex-encoded when the XID is rendered as an SQL literal in
  get_sql_xid() below.
*/
static const char xid_needs_conv[128]=
{
  1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
  1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
  0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
  0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
  1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
  0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
  1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
  0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
};
2143
/**
  Render an XID the way it appears in SQL statement arguments, so the
  result can be passed to XA START, XA PREPARE etc.

  @param[in]  xid  XID to render
  @param[out] buf  output buffer; must hold at least SQL_XIDSIZE bytes

  @return number of bytes written to 'buf' (no trailing NUL is added).
*/
uint get_sql_xid(XID *xid, char *buf)
{
  int tot_len= xid->gtrid_length + xid->bqual_length;
  int i;
  const char *orig_buf= buf;

  /* Scan for any byte that cannot appear verbatim in a quoted literal. */
  for (i=0; i<tot_len; i++)
  {
    uchar c= ((uchar *) xid->data)[i];
    if (c >= 128 || xid_needs_conv[c])
      break;
  }

  if (i >= tot_len)
  {
    /* No need to convert characters to hexadecimals. */
    *buf++= '\'';
    memcpy(buf, xid->data, xid->gtrid_length);
    buf+= xid->gtrid_length;
    *buf++= '\'';
    /* bqual is emitted as a second quoted part when present, or when a
       non-default formatID forces the long form. */
    if (xid->bqual_length > 0 || xid->formatID != 1)
    {
      *buf++= ',';
      *buf++= '\'';
      memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
      buf+= xid->bqual_length;
      *buf++= '\'';
    }
  }
  else
  {
    /* At least one byte needs escaping: emit hex literals (X'..'). */
    *buf++= 'X';
    *buf++= '\'';
    for (i= 0; i < xid->gtrid_length; i++)
    {
      *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
      *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
    }
    *buf++= '\'';
    if (xid->bqual_length > 0 || xid->formatID != 1)
    {
      *buf++= ',';
      *buf++= 'X';
      *buf++= '\'';
      /* 'i' continues from gtrid_length: hex-encode the bqual bytes. */
      for (; i < tot_len; i++)
      {
        *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
        *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
      }
      *buf++= '\'';
    }
  }

  /* Append the formatID when it differs from the default of 1. */
  if (xid->formatID != 1)
  {
    *buf++= ',';
    buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
                                    MY_INT64_NUM_DECIMAL_DIGITS, -10,
                                    xid->formatID);
  }

  return (uint)(buf - orig_buf);
}
2206
2207
2208 /**
2209 return the list of XID's to a client, the same way SHOW commands do.
2210
2211 @note
2212 I didn't find in XA specs that an RM cannot return the same XID twice,
2213 so mysql_xa_recover does not filter XID's to ensure uniqueness.
2214 It can be easily fixed later, if necessary.
2215 */
2216
xa_recover_callback(XID_STATE * xs,Protocol * protocol,char * data,uint data_len,CHARSET_INFO * data_cs)2217 static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
2218 char *data, uint data_len, CHARSET_INFO *data_cs)
2219 {
2220 if (xs->xa_state == XA_PREPARED)
2221 {
2222 protocol->prepare_for_resend();
2223 protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
2224 protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
2225 protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
2226 protocol->store(data, data_len, data_cs);
2227 if (protocol->write())
2228 return TRUE;
2229 }
2230 return FALSE;
2231 }
2232
2233
xa_recover_callback_short(XID_STATE * xs,Protocol * protocol)2234 static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
2235 {
2236 return xa_recover_callback(xs, protocol, xs->xid.data,
2237 xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
2238 }
2239
2240
xa_recover_callback_verbose(XID_STATE * xs,Protocol * protocol)2241 static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
2242 {
2243 char buf[SQL_XIDSIZE];
2244 uint len= get_sql_xid(&xs->xid, buf);
2245 return xa_recover_callback(xs, protocol, buf, len,
2246 &my_charset_utf8_general_ci);
2247 }
2248
2249
/**
  Implement XA RECOVER: send the list of prepared XIDs to the client.

  @param thd  connection executing the statement; thd->lex->verbose
              selects the SQL-literal output format.

  @return 0 on success, 1 on protocol error.
*/
bool mysql_xa_recover(THD *thd)
{
  List<Item> field_list;
  Protocol *protocol= thd->protocol;
  MEM_ROOT *mem_root= thd->mem_root;
  my_hash_walk_action action;
  DBUG_ENTER("mysql_xa_recover");

  field_list.push_back(new (mem_root)
                       Item_int(thd, "formatID", 0,
                                MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
  field_list.push_back(new (mem_root)
                       Item_int(thd, "gtrid_length", 0,
                                MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
  field_list.push_back(new (mem_root)
                       Item_int(thd, "bqual_length", 0,
                                MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
  {
    uint len;
    CHARSET_INFO *cs;

    /* Choose the row writer and the metadata of the 'data' column to
       match: hex/SQL form (verbose) or raw binary XID bytes. */
    if (thd->lex->verbose)
    {
      len= SQL_XIDSIZE;
      cs= &my_charset_utf8_general_ci;
      action= (my_hash_walk_action) xa_recover_callback_verbose;
    }
    else
    {
      len= XIDDATASIZE;
      cs= &my_charset_bin;
      action= (my_hash_walk_action) xa_recover_callback_short;
    }

    field_list.push_back(new (mem_root)
                         Item_empty_string(thd, "data", len, cs), mem_root);
  }

  if (protocol->send_result_set_metadata(&field_list,
                  Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
    DBUG_RETURN(1);

  /* Walk the XID cache; the callback sends one row per prepared XID. */
  if (xid_cache_iterate(thd, action, protocol))
    DBUG_RETURN(1);
  my_eof(thd);
  DBUG_RETURN(0);
}
2297
/*
  Called by engine to notify TC that a new commit checkpoint has been reached.
  See comments on handlerton method commit_checkpoint_request() for details.
*/
void
commit_checkpoint_notify_ha(handlerton *hton, void *cookie)
{
  /* Forward the engine's cookie to the active transaction coordinator log. */
  tc_log->commit_checkpoint_notify(cookie);
}
2307
2308
2309 /**
2310 Check if all storage engines used in transaction agree that after
2311 rollback to savepoint it is safe to release MDL locks acquired after
2312 savepoint creation.
2313
2314 @param thd The client thread that executes the transaction.
2315
2316 @return true - It is safe to release MDL locks.
2317 false - If it is not.
2318 */
ha_rollback_to_savepoint_can_release_mdl(THD * thd)2319 bool ha_rollback_to_savepoint_can_release_mdl(THD *thd)
2320 {
2321 Ha_trx_info *ha_info;
2322 THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
2323 &thd->transaction.all);
2324
2325 DBUG_ENTER("ha_rollback_to_savepoint_can_release_mdl");
2326
2327 /**
2328 Checking whether it is safe to release metadata locks after rollback to
2329 savepoint in all the storage engines that are part of the transaction.
2330 */
2331 for (ha_info= trans->ha_list; ha_info; ha_info= ha_info->next())
2332 {
2333 handlerton *ht= ha_info->ht();
2334 DBUG_ASSERT(ht);
2335
2336 if (ht->savepoint_rollback_can_release_mdl == 0 ||
2337 ht->savepoint_rollback_can_release_mdl(ht, thd) == false)
2338 DBUG_RETURN(false);
2339 }
2340
2341 DBUG_RETURN(true);
2342 }
2343
/**
  Roll the transaction back to a previously set savepoint.

  Engines registered before the savepoint was set are rolled back to the
  savepoint; engines that joined the transaction afterwards have their
  whole transaction rolled back and are removed from the list.

  @return 0 on success, 1 if any engine reported an error.
*/
int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
{
  int error=0;
  THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
                                        &thd->transaction.all);
  Ha_trx_info *ha_info, *ha_info_next;

  DBUG_ENTER("ha_rollback_to_savepoint");

  /* no_2pc is recomputed below from the engines that stay registered. */
  trans->no_2pc=0;
  /*
    rolling back to savepoint in all storage engines that were part of the
    transaction when the savepoint was set
  */
  for (ha_info= sv->ha_list; ha_info; ha_info= ha_info->next())
  {
    int err;
    handlerton *ht= ha_info->ht();
    DBUG_ASSERT(ht);
    DBUG_ASSERT(ht->savepoint_set != 0);
    /* Engine-private savepoint data lives right after the SAVEPOINT
       struct, at each engine's registered offset. */
    if ((err= ht->savepoint_rollback(ht, thd,
                                     (uchar *)(sv+1)+ht->savepoint_offset)))
    { // cannot happen
      my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
      error=1;
    }
    status_var_increment(thd->status_var.ha_savepoint_rollback_count);
    trans->no_2pc|= ht->prepare == 0;
  }
  /*
    rolling back the transaction in all storage engines that were not part of
    the transaction when the savepoint was set
  */
  for (ha_info= trans->ha_list; ha_info != sv->ha_list;
       ha_info= ha_info_next)
  {
    int err;
    handlerton *ht= ha_info->ht();
    if ((err= ht->rollback(ht, thd, !thd->in_sub_stmt)))
    { // cannot happen
      my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
      error=1;
    }
    status_var_increment(thd->status_var.ha_rollback_count);
    ha_info_next= ha_info->next();
    ha_info->reset(); /* keep it conveniently zero-filled */
  }
  /* Shrink the registered-engine list back to the savepoint's snapshot. */
  trans->ha_list= sv->ha_list;
  DBUG_RETURN(error);
}
2394
/**
  Set a savepoint in every storage engine registered in the transaction.

  @note
  according to the sql standard (ISO/IEC 9075-2:2003)
  section "4.33.4 SQL-statements and transaction states",
  SAVEPOINT is *not* transaction-initiating SQL-statement

  @return 0 on success, 1 if an engine failed or does not support
          savepoints (iteration stops at the first such engine).
*/
int ha_savepoint(THD *thd, SAVEPOINT *sv)
{
  int error=0;
  THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
                                        &thd->transaction.all);
  Ha_trx_info *ha_info= trans->ha_list;
  DBUG_ENTER("ha_savepoint");

  for (; ha_info; ha_info= ha_info->next())
  {
    int err;
    handlerton *ht= ha_info->ht();
    DBUG_ASSERT(ht);
    if (! ht->savepoint_set)
    {
      my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "SAVEPOINT");
      error=1;
      break;
    }
    /* Engine stores its savepoint data after the SAVEPOINT struct at
       its registered offset. */
    if ((err= ht->savepoint_set(ht, thd, (uchar *)(sv+1)+ht->savepoint_offset)))
    { // cannot happen
      my_error(ER_GET_ERRNO, MYF(0), err, hton_name(ht)->str);
      error=1;
    }
    status_var_increment(thd->status_var.ha_savepoint_count);
  }
  /*
    Remember the list of registered storage engines. All new
    engines are prepended to the beginning of the list.
  */
  sv->ha_list= trans->ha_list;

  DBUG_RETURN(error);
}
2435
ha_release_savepoint(THD * thd,SAVEPOINT * sv)2436 int ha_release_savepoint(THD *thd, SAVEPOINT *sv)
2437 {
2438 int error=0;
2439 Ha_trx_info *ha_info= sv->ha_list;
2440 DBUG_ENTER("ha_release_savepoint");
2441
2442 for (; ha_info; ha_info= ha_info->next())
2443 {
2444 int err;
2445 handlerton *ht= ha_info->ht();
2446 /* Savepoint life time is enclosed into transaction life time. */
2447 DBUG_ASSERT(ht);
2448 if (!ht->savepoint_release)
2449 continue;
2450 if ((err= ht->savepoint_release(ht, thd,
2451 (uchar *)(sv+1) + ht->savepoint_offset)))
2452 { // cannot happen
2453 my_error(ER_GET_ERRNO, MYF(0), err, hton_name(ht)->str);
2454 error=1;
2455 }
2456 }
2457 DBUG_RETURN(error);
2458 }
2459
2460
snapshot_handlerton(THD * thd,plugin_ref plugin,void * arg)2461 static my_bool snapshot_handlerton(THD *thd, plugin_ref plugin,
2462 void *arg)
2463 {
2464 handlerton *hton= plugin_hton(plugin);
2465 if (hton->state == SHOW_OPTION_YES &&
2466 hton->start_consistent_snapshot)
2467 {
2468 if (hton->start_consistent_snapshot(hton, thd))
2469 return TRUE;
2470 *((bool *)arg)= false;
2471 }
2472 return FALSE;
2473 }
2474
ha_start_consistent_snapshot(THD * thd)2475 int ha_start_consistent_snapshot(THD *thd)
2476 {
2477 bool err, warn= true;
2478
2479 /*
2480 Holding the LOCK_commit_ordered mutex ensures that we get the same
2481 snapshot for all engines (including the binary log). This allows us
2482 among other things to do backups with
2483 START TRANSACTION WITH CONSISTENT SNAPSHOT and
2484 have a consistent binlog position.
2485 */
2486 mysql_mutex_lock(&LOCK_commit_ordered);
2487 err= plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn);
2488 mysql_mutex_unlock(&LOCK_commit_ordered);
2489
2490 if (err)
2491 {
2492 ha_rollback_trans(thd, true);
2493 return 1;
2494 }
2495
2496 /*
2497 Same idea as when one wants to CREATE TABLE in one engine which does not
2498 exist:
2499 */
2500 if (warn)
2501 push_warning(thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
2502 "This MariaDB server does not support any "
2503 "consistent-read capable storage engine");
2504 return 0;
2505 }
2506
2507
flush_handlerton(THD * thd,plugin_ref plugin,void * arg)2508 static my_bool flush_handlerton(THD *thd, plugin_ref plugin,
2509 void *arg)
2510 {
2511 handlerton *hton= plugin_hton(plugin);
2512 if (hton->state == SHOW_OPTION_YES && hton->flush_logs &&
2513 hton->flush_logs(hton))
2514 return TRUE;
2515 return FALSE;
2516 }
2517
2518
ha_flush_logs(handlerton * db_type)2519 bool ha_flush_logs(handlerton *db_type)
2520 {
2521 if (db_type == NULL)
2522 {
2523 if (plugin_foreach(NULL, flush_handlerton,
2524 MYSQL_STORAGE_ENGINE_PLUGIN, 0))
2525 return TRUE;
2526 }
2527 else
2528 {
2529 if (db_type->state != SHOW_OPTION_YES ||
2530 (db_type->flush_logs && db_type->flush_logs(db_type)))
2531 return TRUE;
2532 }
2533 return FALSE;
2534 }
2535
2536
2537 /**
2538 @brief make canonical filename
2539
2540 @param[in] file table handler
2541 @param[in] path original path
2542 @param[out] tmp_path buffer for canonized path
2543
2544 @details Lower case db name and table name path parts for
2545 non file based tables when lower_case_table_names
2546 is 2 (store as is, compare in lower case).
2547 Filesystem path prefix (mysql_data_home or tmpdir)
2548 is left intact.
2549
2550 @note tmp_path may be left intact if no conversion was
2551 performed.
2552
2553 @retval canonized path
2554
2555 @todo This may be done more efficiently when table path
2556 gets built. Convert this function to something like
2557 ASSERT_CANONICAL_FILENAME.
2558 */
get_canonical_filename(handler * file,const char * path,char * tmp_path)2559 const char *get_canonical_filename(handler *file, const char *path,
2560 char *tmp_path)
2561 {
2562 uint i;
2563 if (lower_case_table_names != 2 || (file->ha_table_flags() & HA_FILE_BASED))
2564 return path;
2565
2566 for (i= 0; i <= mysql_tmpdir_list.max; i++)
2567 {
2568 if (is_prefix(path, mysql_tmpdir_list.list[i]))
2569 return path;
2570 }
2571
2572 /* Ensure that table handler get path in lower case */
2573 if (tmp_path != path)
2574 strmov(tmp_path, path);
2575
2576 /*
2577 we only should turn into lowercase database/table part
2578 so start the process after homedirectory
2579 */
2580 my_casedn_str(files_charset_info, tmp_path + mysql_data_home_len);
2581 return tmp_path;
2582 }
2583
2584
/** delete a table in the engine

  @param thd               connection doing the drop
  @param table_type        engine of the table; NULL or view pseudo-engine
                           means there is nothing to delete in an engine
  @param path              filesystem path of the table (without extension)
  @param db                database name, used for error reporting
  @param alias             table name as the user wrote it, for messages
  @param generate_warning  push a warning when the table is missing

  @note
  ENOENT and HA_ERR_NO_SUCH_TABLE are not considered errors.
  The .frm file will be deleted only if we return 0.
*/
int ha_delete_table(THD *thd, handlerton *table_type, const char *path,
                    const LEX_CSTRING *db, const LEX_CSTRING *alias,
                    bool generate_warning)
{
  handler *file;
  char tmp_path[FN_REFLEN];
  int error;
  TABLE dummy_table;
  TABLE_SHARE dummy_share;
  DBUG_ENTER("ha_delete_table");

  /* table_type is NULL in ALTER TABLE when renaming only .frm files */
  if (table_type == NULL || table_type == view_pseudo_hton ||
      ! (file=get_new_handler((TABLE_SHARE*)0, thd->mem_root, table_type)))
    DBUG_RETURN(0);

  bzero((char*) &dummy_table, sizeof(dummy_table));
  bzero((char*) &dummy_share, sizeof(dummy_share));
  dummy_table.s= &dummy_share;

  path= get_canonical_filename(file, path, tmp_path);
  if (unlikely((error= file->ha_delete_table(path))))
  {
    /*
      it's not an error if the table doesn't exist in the engine.
      warn the user, but still report DROP being a success
    */
    bool intercept= error == ENOENT || error == HA_ERR_NO_SUCH_TABLE;

    if (!intercept || generate_warning)
    {
      /* Fill up structures that print_error may need */
      dummy_share.path.str= (char*) path;
      dummy_share.path.length= strlen(path);
      dummy_share.normalized_path= dummy_share.path;
      dummy_share.db= *db;
      dummy_share.table_name= *alias;
      dummy_table.alias.set(alias->str, alias->length, table_alias_charset);
      file->change_table_ptr(&dummy_table, &dummy_share);
      /* Missing table is downgraded to a warning; real errors stay errors. */
      file->print_error(error, MYF(intercept ? ME_JUST_WARNING : 0));
    }
    if (intercept)
      error= 0;
  }
  delete file;

  DBUG_RETURN(error);
}
2638
2639 /****************************************************************************
2640 ** General handler functions
2641 ****************************************************************************/
2642
2643
2644 /**
2645 Clone a handler
2646
2647 @param name name of new table instance
2648 @param mem_root Where 'this->ref' should be allocated. It can't be
2649 in this->table->mem_root as otherwise we will not be
2650 able to reclaim that memory when the clone handler
2651 object is destroyed.
2652 */
2653
clone(const char * name,MEM_ROOT * mem_root)2654 handler *handler::clone(const char *name, MEM_ROOT *mem_root)
2655 {
2656 handler *new_handler= get_new_handler(table->s, mem_root, ht);
2657
2658 if (!new_handler)
2659 return NULL;
2660 if (new_handler->set_ha_share_ref(ha_share))
2661 goto err;
2662
2663 /*
2664 TODO: Implement a more efficient way to have more than one index open for
2665 the same table instance. The ha_open call is not cacheable for clone.
2666
2667 This is not critical as the engines already have the table open
2668 and should be able to use the original instance of the table.
2669 */
2670 if (new_handler->ha_open(table, name, table->db_stat,
2671 HA_OPEN_IGNORE_IF_LOCKED, mem_root))
2672 goto err;
2673
2674 return new_handler;
2675
2676 err:
2677 delete new_handler;
2678 return NULL;
2679 }
2680
/* Name of the storage engine that backs this handler instance. */
LEX_CSTRING *handler::engine_name()
{
  return hton_name(ht);
}
2685
2686
/**
  Cost estimate for reading 'rows' index entries over 'ranges' ranges
  of index 'index', expressed in random block reads plus compare time.
*/
double handler::keyread_time(uint index, uint ranges, ha_rows rows)
{
  /*
    It is assumed that we will read trough the whole key range and that all
    key blocks are half full (normally things are much better). It is also
    assumed that each time we read the next key from the index, the handler
    performs a random seek, thus the cost is proportional to the number of
    blocks read. This model does not take into account clustered indexes -
    engines that support that (e.g. InnoDB) may want to overwrite this method.
    The model counts in the time to read index entries from cache.
  */
  size_t len= table->key_info[index].key_length + ref_length;
  /* A clustered primary key entry is effectively the whole stored row. */
  if (index == table->s->primary_key && table->file->primary_key_is_clustered())
    len= table->s->stored_rec_length;
  double keys_per_block= (stats.block_size/2.0/len+1);
  return (rows + keys_per_block-1)/ keys_per_block +
         len*rows/(stats.block_size+1)/TIME_FOR_COMPARE ;
}
2705
/* Per-connection slot where this handler's engine keeps its THD data. */
void **handler::ha_data(THD *thd) const
{
  return thd_ha_data(thd, ht);
}
2710
/* THD owning this handler: the table's user if bound, else current_thd. */
THD *handler::ha_thd(void) const
{
  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
  return (table && table->in_use) ? table->in_use : current_thd;
}
2716
void handler::unbind_psi()
{
  /*
    Notify the instrumentation that this table is not owned
    by this thread any more.
  */
  PSI_CALL_unbind_table(m_psi);
}
2725
void handler::rebind_psi()
{
  /*
    Notify the instrumentation that this table is now owned
    by this thread.
  */
  m_psi= PSI_CALL_rebind_table(ha_table_share_psi(), this, m_psi);
}
2734
2735
/* Performance-schema share of the table this handler is attached to. */
PSI_table_share *handler::ha_table_share_psi() const
{
  return table_share->m_psi;
}
2740
/** @brief
  Open database-handler.

  IMPLEMENTATION
  Try O_RDONLY if cannot open as O_RDWR
  Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set

  @param table_arg           TABLE object this handler serves
  @param name                path of the table to open
  @param mode                O_RDWR or O_RDONLY
  @param test_if_locked      HA_OPEN_* flags
  @param mem_root            where to allocate 'ref'; NULL means the
                             table's own mem_root
  @param partitions_to_open  subset of partitions to open, or NULL for all

  @return 0 on success, else an engine error code.
*/
int handler::ha_open(TABLE *table_arg, const char *name, int mode,
                     uint test_if_locked, MEM_ROOT *mem_root,
                     List<String> *partitions_to_open)
{
  int error;
  DBUG_ENTER("handler::ha_open");
  DBUG_PRINT("enter",
             ("name: %s db_type: %d db_stat: %d mode: %d lock_test: %d",
              name, ht->db_type, table_arg->db_stat, mode,
              test_if_locked));

  table= table_arg;
  DBUG_ASSERT(table->s == table_share);
  DBUG_ASSERT(m_lock_type == F_UNLCK);
  DBUG_PRINT("info", ("old m_lock_type: %d F_UNLCK %d", m_lock_type, F_UNLCK));
  DBUG_ASSERT(alloc_root_inited(&table->mem_root));

  set_partitions_to_open(partitions_to_open);

  if (unlikely((error=open(name,mode,test_if_locked))))
  {
    /* Fall back to a read-only open when permitted and possible. */
    if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
        (table->db_stat & HA_TRY_READ_ONLY))
    {
      table->db_stat|=HA_READ_ONLY;
      error=open(name,O_RDONLY,test_if_locked);
    }
  }
  if (unlikely(error))
  {
    my_errno= error;                            /* Safeguard */
    DBUG_PRINT("error",("error: %d errno: %d",error,errno));
  }
  else
  {
    DBUG_ASSERT(m_psi == NULL);
    DBUG_ASSERT(table_share != NULL);
    /*
      Do not call this for partitions handlers, since it may take too much
      resources.
      So only use the m_psi on table level, not for individual partitions.
    */
    if (!(test_if_locked & HA_OPEN_NO_PSI_CALL))
    {
      m_psi= PSI_CALL_open_table(ha_table_share_psi(), this);
    }

    if (table->s->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
      table->db_stat|=HA_READ_ONLY;
    (void) extra(HA_EXTRA_NO_READCHECK);        // Not needed in SQL

    /* Allocate ref in thd or on the table's mem_root */
    if (!(ref= (uchar*) alloc_root(mem_root ? mem_root : &table->mem_root,
                                   ALIGN_SIZE(ref_length)*2)))
    {
      ha_close();
      error=HA_ERR_OUT_OF_MEM;
    }
    else
      dup_ref=ref+ALIGN_SIZE(ref_length);
    cached_table_flags= table_flags();
  }
  reset_statistics();
  internal_tmp_table= MY_TEST(test_if_locked & HA_OPEN_INTERNAL_TABLE);

  DBUG_RETURN(error);
}
2815
/**
  Close the handler: flush per-table statistics, release instrumentation
  and call the engine's close().

  @return result of the engine's close().
*/
int handler::ha_close(void)
{
  DBUG_ENTER("ha_close");
  /*
    Increment global statistics for temporary tables.
    In_use is 0 for tables that was closed from the table cache.
  */
  if (table->in_use)
    status_var_add(table->in_use->status_var.rows_tmp_read, rows_tmp_read);
  PSI_CALL_close_table(m_psi);
  m_psi= NULL; /* instrumentation handle, invalid after close_table() */

  /* Detach from ANALYZE tracker */
  tracker= NULL;

  DBUG_ASSERT(m_lock_type == F_UNLCK);
  DBUG_ASSERT(inited == NONE);
  DBUG_RETURN(close());
}
2835
2836
/**
  Read the next row in a table scan, skipping delete-marked records.

  @param[out] buf  record buffer to fill

  @return 0 on success, HA_ERR_ABORTED_BY_USER if the connection was
          killed while skipping deleted rows, or another engine error.
*/
int handler::ha_rnd_next(uchar *buf)
{
  int result;
  DBUG_ENTER("handler::ha_rnd_next");
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited == RND);

  /* Loop past delete-marked rows, checking for a kill on each skip. */
  do
  {
    TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
      { result= rnd_next(buf); })
    if (result != HA_ERR_RECORD_DELETED)
      break;
    status_var_increment(table->in_use->status_var.ha_read_rnd_deleted_count);
  } while (!table->in_use->check_killed(1));

  /* Leaving the loop with RECORD_DELETED means the thread was killed. */
  if (result == HA_ERR_RECORD_DELETED)
    result= HA_ERR_ABORTED_BY_USER;
  else
  {
    if (!result)
    {
      update_rows_read();
      /* Recompute virtual columns only when reading into record[0]. */
      if (table->vfield && buf == table->record[0])
        table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
    }
    increment_statistics(&SSV::ha_read_rnd_next_count);
  }

  table->status=result ? STATUS_NOT_FOUND: 0;
  DBUG_RETURN(result);
}
2870
/**
  Read a row given its stored position (as saved by position()).

  HA_ERR_RECORD_DELETED is mapped to HA_ERR_KEY_NOT_FOUND: a positioned
  read of a deleted row is a lookup miss from the caller's point of view.

  @param buf  Buffer to store the row in.
  @param pos  Row position, previously saved by position().

  @return 0 on success or a handler error code.
*/
int handler::ha_rnd_pos(uchar *buf, uchar *pos)
{
  int result;
  DBUG_ENTER("handler::ha_rnd_pos");
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited == RND);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
    { result= rnd_pos(buf, pos); })
  increment_statistics(&SSV::ha_read_rnd_count);
  if (result == HA_ERR_RECORD_DELETED)
    result= HA_ERR_KEY_NOT_FOUND;
  else if (!result)
  {
    update_rows_read();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  DBUG_RETURN(result);
}
2893
/**
  Index lookup: fetch the first row matching the given key.

  Requires a prior ha_index_init() call (asserted below).

  @param buf          Buffer to store the row in.
  @param key          Key value, in key format.
  @param keypart_map  Bitmap of which key parts are present in 'key'.
  @param find_flag    Search mode (HA_READ_KEY_EXACT, ...).

  @return 0 on success or a handler error code.
*/
int handler::ha_index_read_map(uchar *buf, const uchar *key,
                               key_part_map keypart_map,
                               enum ha_rkey_function find_flag)
{
  int result;
  DBUG_ENTER("handler::ha_index_read_map");
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited==INDEX);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
    { result= index_read_map(buf, key, keypart_map, find_flag); })
  increment_statistics(&SSV::ha_read_key_count);
  if (!result)
  {
    update_index_statistics();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  DBUG_RETURN(result);
}
2916
/*
  @note: Other index lookup/navigation functions require prior
  handler->index_init() call. This function is different, it requires
  that the scan is not initialized, and accepts "uint index" as an argument.
*/

/**
  One-off index lookup on an explicitly given index, without index_init().

  @param buf          Buffer to store the row in.
  @param index        Index to search.
  @param key          Key value, in key format.
  @param keypart_map  Bitmap of which key parts are present in 'key'.
  @param find_flag    Search mode (HA_READ_KEY_EXACT, ...).

  @return 0 on success or a handler error code.
*/
int handler::ha_index_read_idx_map(uchar *buf, uint index, const uchar *key,
                                   key_part_map keypart_map,
                                   enum ha_rkey_function find_flag)
{
  int result;
  DBUG_ASSERT(inited==NONE);
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(end_range == NULL);
  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, index, 0,
    { result= index_read_idx_map(buf, index, key, keypart_map, find_flag); })
  increment_statistics(&SSV::ha_read_key_count);
  if (!result)
  {
    update_rows_read();
    /* Count the read against the explicitly given index */
    index_rows_read[index]++;
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  return result;
}
2945
/**
  Read the next row in index order. Requires a prior ha_index_init().

  @param buf  Buffer to store the row in.
  @return 0 on success or a handler error code.
*/
int handler::ha_index_next(uchar * buf)
{
  int result;
  DBUG_ENTER("handler::ha_index_next");
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited==INDEX);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
    { result= index_next(buf); })
  increment_statistics(&SSV::ha_read_next_count);
  if (!result)
  {
    update_index_statistics();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  DBUG_RETURN(result);
}
2966
/**
  Read the previous row in index order. Requires a prior ha_index_init().

  @param buf  Buffer to store the row in.
  @return 0 on success or a handler error code.
*/
int handler::ha_index_prev(uchar * buf)
{
  int result;
  DBUG_ENTER("handler::ha_index_prev");
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited==INDEX);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
    { result= index_prev(buf); })
  increment_statistics(&SSV::ha_read_prev_count);
  if (!result)
  {
    update_index_statistics();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  DBUG_RETURN(result);
}
2987
/**
  Read the first row in index order. Requires a prior ha_index_init().

  @param buf  Buffer to store the row in.
  @return 0 on success or a handler error code.
*/
int handler::ha_index_first(uchar * buf)
{
  int result;
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited==INDEX);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
    { result= index_first(buf); })
  increment_statistics(&SSV::ha_read_first_count);
  if (!result)
  {
    update_index_statistics();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  return result;
}
3007
/**
  Read the last row in index order. Requires a prior ha_index_init().

  @param buf  Buffer to store the row in.
  @return 0 on success or a handler error code.
*/
int handler::ha_index_last(uchar * buf)
{
  int result;
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited==INDEX);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
    { result= index_last(buf); })
  increment_statistics(&SSV::ha_read_last_count);
  if (!result)
  {
    update_index_statistics();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  return result;
}
3027
/**
  Read the next row having the same key as the last one read.
  Requires a prior ha_index_init().

  @param buf     Buffer to store the row in.
  @param key     Key value the row must match.
  @param keylen  Length of 'key' in bytes.

  @return 0 on success or a handler error code.
*/
int handler::ha_index_next_same(uchar *buf, const uchar *key, uint keylen)
{
  int result;
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ASSERT(inited==INDEX);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
    { result= index_next_same(buf, key, keylen); })
  increment_statistics(&SSV::ha_read_next_count);
  if (!result)
  {
    update_index_statistics();
    /* Virtual columns are only computed when reading into record[0] */
    if (table->vfield && buf == table->record[0])
      table->update_virtual_fields(this, VCOL_UPDATE_FOR_READ);
  }
  table->status=result ? STATUS_NOT_FOUND: 0;
  return result;
}
3047
3048
ha_was_semi_consistent_read()3049 bool handler::ha_was_semi_consistent_read()
3050 {
3051 bool result= was_semi_consistent_read();
3052 if (result)
3053 increment_statistics(&SSV::ha_read_retry_count);
3054 return result;
3055 }
3056
3057 /* Initialize handler for random reading, with error handling */
3058
ha_rnd_init_with_error(bool scan)3059 int handler::ha_rnd_init_with_error(bool scan)
3060 {
3061 int error;
3062 if (likely(!(error= ha_rnd_init(scan))))
3063 return 0;
3064 table->file->print_error(error, MYF(0));
3065 return error;
3066 }
3067
3068
/**
  Read first row (only) from a table. Used for reading tables with
  only one row, either based on table statistics or if table is a SEQUENCE.

  This is never called for normal InnoDB tables, as these table types
  do not have HA_STATS_RECORDS_IS_EXACT set.

  @param buf          Buffer to store the row in.
  @param primary_key  Index to use for an ordered read, when usable.

  @return 0 on success or a handler error code.
*/
int handler::read_first_row(uchar * buf, uint primary_key)
{
  int error;
  DBUG_ENTER("handler::read_first_row");

  /*
    If there are only a few deleted rows in the table, find the first row by
    scanning the table.
    TODO remove the test for HA_READ_ORDER
  */
  if (stats.deleted < 10 || primary_key >= MAX_KEY ||
      !(index_flags(primary_key, 0, 0) & HA_READ_ORDER))
  {
    if (likely(!(error= ha_rnd_init(1))))
    {
      error= ha_rnd_next(buf);
      /* Prefer the read error; otherwise surface any error from ending */
      const int end_error= ha_rnd_end();
      if (likely(!error))
        error= end_error;
    }
  }
  else
  {
    /* Find the first row through the primary key */
    if (likely(!(error= ha_index_init(primary_key, 0))))
    {
      error= ha_index_first(buf);
      const int end_error= ha_index_end();
      if (likely(!error))
        error= end_error;
    }
  }
  DBUG_RETURN(error);
}
3110
/**
  Generate the next auto-increment number based on increment and offset.
  computes the lowest number
  - strictly greater than "nr"
  - of the form: auto_increment_offset + N * auto_increment_increment
  If overflow happened then return MAX_ULONGLONG value as an
  indication of overflow.
  In most cases increment= offset= 1, in which case we get:
  @verbatim 1,2,3,4,5,... @endverbatim
  If increment=10 and offset=5 and previous number is 1, we get:
  @verbatim 5,15,25,35,... @endverbatim
*/
inline ulonglong
compute_next_insert_id(ulonglong nr,struct system_variables *variables)
{
  const ulonglong save_nr= nr;

  if (variables->auto_increment_increment == 1)
    nr= nr + 1; // optimization of the formula below
  else
  {
    /*
       Calculating the number of complete auto_increment_increment extents:
    */
    nr= (nr + variables->auto_increment_increment -
         variables->auto_increment_offset) /
        (ulonglong) variables->auto_increment_increment;
    /*
       Adding an offset to the auto_increment_increment extent boundary:
    */
    nr= nr * (ulonglong) variables->auto_increment_increment +
        variables->auto_increment_offset;
  }

  /* If nr did not advance, the arithmetic above wrapped: report overflow */
  if (unlikely(nr <= save_nr))
    return ULONGLONG_MAX;

  return nr;
}
3150
3151
/* Bump the reserved auto-increment cursor past an explicitly inserted value */
void handler::adjust_next_insert_id_after_explicit_value(ulonglong nr)
{
  /*
    If we have set THD::next_insert_id previously and plan to insert an
    explicitly-specified value larger than this, we need to increase
    THD::next_insert_id to be greater than the explicit value.
  */
  if ((next_insert_id > 0) && (nr >= next_insert_id))
    set_next_insert_id(compute_next_insert_id(nr, &table->in_use->variables));
}
3162
3163
/** @brief
  Computes the largest number X:
  - smaller than or equal to "nr"
  - of the form: auto_increment_offset + N * auto_increment_increment
  where N>=0.

  SYNOPSIS
    prev_insert_id
      nr         Number to "round down"
      variables  variables struct containing auto_increment_increment and
                 auto_increment_offset

  RETURN
    The number X if it exists, "nr" otherwise.
*/
inline ulonglong
prev_insert_id(ulonglong nr, struct system_variables *variables)
{
  if (unlikely(nr < variables->auto_increment_offset))
  {
    /*
      There's nothing good we can do here. That is a pathological case, where
      the offset is larger than the column's max possible value, i.e. not even
      the first sequence value may be inserted. User will receive warning.
    */
    DBUG_PRINT("info",("auto_increment: nr: %lu cannot honour "
                       "auto_increment_offset: %lu",
                       (ulong) nr, variables->auto_increment_offset));
    return nr;
  }
  if (variables->auto_increment_increment == 1)
    return nr; // optimization of the formula below
  /*
    Calculating the number of complete auto_increment_increment extents:
  */
  nr= (nr - variables->auto_increment_offset) /
      (ulonglong) variables->auto_increment_increment;
  /*
    Adding an offset to the auto_increment_increment extent boundary:
  */
  return (nr * (ulonglong) variables->auto_increment_increment +
          variables->auto_increment_offset);
}
3207
3208
3209 /**
3210 Update the auto_increment field if necessary.
3211
3212 Updates columns with type NEXT_NUMBER if:
3213
3214 - If column value is set to NULL (in which case
3215 auto_increment_field_not_null is 0)
3216 - If column is set to 0 and (sql_mode & MODE_NO_AUTO_VALUE_ON_ZERO) is not
3217 set. In the future we will only set NEXT_NUMBER fields if one sets them
3218 to NULL (or they are not included in the insert list).
3219
3220 In those cases, we check if the currently reserved interval still has
3221 values we have not used. If yes, we pick the smallest one and use it.
3222 Otherwise:
3223
3224 - If a list of intervals has been provided to the statement via SET
3225 INSERT_ID or via an Intvar_log_event (in a replication slave), we pick the
3226 first unused interval from this list, consider it as reserved.
3227
3228 - Otherwise we set the column for the first row to the value
3229 next_insert_id(get_auto_increment(column))) which is usually
3230 max-used-column-value+1.
3231 We call get_auto_increment() for the first row in a multi-row
3232 statement. get_auto_increment() will tell us the interval of values it
3233 reserved for us.
3234
3235 - In both cases, for the following rows we use those reserved values without
3236 calling the handler again (we just progress in the interval, computing
3237 each new value from the previous one). Until we have exhausted them, then
3238 we either take the next provided interval or call get_auto_increment()
3239 again to reserve a new interval.
3240
3241 - In both cases, the reserved intervals are remembered in
3242 thd->auto_inc_intervals_in_cur_stmt_for_binlog if statement-based
3243 binlogging; the last reserved interval is remembered in
3244 auto_inc_interval_for_cur_row. The number of reserved intervals is
3245 remembered in auto_inc_intervals_count. It differs from the number of
3246 elements in thd->auto_inc_intervals_in_cur_stmt_for_binlog() because the
3247 latter list is cumulative over all statements forming one binlog event
3248 (when stored functions and triggers are used), and collapses two
3249 contiguous intervals in one (see its append() method).
3250
3251 The idea is that generated auto_increment values are predictable and
3252 independent of the column values in the table. This is needed to be
3253 able to replicate into a table that already has rows with a higher
3254 auto-increment value than the one that is inserted.
3255
3256 After we have already generated an auto-increment number and the user
3257 inserts a column with a higher value than the last used one, we will
3258 start counting from the inserted value.
3259
3260 This function's "outputs" are: the table's auto_increment field is filled
3261 with a value, thd->next_insert_id is filled with the value to use for the
3262 next row, if a value was autogenerated for the current row it is stored in
3263 thd->insert_id_for_cur_row, if get_auto_increment() was called
3264 thd->auto_inc_interval_for_cur_row is modified, if that interval is not
3265 present in thd->auto_inc_intervals_in_cur_stmt_for_binlog it is added to
3266 this list.
3267
3268 @todo
3269 Replace all references to "next number" or NEXT_NUMBER to
3270 "auto_increment", everywhere (see below: there is
3271 table->auto_increment_field_not_null, and there also exists
3272 table->next_number_field, it's not consistent).
3273
3274 @retval
3275 0 ok
3276 @retval
3277 HA_ERR_AUTOINC_READ_FAILED get_auto_increment() was called and
3278 returned ~(ulonglong) 0
3279 @retval
3280 HA_ERR_AUTOINC_ERANGE storing value in field caused strict mode
3281 failure.
3282 */
3283
#define AUTO_INC_DEFAULT_NB_ROWS 1 // Some prefer 1024 here
#define AUTO_INC_DEFAULT_NB_MAX_BITS 16
#define AUTO_INC_DEFAULT_NB_MAX ((1 << AUTO_INC_DEFAULT_NB_MAX_BITS) - 1)

/*
  Fill the auto-increment column for the current row; see the large
  comment above for the full contract.
*/
int handler::update_auto_increment()
{
  ulonglong nr, nb_reserved_values;
  bool append= FALSE;
  THD *thd= table->in_use;
  struct system_variables *variables= &thd->variables;
  int result=0, tmp;
  DBUG_ENTER("handler::update_auto_increment");

  /*
    next_insert_id is a "cursor" into the reserved interval, it may go greater
    than the interval, but not smaller.
  */
  DBUG_ASSERT(next_insert_id >= auto_inc_interval_for_cur_row.minimum());

  /* An explicit non-zero value (or 0 with NO_AUTO_VALUE_ON_ZERO) is kept */
  if ((nr= table->next_number_field->val_int()) != 0 ||
      (table->auto_increment_field_not_null &&
       thd->variables.sql_mode & MODE_NO_AUTO_VALUE_ON_ZERO))
  {

    /*
      There could be an error reported because value was truncated
      when strict mode is enabled.
    */
    if (thd->is_error())
      DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);
    /*
      Update next_insert_id if we had already generated a value in this
      statement (case of INSERT VALUES(null),(3763),(null):
      the last NULL needs to insert 3764, not the value of the first NULL plus
      1).
      Ignore negative values.
    */
    if ((longlong) nr > 0 || (table->next_number_field->flags & UNSIGNED_FLAG))
      adjust_next_insert_id_after_explicit_value(nr);
    insert_id_for_cur_row= 0; // didn't generate anything
    DBUG_RETURN(0);
  }

  /* For system-versioned tables, historical rows do not get new values */
  if (table->versioned())
  {
    Field *end= table->vers_end_field();
    DBUG_ASSERT(end);
    bitmap_set_bit(table->read_set, end->field_index);
    if (!end->is_max())
    {
      if (thd->lex->sql_command == SQLCOM_ALTER_TABLE)
      {
        if (!table->next_number_field->real_maybe_null())
          DBUG_RETURN(HA_ERR_UNSUPPORTED);
        table->next_number_field->set_null();
      }
      DBUG_RETURN(0);
    }
  }

  // ALTER TABLE ... ADD COLUMN ... AUTO_INCREMENT
  if (thd->lex->sql_command == SQLCOM_ALTER_TABLE)
    table->next_number_field->set_notnull();

  if ((nr= next_insert_id) >= auto_inc_interval_for_cur_row.maximum())
  {
    /* next_insert_id is beyond what is reserved, so we reserve more. */
    const Discrete_interval *forced=
      thd->auto_inc_intervals_forced.get_next();
    if (forced != NULL)
    {
      nr= forced->minimum();
      nb_reserved_values= forced->values();
    }
    else
    {
      /*
        handler::estimation_rows_to_insert was set by
        handler::ha_start_bulk_insert(); if 0 it means "unknown".
      */
      ulonglong nb_desired_values;
      /*
        If an estimation was given to the engine:
        - use it.
        - if we already reserved numbers, it means the estimation was
        not accurate, then we'll reserve 2*AUTO_INC_DEFAULT_NB_ROWS the 2nd
        time, twice that the 3rd time etc.
        If no estimation was given, use those increasing defaults from the
        start, starting from AUTO_INC_DEFAULT_NB_ROWS.
        Don't go beyond a max to not reserve "way too much" (because
        reservation means potentially losing unused values).
        Note that in prelocked mode no estimation is given.
      */

      if ((auto_inc_intervals_count == 0) && (estimation_rows_to_insert > 0))
        nb_desired_values= estimation_rows_to_insert;
      else if ((auto_inc_intervals_count == 0) &&
               (thd->lex->many_values.elements > 0))
      {
        /*
          For multi-row inserts, if the bulk inserts cannot be started, the
          handler::estimation_rows_to_insert will not be set. But we still
          want to reserve the autoinc values.
        */
        nb_desired_values= thd->lex->many_values.elements;
      }
      else /* go with the increasing defaults */
      {
        /* avoid overflow in formula, with this if() */
        if (auto_inc_intervals_count <= AUTO_INC_DEFAULT_NB_MAX_BITS)
        {
          nb_desired_values= AUTO_INC_DEFAULT_NB_ROWS *
            (1 << auto_inc_intervals_count);
          set_if_smaller(nb_desired_values, AUTO_INC_DEFAULT_NB_MAX);
        }
        else
          nb_desired_values= AUTO_INC_DEFAULT_NB_MAX;
      }
      get_auto_increment(variables->auto_increment_offset,
                         variables->auto_increment_increment,
                         nb_desired_values, &nr,
                         &nb_reserved_values);
      if (nr == ULONGLONG_MAX)
        DBUG_RETURN(HA_ERR_AUTOINC_READ_FAILED);  // Mark failure

      /*
        That rounding below should not be needed when all engines actually
        respect offset and increment in get_auto_increment(). But they don't
        so we still do it. Wonder if for the not-first-in-index we should do
        it. Hope that this rounding didn't push us out of the interval; even
        if it did we cannot do anything about it (calling the engine again
        will not help as we inserted no row).
      */
      nr= compute_next_insert_id(nr-1, variables);
    }

    if (table->s->next_number_keypart == 0)
    {
      /* We must defer the appending until "nr" has been possibly truncated */
      append= TRUE;
    }
    else
    {
      /*
        For such auto_increment there is no notion of interval, just a
        singleton. The interval is not even stored in
        thd->auto_inc_interval_for_cur_row, so we are sure to call the engine
        for next row.
      */
      DBUG_PRINT("info",("auto_increment: special not-first-in-index"));
    }
  }

  if (unlikely(nr == ULONGLONG_MAX))
    DBUG_RETURN(HA_ERR_AUTOINC_ERANGE);

  DBUG_ASSERT(nr != 0);
  DBUG_PRINT("info",("auto_increment: %llu nb_reserved_values: %llu",
                     nr, append ? nb_reserved_values : 0));

  /* Store field without warning (Warning will be printed by insert) */
  {
    Check_level_instant_set check_level_save(thd, CHECK_FIELD_IGNORE);
    tmp= table->next_number_field->store((longlong)nr, TRUE);
  }

  if (unlikely(tmp))                            // Out of range value in store
  {
    /*
      First, test if the query was aborted due to strict mode constraints
      or new field value greater than maximum integer value:
    */
    if (thd->killed == KILL_BAD_DATA ||
        nr > table->next_number_field->get_max_int_value())
    {
      /*
        It's better to return an error here than getting a confusing
        'duplicate key error' later.
      */
      result= HA_ERR_AUTOINC_ERANGE;
    }
    else
    {
      /*
        Field refused this value (overflow) and truncated it, use the result
        of the truncation (which is going to be inserted); however we try to
        decrease it to honour auto_increment_* variables.
        That will shift the left bound of the reserved interval, we don't
        bother shifting the right bound (anyway any other value from this
        interval will cause a duplicate key).
      */
      nr= prev_insert_id(table->next_number_field->val_int(), variables);
      if (unlikely(table->next_number_field->store((longlong)nr, TRUE)))
        nr= table->next_number_field->val_int();
    }
  }
  if (append)
  {
    auto_inc_interval_for_cur_row.replace(nr, nb_reserved_values,
                                          variables->auto_increment_increment);
    auto_inc_intervals_count++;
    /* Row-based replication does not need to store intervals in binlog */
    if (((WSREP(thd) && wsrep_emulate_bin_log ) || mysql_bin_log.is_open())
        && !thd->is_current_stmt_binlog_format_row())
      thd->auto_inc_intervals_in_cur_stmt_for_binlog.
        append(auto_inc_interval_for_cur_row.minimum(),
               auto_inc_interval_for_cur_row.values(),
               variables->auto_increment_increment);
  }

  /*
    Record this autogenerated value. If the caller then
    succeeds to insert this value, it will call
    record_first_successful_insert_id_in_cur_stmt()
    which will set first_successful_insert_id_in_cur_stmt if it's not
    already set.
  */
  insert_id_for_cur_row= nr;

  if (result)                                   // overflow
    DBUG_RETURN(result);

  /*
    Set next insert id to point to next auto-increment value to be able to
    handle multi-row statements.
  */
  set_next_insert_id(compute_next_insert_id(nr, variables));

  DBUG_RETURN(0);
}
3514
3515
/** @brief
  MySQL signal that it changed the column bitmap

  USAGE
    This is for handlers that needs to setup their own column bitmaps.
    Normally the handler should set up their own column bitmaps in
    index_init() or rnd_init() and in any column_bitmaps_signal() call after
    this.

    The handler is allowed to do changes to the bitmap after a index_init or
    rnd_init() call is made as after this, MySQL will not use the bitmap
    for any program logic checking.
*/
void handler::column_bitmaps_signal()
{
  DBUG_ENTER("column_bitmaps_signal");
  /* Default implementation only traces the new maps; engines may override */
  if (table)
    DBUG_PRINT("info", ("read_set: %p write_set: %p",
                        table->read_set, table->write_set));
  DBUG_VOID_RETURN;
}
3537
3538
/** @brief
  Reserves an interval of auto_increment values from the handler.

  SYNOPSIS
    get_auto_increment()
    offset
    increment
    nb_desired_values   how many values we want
    first_value         (OUT) the first value reserved by the handler
    nb_reserved_values  (OUT) how many values the handler reserved

  offset and increment means that we want values to be of the form
  offset + N * increment, where N>=0 is integer.
  If the function sets *first_value to ~(ulonglong)0 it means an error.
  If the function sets *nb_reserved_values to ULONGLONG_MAX it means it has
  reserved to "positive infinite".

  Default implementation: look up the current maximum value through the
  auto-increment index and return max+1.
*/
void handler::get_auto_increment(ulonglong offset, ulonglong increment,
                                 ulonglong nb_desired_values,
                                 ulonglong *first_value,
                                 ulonglong *nb_reserved_values)
{
  ulonglong nr;
  int error;
  MY_BITMAP *old_read_set;
  bool rnd_inited= (inited == RND);

  /* A random scan may be active; end it now and restart it before return */
  if (rnd_inited && ha_rnd_end())
    return;

  old_read_set= table->prepare_for_keyread(table->s->next_number_index);

  if (ha_index_init(table->s->next_number_index, 1))
  {
    /* This should never happen, assert in debug, and fail in release build */
    DBUG_ASSERT(0);
    (void) extra(HA_EXTRA_NO_KEYREAD);
    *first_value= ULONGLONG_MAX;
    if (rnd_inited && ha_rnd_init_with_error(0))
    {
      //TODO: it would be nice to return here an error
    }
    return;
  }

  if (table->s->next_number_keypart == 0)
  {						// Autoincrement at key-start
    error= ha_index_last(table->record[1]);
    /*
      MySQL implicitly assumes such method does locking (as MySQL decides to
      use nr+increment without checking again with the handler, in
      handler::update_auto_increment()), so reserves to infinite.
    */
    *nb_reserved_values= ULONGLONG_MAX;
  }
  else
  {
    /* Autoincrement is a later key part: find the max within the prefix */
    uchar key[MAX_KEY_LENGTH];
    key_copy(key, table->record[0],
             table->key_info + table->s->next_number_index,
             table->s->next_number_key_offset);
    error= ha_index_read_map(table->record[1], key,
                             make_prev_keypart_map(table->s->
                                                   next_number_keypart),
                             HA_READ_PREFIX_LAST);
    /*
      MySQL needs to call us for next row: assume we are inserting ("a",null)
      here, we return 3, and next this statement will want to insert
      ("b",null): there is no reason why ("b",3+1) would be the good row to
      insert: maybe it already exists, maybe 3+1 is too large...
    */
    *nb_reserved_values= 1;
  }

  if (unlikely(error))
  {
    if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
      /* No entry found, that's fine */;
    else
      print_error(error, MYF(0));
    nr= 1;
  }
  else
    nr= ((ulonglong) table->next_number_field->
         val_int_offset(table->s->rec_buff_length)+1);
  ha_index_end();
  table->restore_column_maps_after_keyread(old_read_set);
  *first_value= nr;
  if (rnd_inited && ha_rnd_init_with_error(0))
  {
    //TODO: it would be nice to return here an error
  }
  return;
}
3633
3634
/**
  Release auto-increment values reserved but not used by the statement
  and reset the handler's auto-increment bookkeeping.
*/
void handler::ha_release_auto_increment()
{
  DBUG_ENTER("ha_release_auto_increment");
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK ||
              (!next_insert_id && !insert_id_for_cur_row));
  release_auto_increment();
  /* Forget the reserved interval(s); the next statement starts afresh */
  insert_id_for_cur_row= 0;
  auto_inc_interval_for_cur_row.replace(0, 0, 0);
  auto_inc_intervals_count= 0;
  if (next_insert_id > 0)
  {
    next_insert_id= 0;
    /*
      this statement used forced auto_increment values if there were some,
      wipe them away for other statements.
    */
    table->in_use->auto_inc_intervals_forced.empty();
  }
  DBUG_VOID_RETURN;
}
3656
3657
/**
  Construct and emit duplicate key error message using information
  from table's record buffer.

  @param table   TABLE object which record buffer should be used as
                 source for column values.
  @param key     Key description.
  @param msg     Error message template to which key value should be
                 added.
  @param errflag Flags for my_error() call.

  @notes
    The error message is from ER_DUP_ENTRY_WITH_KEY_NAME but to keep things
    compatible with old code, the error number is ER_DUP_ENTRY
*/

void print_keydup_error(TABLE *table, KEY *key, const char *msg, myf errflag)
{
  /* Write the duplicated key in the error message */
  char key_buff[MAX_KEY_LENGTH];
  String str(key_buff,sizeof(key_buff),system_charset_info);

  if (key == NULL)
  {
    /*
      Key is unknown. Should only happen if storage engine reports wrong
      duplicate key number.
    */
    my_printf_error(ER_DUP_ENTRY, msg, errflag, "", "*UNKNOWN*");
  }
  else
  {
    /* Table is opened and defined at this point */
    key_unpack(&str,table, key);
    /* Truncate the key value so the final message fits MYSQL_ERRMSG_SIZE */
    uint max_length=MYSQL_ERRMSG_SIZE-(uint) strlen(msg);
    if (str.length() >= max_length)
    {
      str.length(max_length-4);
      str.append(STRING_WITH_LEN("..."));
    }
    my_printf_error(ER_DUP_ENTRY, msg, errflag, str.c_ptr_safe(),
                    key->name.str);
  }
}
3702
/**
  Construct and emit duplicate key error message using information
  from table's record buffer, with the default message template.

  @sa print_keydup_error(table, key, msg, errflag).
*/

void print_keydup_error(TABLE *table, KEY *key, myf errflag)
{
  print_keydup_error(table, key,
                     ER_THD(table->in_use, ER_DUP_ENTRY_WITH_KEY_NAME),
                     errflag);
}
3716
3717
3718 /**
3719 Print error that we got from handler function.
3720
3721 @note
3722 In case of delete table it's only safe to use the following parts of
3723 the 'table' structure:
3724 - table->s->path
3725 - table->alias
3726 */
3727
3728 #define SET_FATAL_ERROR fatal_error=1
3729
print_error(int error,myf errflag)3730 void handler::print_error(int error, myf errflag)
3731 {
3732 bool fatal_error= 0;
3733 DBUG_ENTER("handler::print_error");
3734 DBUG_PRINT("enter",("error: %d",error));
3735
3736 if (ha_thd()->transaction_rollback_request)
3737 {
3738 /* Ensure this becomes a true error */
3739 errflag&= ~(ME_JUST_WARNING | ME_JUST_INFO);
3740 }
3741
3742 int textno= -1; // impossible value
3743 switch (error) {
3744 case EACCES:
3745 textno=ER_OPEN_AS_READONLY;
3746 break;
3747 case EAGAIN:
3748 textno=ER_FILE_USED;
3749 break;
3750 case ENOENT:
3751 case ENOTDIR:
3752 case ELOOP:
3753 textno=ER_FILE_NOT_FOUND;
3754 break;
3755 case ENOSPC:
3756 case HA_ERR_DISK_FULL:
3757 textno= ER_DISK_FULL;
3758 SET_FATAL_ERROR; // Ensure error is logged
3759 break;
3760 case HA_ERR_KEY_NOT_FOUND:
3761 case HA_ERR_NO_ACTIVE_RECORD:
3762 case HA_ERR_RECORD_DELETED:
3763 case HA_ERR_END_OF_FILE:
  /*
    These errors are not normally fatal (for example for reads). However,
    if one occurs during an update or delete, then it is fatal.
    As the user is calling print_error() (which is not done on read), we
    assume something went wrong with the update or delete.
  */
3770 SET_FATAL_ERROR;
3771 textno=ER_KEY_NOT_FOUND;
3772 break;
3773 case HA_ERR_ABORTED_BY_USER:
3774 {
3775 DBUG_ASSERT(ha_thd()->killed);
3776 ha_thd()->send_kill_message();
3777 DBUG_VOID_RETURN;
3778 }
3779 case HA_ERR_WRONG_MRG_TABLE_DEF:
3780 textno=ER_WRONG_MRG_TABLE;
3781 break;
3782 case HA_ERR_FOUND_DUPP_KEY:
3783 {
3784 if (table)
3785 {
3786 uint key_nr=get_dup_key(error);
3787 if ((int) key_nr >= 0 && key_nr < table->s->keys)
3788 {
3789 print_keydup_error(table, &table->key_info[key_nr], errflag);
3790 DBUG_VOID_RETURN;
3791 }
3792 }
3793 textno=ER_DUP_KEY;
3794 break;
3795 }
3796 case HA_ERR_FOREIGN_DUPLICATE_KEY:
3797 {
3798 char rec_buf[MAX_KEY_LENGTH];
3799 String rec(rec_buf, sizeof(rec_buf), system_charset_info);
3800 /* Table is opened and defined at this point */
3801
3802 /*
3803 Just print the subset of fields that are part of the first index,
3804 printing the whole row from there is not easy.
3805 */
3806 key_unpack(&rec, table, &table->key_info[0]);
3807
3808 char child_table_name[NAME_LEN + 1];
3809 char child_key_name[NAME_LEN + 1];
3810 if (get_foreign_dup_key(child_table_name, sizeof(child_table_name),
3811 child_key_name, sizeof(child_key_name)))
3812 {
3813 my_error(ER_FOREIGN_DUPLICATE_KEY_WITH_CHILD_INFO, errflag,
3814 table_share->table_name.str, rec.c_ptr_safe(),
3815 child_table_name, child_key_name);
3816 }
3817 else
3818 {
3819 my_error(ER_FOREIGN_DUPLICATE_KEY_WITHOUT_CHILD_INFO, errflag,
3820 table_share->table_name.str, rec.c_ptr_safe());
3821 }
3822 DBUG_VOID_RETURN;
3823 }
3824 case HA_ERR_NULL_IN_SPATIAL:
3825 my_error(ER_CANT_CREATE_GEOMETRY_OBJECT, errflag);
3826 DBUG_VOID_RETURN;
3827 case HA_ERR_FOUND_DUPP_UNIQUE:
3828 textno=ER_DUP_UNIQUE;
3829 break;
3830 case HA_ERR_RECORD_CHANGED:
3831 /*
3832 This is not fatal error when using HANDLER interface
3833 SET_FATAL_ERROR;
3834 */
3835 textno=ER_CHECKREAD;
3836 break;
3837 case HA_ERR_CRASHED:
3838 SET_FATAL_ERROR;
3839 textno=ER_NOT_KEYFILE;
3840 break;
3841 case HA_ERR_WRONG_IN_RECORD:
3842 SET_FATAL_ERROR;
3843 textno= ER_CRASHED_ON_USAGE;
3844 break;
3845 case HA_ERR_CRASHED_ON_USAGE:
3846 SET_FATAL_ERROR;
3847 textno=ER_CRASHED_ON_USAGE;
3848 break;
3849 case HA_ERR_NOT_A_TABLE:
3850 textno= error;
3851 break;
3852 case HA_ERR_CRASHED_ON_REPAIR:
3853 SET_FATAL_ERROR;
3854 textno=ER_CRASHED_ON_REPAIR;
3855 break;
3856 case HA_ERR_OUT_OF_MEM:
3857 textno=ER_OUT_OF_RESOURCES;
3858 break;
3859 case HA_ERR_WRONG_COMMAND:
3860 my_error(ER_ILLEGAL_HA, MYF(0), table_type(), table_share->db.str,
3861 table_share->table_name.str);
3862 DBUG_VOID_RETURN;
3863 break;
3864 case HA_ERR_OLD_FILE:
3865 textno=ER_OLD_KEYFILE;
3866 break;
3867 case HA_ERR_UNSUPPORTED:
3868 textno=ER_UNSUPPORTED_EXTENSION;
3869 break;
3870 case HA_ERR_RECORD_FILE_FULL:
3871 {
3872 textno=ER_RECORD_FILE_FULL;
3873 /* Write the error message to error log */
3874 errflag|= ME_NOREFRESH;
3875 break;
3876 }
3877 case HA_ERR_INDEX_FILE_FULL:
3878 {
3879 textno=ER_INDEX_FILE_FULL;
3880 /* Write the error message to error log */
3881 errflag|= ME_NOREFRESH;
3882 break;
3883 }
3884 case HA_ERR_LOCK_WAIT_TIMEOUT:
3885 textno=ER_LOCK_WAIT_TIMEOUT;
3886 break;
3887 case HA_ERR_LOCK_TABLE_FULL:
3888 textno=ER_LOCK_TABLE_FULL;
3889 break;
3890 case HA_ERR_LOCK_DEADLOCK:
3891 {
3892 String str, full_err_msg(ER_DEFAULT(ER_LOCK_DEADLOCK), system_charset_info);
3893
3894 get_error_message(error, &str);
3895 full_err_msg.append(str);
3896 my_printf_error(ER_LOCK_DEADLOCK, "%s", errflag, full_err_msg.c_ptr_safe());
3897 DBUG_VOID_RETURN;
3898 }
3899 case HA_ERR_READ_ONLY_TRANSACTION:
3900 textno=ER_READ_ONLY_TRANSACTION;
3901 break;
3902 case HA_ERR_CANNOT_ADD_FOREIGN:
3903 textno=ER_CANNOT_ADD_FOREIGN;
3904 break;
3905 case HA_ERR_ROW_IS_REFERENCED:
3906 {
3907 String str;
3908 get_error_message(error, &str);
3909 my_printf_error(ER_ROW_IS_REFERENCED_2,
3910 ER(str.length() ? ER_ROW_IS_REFERENCED_2 : ER_ROW_IS_REFERENCED),
3911 errflag, str.c_ptr_safe());
3912 DBUG_VOID_RETURN;
3913 }
3914 case HA_ERR_NO_REFERENCED_ROW:
3915 {
3916 String str;
3917 get_error_message(error, &str);
3918 my_printf_error(ER_NO_REFERENCED_ROW_2,
3919 ER(str.length() ? ER_NO_REFERENCED_ROW_2 : ER_NO_REFERENCED_ROW),
3920 errflag, str.c_ptr_safe());
3921 DBUG_VOID_RETURN;
3922 }
3923 case HA_ERR_TABLE_DEF_CHANGED:
3924 textno=ER_TABLE_DEF_CHANGED;
3925 break;
3926 case HA_ERR_NO_SUCH_TABLE:
3927 my_error(ER_NO_SUCH_TABLE_IN_ENGINE, errflag, table_share->db.str,
3928 table_share->table_name.str);
3929 DBUG_VOID_RETURN;
3930 case HA_ERR_RBR_LOGGING_FAILED:
3931 textno= ER_BINLOG_ROW_LOGGING_FAILED;
3932 break;
3933 case HA_ERR_DROP_INDEX_FK:
3934 {
3935 const char *ptr= "???";
3936 uint key_nr= get_dup_key(error);
3937 if ((int) key_nr >= 0)
3938 ptr= table->key_info[key_nr].name.str;
3939 my_error(ER_DROP_INDEX_FK, errflag, ptr);
3940 DBUG_VOID_RETURN;
3941 }
3942 case HA_ERR_TABLE_NEEDS_UPGRADE:
3943 textno= ER_TABLE_NEEDS_UPGRADE;
3944 my_error(ER_TABLE_NEEDS_UPGRADE, errflag,
3945 "TABLE", table_share->table_name.str);
3946 DBUG_VOID_RETURN;
3947 case HA_ERR_NO_PARTITION_FOUND:
3948 textno=ER_WRONG_PARTITION_NAME;
3949 break;
3950 case HA_ERR_TABLE_READONLY:
3951 textno= ER_OPEN_AS_READONLY;
3952 break;
3953 case HA_ERR_AUTOINC_READ_FAILED:
3954 textno= ER_AUTOINC_READ_FAILED;
3955 break;
3956 case HA_ERR_AUTOINC_ERANGE:
3957 textno= error;
3958 my_error(textno, errflag, table->next_number_field->field_name.str,
3959 table->in_use->get_stmt_da()->current_row_for_warning());
3960 DBUG_VOID_RETURN;
3961 break;
3962 case HA_ERR_TOO_MANY_CONCURRENT_TRXS:
3963 textno= ER_TOO_MANY_CONCURRENT_TRXS;
3964 break;
3965 case HA_ERR_INDEX_COL_TOO_LONG:
3966 textno= ER_INDEX_COLUMN_TOO_LONG;
3967 break;
3968 case HA_ERR_NOT_IN_LOCK_PARTITIONS:
3969 textno=ER_ROW_DOES_NOT_MATCH_GIVEN_PARTITION_SET;
3970 break;
3971 case HA_ERR_INDEX_CORRUPT:
3972 textno= ER_INDEX_CORRUPT;
3973 break;
3974 case HA_ERR_UNDO_REC_TOO_BIG:
3975 textno= ER_UNDO_RECORD_TOO_BIG;
3976 break;
3977 case HA_ERR_TABLE_IN_FK_CHECK:
3978 textno= ER_TABLE_IN_FK_CHECK;
3979 break;
3980 case HA_ERR_PARTITION_LIST:
3981 my_error(ER_VERS_NOT_ALLOWED, errflag, table->s->db.str, table->s->table_name.str);
3982 DBUG_VOID_RETURN;
3983 default:
3984 {
3985 /* The error was "unknown" to this function.
3986 Ask handler if it has got a message for this error */
3987 bool temporary= FALSE;
3988 String str;
3989 temporary= get_error_message(error, &str);
3990 if (!str.is_empty())
3991 {
3992 const char* engine= table_type();
3993 if (temporary)
3994 my_error(ER_GET_TEMPORARY_ERRMSG, errflag, error, str.c_ptr(),
3995 engine);
3996 else
3997 {
3998 SET_FATAL_ERROR;
3999 my_error(ER_GET_ERRMSG, errflag, error, str.c_ptr(), engine);
4000 }
4001 }
4002 else
4003 my_error(ER_GET_ERRNO, errflag, error, table_type());
4004 DBUG_VOID_RETURN;
4005 }
4006 }
4007 DBUG_ASSERT(textno > 0);
4008 if (unlikely(fatal_error))
4009 {
4010 /* Ensure this becomes a true error */
4011 errflag&= ~(ME_JUST_WARNING | ME_JUST_INFO);
4012 if ((debug_assert_if_crashed_table ||
4013 global_system_variables.log_warnings > 1))
4014 {
4015 /*
4016 Log error to log before we crash or if extended warnings are requested
4017 */
4018 errflag|= ME_NOREFRESH;
4019 }
4020 }
4021
4022 /* if we got an OS error from a file-based engine, specify a path of error */
4023 if (error < HA_ERR_FIRST && bas_ext()[0])
4024 {
4025 char buff[FN_REFLEN];
4026 strxnmov(buff, sizeof(buff),
4027 table_share->normalized_path.str, bas_ext()[0], NULL);
4028 my_error(textno, errflag, buff, error);
4029 }
4030 else
4031 my_error(textno, errflag, table_share->table_name.str, error);
4032 DBUG_VOID_RETURN;
4033 }
4034
4035
4036 /**
4037 Return an error message specific to this handler.
4038
4039 @param error error code previously returned by handler
4040 @param buf pointer to String where to add error message
4041
4042 @return
4043 Returns true if this is a temporary error
4044 */
bool handler::get_error_message(int error, String* buf)
{
  /* Base implementation: no engine-specific message; 'buf' is left
     untouched (except under the "external_lock_failure" debug point). */
  DBUG_EXECUTE_IF("external_lock_failure",
                  buf->set_ascii(STRING_WITH_LEN("KABOOM!")););
  return FALSE;                           // not a temporary error
}
4051
4052 /**
4053 Check for incompatible collation changes.
4054
4055 @retval
4056 HA_ADMIN_NEEDS_UPGRADE Table may have data requiring upgrade.
4057 @retval
4058 0 No upgrade required.
4059 */
4060
check_collation_compatibility()4061 int handler::check_collation_compatibility()
4062 {
4063 ulong mysql_version= table->s->mysql_version;
4064
4065 if (mysql_version < 50124)
4066 {
4067 KEY *key= table->key_info;
4068 KEY *key_end= key + table->s->keys;
4069 for (; key < key_end; key++)
4070 {
4071 KEY_PART_INFO *key_part= key->key_part;
4072 KEY_PART_INFO *key_part_end= key_part + key->user_defined_key_parts;
4073 for (; key_part < key_part_end; key_part++)
4074 {
4075 if (!key_part->fieldnr)
4076 continue;
4077 Field *field= table->field[key_part->fieldnr - 1];
4078 uint cs_number= field->charset()->number;
4079 if ((mysql_version < 50048 &&
4080 (cs_number == 11 || /* ascii_general_ci - bug #29499, bug #27562 */
4081 cs_number == 41 || /* latin7_general_ci - bug #29461 */
4082 cs_number == 42 || /* latin7_general_cs - bug #29461 */
4083 cs_number == 20 || /* latin7_estonian_cs - bug #29461 */
4084 cs_number == 21 || /* latin2_hungarian_ci - bug #29461 */
4085 cs_number == 22 || /* koi8u_general_ci - bug #29461 */
4086 cs_number == 23 || /* cp1251_ukrainian_ci - bug #29461 */
4087 cs_number == 26)) || /* cp1250_general_ci - bug #29461 */
4088 (mysql_version < 50124 &&
4089 (cs_number == 33 || /* utf8_general_ci - bug #27877 */
4090 cs_number == 35))) /* ucs2_general_ci - bug #27877 */
4091 return HA_ADMIN_NEEDS_UPGRADE;
4092 }
4093 }
4094 }
4095
4096 return 0;
4097 }
4098
4099
int handler::ha_check_for_upgrade(HA_CHECK_OPT *check_opt)
{
  int error;
  KEY *keyinfo, *keyend;
  KEY_PART_INFO *keypart, *keypartend;

  /* Table marked incompatible (e.g. during discovery) must be rebuilt */
  if (table->s->incompatible_version)
    return HA_ADMIN_NEEDS_ALTER;

  if (!table->s->mysql_version)
  {
    /* check for blob-in-key error */
    keyinfo= table->key_info;
    keyend= table->key_info + table->s->keys;
    for (; keyinfo < keyend; keyinfo++)
    {
      keypart= keyinfo->key_part;
      keypartend= keypart + keyinfo->user_defined_key_parts;
      for (; keypart < keypartend; keypart++)
      {
        if (!keypart->fieldnr)
          continue;
        Field *field= table->field[keypart->fieldnr-1];
        if (field->type() == MYSQL_TYPE_BLOB)
        {
          /* For upgrades, escalate to a medium check so the key is verified */
          if (check_opt->sql_flags & TT_FOR_UPGRADE)
            check_opt->flags= T_MEDIUM;
          return HA_ADMIN_NEEDS_CHECK;
        }
      }
    }
  }
  /* Pre-true-varchar .frm files cannot be checked in place */
  if (table->s->frm_version < FRM_VER_TRUE_VARCHAR)
    return HA_ADMIN_NEEDS_ALTER;

  if (unlikely((error= check_collation_compatibility())))
    return error;

  /* Finally let the storage engine run its own upgrade checks */
  return check_for_upgrade(check_opt);
}
4140
4141
check_old_types()4142 int handler::check_old_types()
4143 {
4144 Field** field;
4145
4146 if (!table->s->mysql_version)
4147 {
4148 /* check for bad DECIMAL field */
4149 for (field= table->field; (*field); field++)
4150 {
4151 if ((*field)->type() == MYSQL_TYPE_NEWDECIMAL)
4152 {
4153 return HA_ADMIN_NEEDS_ALTER;
4154 }
4155 if ((*field)->type() == MYSQL_TYPE_VAR_STRING)
4156 {
4157 return HA_ADMIN_NEEDS_ALTER;
4158 }
4159 }
4160 }
4161 return 0;
4162 }
4163
4164
/*
  Stamp the current server version into the table's .frm file.

  @return 0 on success, non-zero on open/write failure.
*/
static bool update_frm_version(TABLE *table)
{
  char path[FN_REFLEN];
  File file;
  int result= 1;
  DBUG_ENTER("update_frm_version");

  /*
    No need to update frm version in case table was created or checked
    by server with the same version. This also ensures that we do not
    update frm version for temporary tables as this code doesn't support
    temporary tables.
  */
  if (table->s->mysql_version == MYSQL_VERSION_ID)
    DBUG_RETURN(0);

  strxmov(path, table->s->normalized_path.str, reg_ext, NullS);

  if ((file= mysql_file_open(key_file_frm,
                             path, O_RDWR|O_BINARY, MYF(MY_WME))) >= 0)
  {
    uchar version[4];

    int4store(version, MYSQL_VERSION_ID);

    /*
      Offset 51 is presumably the mysql_version field in the .frm header
      (TODO: confirm against the frm format definition).  With MYF_RW,
      pwrite returns 0 on a complete write, non-zero on failure.
    */
    if ((result= (int)mysql_file_pwrite(file, (uchar*) version, 4, 51L, MYF_RW)))
      goto err;

    table->s->mysql_version= MYSQL_VERSION_ID;
  }
err:
  /* Reached both on success and on write failure; close if open succeeded */
  if (file >= 0)
    (void) mysql_file_close(file, MYF(MY_WME));
  DBUG_RETURN(result);
}
4200
4201
4202
4203 /**
4204 @return
4205 key if error because of duplicated keys
4206 */
uint handler::get_dup_key(int error)
{
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  DBUG_ENTER("handler::get_dup_key");
  /* Reset to "unknown key"; the info() call below fills in errkey
     when the engine can identify the offending index */
  table->file->errkey = (uint) -1;
  if (error == HA_ERR_FOUND_DUPP_KEY ||
      error == HA_ERR_FOREIGN_DUPLICATE_KEY ||
      error == HA_ERR_FOUND_DUPP_UNIQUE || error == HA_ERR_NULL_IN_SPATIAL ||
      error == HA_ERR_DROP_INDEX_FK)
    table->file->info(HA_STATUS_ERRKEY | HA_STATUS_NO_LOCK);
  DBUG_RETURN(table->file->errkey);
}
4220
4221
4222 /**
4223 Delete all files with extension from bas_ext().
4224
4225 @param name Base name of table
4226
4227 @note
4228 We assume that the handler may return more extensions than
4229 was actually used for the file.
4230
4231 @retval
4232 0 If we successfully deleted at least one file from base_ext and
4233 didn't get any other errors than ENOENT
4234 @retval
4235 !0 Error
4236 */
int handler::delete_table(const char *name)
{
  int saved_error= 0;
  int error= 0;
  /*
    enoent_or_zero == ENOENT means "no file deleted yet, ENOENT is still
    an error"; once it drops to 0, missing files are tolerated.
  */
  int enoent_or_zero;

  if (ht->discover_table)
    enoent_or_zero= 0; // the table may not exist in the engine, it's ok
  else
    enoent_or_zero= ENOENT; // the first file of bas_ext() *must* exist

  for (const char **ext=bas_ext(); *ext ; ext++)
  {
    if (mysql_file_delete_with_symlink(key_file_misc, name, *ext, 0))
    {
      if (my_errno != ENOENT)
      {
        /*
          If error on the first existing file, return the error.
          Otherwise delete as much as possible.
        */
        if (enoent_or_zero)
          return my_errno;
        saved_error= my_errno;
      }
    }
    else
      enoent_or_zero= 0; // No error for ENOENT
    error= enoent_or_zero;
  }
  /* error is ENOENT if no file was ever deleted, else 0 */
  return saved_error ? saved_error : error;
}
4269
4270
/*
  Rename all engine files (one per bas_ext() extension) from 'from' to 'to'.
  On failure, renames already done are reverted on a best-effort basis.
*/
int handler::rename_table(const char * from, const char * to)
{
  int error= 0;
  const char **ext, **start_ext;
  start_ext= bas_ext();
  for (ext= start_ext; *ext ; ext++)
  {
    if (unlikely(rename_file_ext(from, to, *ext)))
    {
      /* Missing files for optional extensions are tolerated */
      if ((error=my_errno) != ENOENT)
        break;
      error= 0;
    }
  }
  if (unlikely(error))
  {
    /* Try to revert the rename. Ignore errors. */
    for (; ext >= start_ext; ext--)
      rename_file_ext(to, from, *ext);
  }
  return error;
}
4293
4294
/* Close the (currently open) handler, then remove the table's files.
   Errors from delete_table() are intentionally ignored here. */
void handler::drop_table(const char *name)
{
  ha_close();
  delete_table(name);
}
4300
4301
4302 /**
4303 Performs checks upon the table.
4304
4305 @param thd thread doing CHECK TABLE operation
4306 @param check_opt options from the parser
4307
4308 @retval
4309 HA_ADMIN_OK Successful upgrade
4310 @retval
4311 HA_ADMIN_NEEDS_UPGRADE Table has structures requiring upgrade
4312 @retval
4313 HA_ADMIN_NEEDS_ALTER Table has structures requiring ALTER TABLE
4314 @retval
4315 HA_ADMIN_NOT_IMPLEMENTED
4316 */
int handler::ha_check(THD *thd, HA_CHECK_OPT *check_opt)
{
  int error;
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);

  /* CHECK ... FOR UPGRADE is a no-op on tables from this server version */
  if ((table->s->mysql_version >= MYSQL_VERSION_ID) &&
      (check_opt->sql_flags & TT_FOR_UPGRADE))
    return 0;

  if (table->s->mysql_version < MYSQL_VERSION_ID)
  {
    if (unlikely((error= check_old_types())))
      return error;
    error= ha_check_for_upgrade(check_opt);
    if (unlikely(error && (error != HA_ADMIN_NEEDS_CHECK)))
      return error;
    if (unlikely(!error && (check_opt->sql_flags & TT_FOR_UPGRADE)))
      return 0;
  }
  if (unlikely((error= check(thd, check_opt))))
    return error;
  /* Skip updating frm version if not main handler. */
  if (table->file != this)
    return error;
  /* Check succeeded: record the current server version in the .frm */
  return update_frm_version(table);
}
4344
4345 /**
4346 A helper function to mark a transaction read-write,
4347 if it is started.
4348 */
4349
void handler::mark_trx_read_write_internal()
{
  Ha_trx_info *ha_info= &ha_thd()->ha_data[ht->slot].ha_info[0];
  /*
    When a storage engine method is called, the transaction must
    have been started, unless it's a DDL call, for which the
    storage engine starts the transaction internally, and commits
    it internally, without registering in the ha_list.
    Unfortunately here we can't know for sure if the engine
    has registered the transaction or not, so we must check.
  */
  if (ha_info->is_started())
  {
    DBUG_ASSERT(has_transaction_manager());
    /*
      table_share can be NULL in ha_delete_table(). See implementation
      of standalone function ha_delete_table() in sql_base.cc.
    */
    if (table_share == NULL || table_share->tmp_table == NO_TMP_TABLE)
      ha_info->set_trx_read_write();
  }
}
4372
4373
4374 /**
4375 Repair table: public interface.
4376
4377 @sa handler::repair()
4378 */
4379
int handler::ha_repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  int result;

  mark_trx_read_write();          // repair modifies table data

  result= repair(thd, check_opt);
  /* Engines that do not advertise HA_CAN_REPAIR must not implement repair() */
  DBUG_ASSERT(result == HA_ADMIN_NOT_IMPLEMENTED ||
              ha_table_flags() & HA_CAN_REPAIR);

  /* On success, stamp the current server version into the .frm */
  if (result == HA_ADMIN_OK)
    result= update_frm_version(table);
  return result;
}
4394
4395
4396 /**
4397 End bulk insert
4398 */
4399
int handler::ha_end_bulk_insert()
{
  DBUG_ENTER("handler::ha_end_bulk_insert");
  /* Debug point: flush and crash to test bulk-insert recovery */
  DBUG_EXECUTE_IF("crash_end_bulk_insert",
                  { extra(HA_EXTRA_FLUSH) ; DBUG_SUICIDE();});
  estimation_rows_to_insert= 0;   // bulk phase over; estimate no longer valid
  DBUG_RETURN(end_bulk_insert());
}
4408
4409 /**
4410 Bulk update row: public interface.
4411
4412 @sa handler::bulk_update_row()
4413 */
4414
int
handler::ha_bulk_update_row(const uchar *old_data, const uchar *new_data,
                            ha_rows *dup_key_found)
{
  /* Updates require a write lock, except on temporary tables */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  mark_trx_read_write();

  return bulk_update_row(old_data, new_data, dup_key_found);
}
4425
4426
4427 /**
4428 Delete all rows: public interface.
4429
4430 @sa handler::delete_all_rows()
4431 */
4432
int
handler::ha_delete_all_rows()
{
  /* Deleting all rows requires a write lock, except on temporary tables */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  mark_trx_read_write();

  return delete_all_rows();
}
4442
4443
4444 /**
4445 Truncate table: public interface.
4446
4447 @sa handler::truncate()
4448 */
4449
int
handler::ha_truncate()
{
  /* TRUNCATE requires a write lock, except on temporary tables */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  mark_trx_read_write();

  return truncate();
}
4459
4460
4461 /**
4462 Reset auto increment: public interface.
4463
4464 @sa handler::reset_auto_increment()
4465 */
4466
int
handler::ha_reset_auto_increment(ulonglong value)
{
  /* Changing the auto-increment counter requires a write lock */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  mark_trx_read_write();

  return reset_auto_increment(value);
}
4476
4477
4478 /**
4479 Optimize table: public interface.
4480
4481 @sa handler::optimize()
4482 */
4483
int
handler::ha_optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  /* OPTIMIZE rewrites data, so a write lock is required */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  mark_trx_read_write();

  return optimize(thd, check_opt);
}
4493
4494
4495 /**
4496 Analyze table: public interface.
4497
4498 @sa handler::analyze()
4499 */
4500
int
handler::ha_analyze(THD* thd, HA_CHECK_OPT* check_opt)
{
  /* ANALYZE only reads data, so any lock (read or write) is sufficient */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  mark_trx_read_write();

  return analyze(thd, check_opt);
}
4510
4511
4512 /**
4513 Check and repair table: public interface.
4514
4515 @sa handler::check_and_repair()
4516 */
4517
bool
handler::ha_check_and_repair(THD *thd)
{
  /* NOTE(review): asserts the table is *unlocked* — apparently this is
     called during open, before any lock is taken; confirm with callers */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_UNLCK);
  mark_trx_read_write();

  return check_and_repair(thd);
}
4527
4528
4529 /**
4530 Disable indexes: public interface.
4531
4532 @sa handler::disable_indexes()
4533 */
4534
int
handler::ha_disable_indexes(uint mode)
{
  /* Requires the table to be locked (read or write), except temp tables */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  mark_trx_read_write();

  return disable_indexes(mode);
}
4544
4545
4546 /**
4547 Enable indexes: public interface.
4548
4549 @sa handler::enable_indexes()
4550 */
4551
int
handler::ha_enable_indexes(uint mode)
{
  /* Requires the table to be locked (read or write), except temp tables */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  mark_trx_read_write();

  return enable_indexes(mode);
}
4561
4562
4563 /**
4564 Discard or import tablespace: public interface.
4565
4566 @sa handler::discard_or_import_tablespace()
4567 */
4568
int
handler::ha_discard_or_import_tablespace(my_bool discard)
{
  /* Tablespace manipulation modifies data, so a write lock is required */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  mark_trx_read_write();

  return discard_or_import_tablespace(discard);
}
4578
4579
/* Prepare phase of in-place ALTER TABLE: public interface around
   prepare_inplace_alter_table(). */
bool handler::ha_prepare_inplace_alter_table(TABLE *altered_table,
                                             Alter_inplace_info *ha_alter_info)
{
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);
  mark_trx_read_write();

  return prepare_inplace_alter_table(altered_table, ha_alter_info);
}
4589
4590
/* Commit or roll back (commit == false) an in-place ALTER TABLE. */
bool handler::ha_commit_inplace_alter_table(TABLE *altered_table,
                                            Alter_inplace_info *ha_alter_info,
                                            bool commit)
{
  /*
    At this point we should have an exclusive metadata lock on the table.
    The exception is if we're about to roll back changes (commit= false).
    In this case, we might be rolling back after a failed lock upgrade,
    so we could be holding the same lock level as for inplace_alter_table().
  */
  DBUG_ASSERT(ha_thd()->mdl_context.is_lock_owner(MDL_key::TABLE,
                                                  table->s->db.str,
                                                  table->s->table_name.str,
                                                  MDL_EXCLUSIVE) ||
              !commit);

  return commit_inplace_alter_table(altered_table, ha_alter_info, commit);
}
4609
4610
4611 /*
4612 Default implementation to support in-place alter table
4613 and old online add/drop index API
4614 */
4615
enum_alter_inplace_result
handler::check_if_supported_inplace_alter(TABLE *altered_table,
                                          Alter_inplace_info *ha_alter_info)
{
  DBUG_ENTER("handler::check_if_supported_inplace_alter");

  HA_CREATE_INFO *create_info= ha_alter_info->create_info;

  /* System-versioned (timestamp) tables always need the copy algorithm */
  if (altered_table->versioned(VERS_TIMESTAMP))
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);

  /* Operations the generic (old-API) path can do without copying rows */
  alter_table_operations inplace_offline_operations=
    ALTER_COLUMN_EQUAL_PACK_LENGTH |
    ALTER_COLUMN_NAME |
    ALTER_RENAME_COLUMN |
    ALTER_CHANGE_COLUMN_DEFAULT |
    ALTER_COLUMN_DEFAULT |
    ALTER_COLUMN_OPTION |
    ALTER_CHANGE_CREATE_OPTION |
    ALTER_DROP_CHECK_CONSTRAINT |
    ALTER_PARTITIONED |
    ALTER_VIRTUAL_GCOL_EXPR |
    ALTER_RENAME;

  /* Is there at least one operation that requires copy algorithm? */
  if (ha_alter_info->handler_flags & ~inplace_offline_operations)
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);

  /*
    The following checks for changes related to ALTER_OPTIONS

    ALTER TABLE tbl_name CONVERT TO CHARACTER SET .. and
    ALTER TABLE table_name DEFAULT CHARSET = .. most likely
    change column charsets and so not supported in-place through
    old API.

    Changing of PACK_KEYS, MAX_ROWS and ROW_FORMAT options were
    not supported as in-place operations in old API either.
  */
  if (create_info->used_fields & (HA_CREATE_USED_CHARSET |
                                  HA_CREATE_USED_DEFAULT_CHARSET |
                                  HA_CREATE_USED_PACK_KEYS |
                                  HA_CREATE_USED_CHECKSUM |
                                  HA_CREATE_USED_MAX_ROWS) ||
      (table->s->row_type != create_info->row_type))
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);

  /* Let the engine decide whether the stored data stays compatible */
  uint table_changes= (ha_alter_info->handler_flags &
                       ALTER_COLUMN_EQUAL_PACK_LENGTH) ?
    IS_EQUAL_PACK_LENGTH : IS_EQUAL_YES;
  if (table->file->check_if_incompatible_data(create_info, table_changes)
      == COMPATIBLE_DATA_YES)
    DBUG_RETURN(HA_ALTER_INPLACE_NO_LOCK);

  DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
4672
report_unsupported_error(const char * not_supported,const char * try_instead) const4673 void Alter_inplace_info::report_unsupported_error(const char *not_supported,
4674 const char *try_instead) const
4675 {
4676 if (unsupported_reason == NULL)
4677 my_error(ER_ALTER_OPERATION_NOT_SUPPORTED, MYF(0),
4678 not_supported, try_instead);
4679 else
4680 my_error(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON, MYF(0),
4681 not_supported, unsupported_reason, try_instead);
4682 }
4683
4684
4685 /**
4686 Rename table: public interface.
4687
4688 @sa handler::rename_table()
4689 */
4690
int
handler::ha_rename_table(const char *from, const char *to)
{
  /* Renames are only done on closed/unlocked tables */
  DBUG_ASSERT(m_lock_type == F_UNLCK);
  mark_trx_read_write();

  return rename_table(from, to);
}
4699
4700
4701 /**
4702 Delete table: public interface.
4703
4704 @sa handler::delete_table()
4705 */
4706
int
handler::ha_delete_table(const char *name)
{
  /* No lock assert here: may be called without an attached table/share */
  mark_trx_read_write();
  return delete_table(name);
}
4713
4714
4715 /**
4716 Drop table in the engine: public interface.
4717
4718 @sa handler::drop_table()
4719
4720 The difference between this and delete_table() is that the table is open in
4721 drop_table().
4722 */
4723
void
handler::ha_drop_table(const char *name)
{
  DBUG_ASSERT(m_lock_type == F_UNLCK);
  mark_trx_read_write();

  return drop_table(name);
}
4732
4733
4734 /**
4735 Create a table in the engine: public interface.
4736
4737 @sa handler::create()
4738 */
4739
int
handler::ha_create(const char *name, TABLE *form, HA_CREATE_INFO *info_arg)
{
  DBUG_ASSERT(m_lock_type == F_UNLCK);
  mark_trx_read_write();
  int error= create(name, form, info_arg);
  /* Only audit real (non-temporary, non-ALTER-intermediate) table creation */
  if (!error &&
      !(info_arg->options & (HA_LEX_CREATE_TMP_TABLE | HA_CREATE_TMP_ALTER)))
    mysql_audit_create_table(form);
  return error;
}
4751
4752
4753 /**
4754 Create handler files for CREATE TABLE: public interface.
4755
4756 @sa handler::create_partitioning_metadata()
4757 */
4758
int
handler::ha_create_partitioning_metadata(const char *name,
                                         const char *old_name,
                                         int action_flag)
{
  /*
    Normally this is done when unlocked, but in fast_alter_partition_table,
    it is done on an already locked handler when preparing to alter/rename
    partitions.
  */
  DBUG_ASSERT(m_lock_type == F_UNLCK ||
              (!old_name && strcmp(name, table_share->path.str)));


  mark_trx_read_write();
  return create_partitioning_metadata(name, old_name, action_flag);
}
4776
4777
4778 /**
4779 Change partitions: public interface.
4780
4781 @sa handler::change_partitions()
4782 */
4783
int
handler::ha_change_partitions(HA_CREATE_INFO *create_info,
                              const char *path,
                              ulonglong * const copied,
                              ulonglong * const deleted,
                              const uchar *pack_frm_data,
                              size_t pack_frm_len)
{
  /*
    Must have at least RDLCK or be a TMP table. Read lock is needed to read
    from current partitions and write lock will be taken on new partitions.
  */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type != F_UNLCK);

  mark_trx_read_write();

  return change_partitions(create_info, path, copied, deleted,
                           pack_frm_data, pack_frm_len);
}
4804
4805
4806 /**
4807 Drop partitions: public interface.
4808
4809 @sa handler::drop_partitions()
4810 */
4811
int
handler::ha_drop_partitions(const char *path)
{
  /* The table must be closed (db_stat == 0) before dropping partitions */
  DBUG_ASSERT(!table->db_stat);

  mark_trx_read_write();

  return drop_partitions(path);
}
4821
4822
4823 /**
4824 Rename partitions: public interface.
4825
4826 @sa handler::rename_partitions()
4827 */
4828
int
handler::ha_rename_partitions(const char *path)
{
  /* The table must be closed (db_stat == 0) before renaming partitions */
  DBUG_ASSERT(!table->db_stat);

  mark_trx_read_write();

  return rename_partitions(path);
}
4838
4839
4840 /**
4841 Tell the storage engine that it is allowed to "disable transaction" in the
4842 handler. It is a hint that ACID is not required - it was used in NDB for
4843 ALTER TABLE, for example, when data are copied to temporary table.
4844 A storage engine may treat this hint any way it likes. NDB for example
4845 started to commit every now and then automatically.
4846 This hint can be safely ignored.
4847 */
int ha_enable_transaction(THD *thd, bool on)
{
  int error=0;
  DBUG_ENTER("ha_enable_transaction");
  DBUG_PRINT("enter", ("on: %d", (int) on));

  /* Note: assignment, not comparison — records the new state and branches
     on it at the same time */
  if ((thd->transaction.on= on))
  {
    /*
      Now all storage engines should have transaction handling enabled.
      But some may have it enabled all the time - "disabling" transactions
      is an optimization hint that storage engine is free to ignore.
      So, let's commit an open transaction (if any) now.
    */
    if (likely(!(error= ha_commit_trans(thd, 0))))
      error= trans_commit_implicit(thd);
  }
  DBUG_RETURN(error);
}
4867
/*
  Read the next row via the active index and return HA_ERR_END_OF_FILE
  once the key value no longer matches 'key' (prefix of length 'keylen').
*/
int handler::index_next_same(uchar *buf, const uchar *key, uint keylen)
{
  int error;
  DBUG_ENTER("handler::index_next_same");
  if (!(error=index_next(buf)))
  {
    my_ptrdiff_t ptrdiff= buf - table->record[0];
    uchar *UNINIT_VAR(save_record_0);
    KEY *UNINIT_VAR(key_info);
    KEY_PART_INFO *UNINIT_VAR(key_part);
    KEY_PART_INFO *UNINIT_VAR(key_part_end);

    /*
      key_cmp_if_same() compares table->record[0] against 'key'.
      In parts it uses table->record[0] directly, in parts it uses
      field objects with their local pointers into table->record[0].
      If 'buf' is distinct from table->record[0], we need to move
      all record references. This is table->record[0] itself and
      the field pointers of the fields used in this key.
    */
    if (ptrdiff)
    {
      /* Temporarily retarget record[0] and the key's field pointers to buf */
      save_record_0= table->record[0];
      table->record[0]= buf;
      key_info= table->key_info + active_index;
      key_part= key_info->key_part;
      key_part_end= key_part + key_info->user_defined_key_parts;
      for (; key_part < key_part_end; key_part++)
      {
        DBUG_ASSERT(key_part->field);
        key_part->field->move_field_offset(ptrdiff);
      }
    }

    if (key_cmp_if_same(table, key, active_index, keylen))
    {
      /* Key changed: we are past the group of matching rows */
      table->status=STATUS_NOT_FOUND;
      error=HA_ERR_END_OF_FILE;
    }

    /* Move back if necessary. */
    if (ptrdiff)
    {
      table->record[0]= save_record_0;
      for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
        key_part->field->move_field_offset(-ptrdiff);
    }
  }
  DBUG_PRINT("return",("%i", error));
  DBUG_RETURN(error);
}
4919
4920
get_dynamic_partition_info(PARTITION_STATS * stat_info,uint part_id)4921 void handler::get_dynamic_partition_info(PARTITION_STATS *stat_info,
4922 uint part_id)
4923 {
4924 info(HA_STATUS_CONST | HA_STATUS_TIME | HA_STATUS_VARIABLE |
4925 HA_STATUS_NO_LOCK);
4926 stat_info->records= stats.records;
4927 stat_info->mean_rec_length= stats.mean_rec_length;
4928 stat_info->data_file_length= stats.data_file_length;
4929 stat_info->max_data_file_length= stats.max_data_file_length;
4930 stat_info->index_file_length= stats.index_file_length;
4931 stat_info->max_index_file_length=stats.max_index_file_length;
4932 stat_info->delete_length= stats.delete_length;
4933 stat_info->create_time= stats.create_time;
4934 stat_info->update_time= stats.update_time;
4935 stat_info->check_time= stats.check_time;
4936 stat_info->check_sum= stats.checksum;
4937 }
4938
4939
4940 /*
4941 Updates the global table stats with the TABLE this handler represents
4942 */
4943
/**
  Fold this handler's accumulated row counters into the global (userstat)
  per-table statistics hash, then reset the handler counters.
*/
void handler::update_global_table_stats()
{
  TABLE_STATS * table_stats;

  /* Rows read are always accounted in the session status counters */
  status_var_add(table->in_use->status_var.rows_read, rows_read);
  DBUG_ASSERT(rows_tmp_read == 0);

  if (!table->in_use->userstat_running)
  {
    /* Userstat collection is off: just drop the per-handler counters */
    rows_read= rows_changed= 0;
    return;
  }

  if (rows_read + rows_changed == 0)
    return;                                     // Nothing to update.

  DBUG_ASSERT(table->s);
  DBUG_ASSERT(table->s->table_cache_key.str);

  mysql_mutex_lock(&LOCK_global_table_stats);
  /* Gets the global table stats, creating one if necessary. */
  if (!(table_stats= (TABLE_STATS*)
        my_hash_search(&global_table_stats,
                       (uchar*) table->s->table_cache_key.str,
                       table->s->table_cache_key.length)))
  {
    if (!(table_stats = ((TABLE_STATS*)
                         my_malloc(sizeof(TABLE_STATS),
                                   MYF(MY_WME | MY_ZEROFILL)))))
    {
      /* Out of memory error already given */
      goto end;
    }
    /* The hash key is the table cache key (db name + table name) */
    memcpy(table_stats->table, table->s->table_cache_key.str,
           table->s->table_cache_key.length);
    table_stats->table_name_length= (uint)table->s->table_cache_key.length;
    table_stats->engine_type= ht->db_type;
    /* No need to set variables to 0, as we use MY_ZEROFILL above */

    if (my_hash_insert(&global_table_stats, (uchar*) table_stats))
    {
      /* Out of memory error is already given */
      my_free(table_stats);
      goto end;
    }
  }
  // Updates the global table stats.
  table_stats->rows_read+= rows_read;
  table_stats->rows_changed+= rows_changed;
  /* Each changed row is counted once per index (at least once) */
  table_stats->rows_changed_x_indexes+= (rows_changed *
                                         (table->s->keys ? table->s->keys :
                                          1));
  rows_read= rows_changed= 0;
end:
  mysql_mutex_unlock(&LOCK_global_table_stats);
}
5000
5001
5002 /*
5003 Updates the global index stats with this handler's accumulated index reads.
5004 */
5005
/**
  Fold this handler's per-index row-read counters into the global
  (userstat) index statistics hash, then reset those counters.
*/
void handler::update_global_index_stats()
{
  DBUG_ASSERT(table->s);

  if (!table->in_use->userstat_running)
  {
    /* Reset all index read values */
    bzero(index_rows_read, sizeof(index_rows_read[0]) * table->s->keys);
    return;
  }

  for (uint index = 0; index < table->s->keys; index++)
  {
    if (index_rows_read[index])
    {
      INDEX_STATS* index_stats;
      size_t key_length;
      KEY *key_info = &table->key_info[index];  // Rows were read using this

      DBUG_ASSERT(key_info->cache_name);
      if (!key_info->cache_name)
        continue;
      /* Hash key: table cache key + index name + terminating zero */
      key_length= table->s->table_cache_key.length + key_info->name.length + 1;
      mysql_mutex_lock(&LOCK_global_index_stats);
      // Gets the global index stats, creating one if necessary.
      if (!(index_stats= (INDEX_STATS*) my_hash_search(&global_index_stats,
                                                       key_info->cache_name,
                                                       key_length)))
      {
        if (!(index_stats = ((INDEX_STATS*)
                             my_malloc(sizeof(INDEX_STATS),
                                       MYF(MY_WME | MY_ZEROFILL)))))
          goto end;                             // Error is already given

        memcpy(index_stats->index, key_info->cache_name, key_length);
        index_stats->index_name_length= key_length;
        if (my_hash_insert(&global_index_stats, (uchar*) index_stats))
        {
          my_free(index_stats);
          goto end;
        }
      }
      /* Updates the global index stats. */
      index_stats->rows_read+= index_rows_read[index];
      index_rows_read[index]= 0;
      /* The label only skips the update; the mutex is always released */
end:
      mysql_mutex_unlock(&LOCK_global_index_stats);
    }
  }
}
5056
5057
flush_checksum(ha_checksum * row_crc,uchar ** checksum_start,size_t * checksum_length)5058 static void flush_checksum(ha_checksum *row_crc, uchar **checksum_start,
5059 size_t *checksum_length)
5060 {
5061 if (*checksum_start)
5062 {
5063 *row_crc= my_checksum(*row_crc, *checksum_start, *checksum_length);
5064 *checksum_start= NULL;
5065 *checksum_length= 0;
5066 }
5067 }
5068
5069
5070 /* calculating table's checksum */
/**
  Compute the table checksum by scanning all rows and accumulating a
  per-row CRC into stats.checksum.

  @return 0 on success (end of file reached), HA_ERR_ABORTED_BY_USER if
          the query was killed, or another handler error code.
*/
int handler::calculate_checksum()
{
  int error;
  THD *thd=ha_thd();
  DBUG_ASSERT(table->s->last_null_bit_pos < 8);
  /* Mask that sets all unused bits of the last null byte */
  uchar null_mask= table->s->last_null_bit_pos
                   ? 256 - (1 << table->s->last_null_bit_pos) : 0;

  table->use_all_columns();
  stats.checksum= 0;

  /* The checksum is computed over a full table (rnd) scan */
  if ((error= ha_rnd_init(1)))
    return error;

  for (;;)
  {
    if (thd->killed)
      return HA_ERR_ABORTED_BY_USER;

    ha_checksum row_crc= 0;
    error= table->file->ha_rnd_next(table->record[0]);
    if (error)
      break;

    if (table->s->null_bytes)
    {
      /* fix undefined null bits */
      table->record[0][table->s->null_bytes-1] |= null_mask;
      if (!(table->s->db_create_options & HA_OPTION_PACK_RECORD))
        table->record[0][0] |= 1;

      row_crc= my_checksum(row_crc, table->record[0], table->s->null_bytes);
    }

    /*
      Fixed-size fields are accumulated into one contiguous run and
      checksummed lazily via flush_checksum(); pointer-based fields are
      converted to strings and checksummed individually.
    */
    uchar *checksum_start= NULL;
    size_t checksum_length= 0;
    for (uint i= 0; i < table->s->fields; i++ )
    {
      Field *f= table->field[i];

      /* NULL fields do not contribute (unless old_mode asks otherwise) */
      if (! thd->variables.old_mode && f->is_real_null(0))
      {
        flush_checksum(&row_crc, &checksum_start, &checksum_length);
        continue;
      }
      /*
        BLOB and VARCHAR have pointers in their field, we must convert
        to string; GEOMETRY is implemented on top of BLOB.
        BIT may store its data among NULL bits, convert as well.
      */
      switch (f->type()) {
        case MYSQL_TYPE_BLOB:
        case MYSQL_TYPE_VARCHAR:
        case MYSQL_TYPE_GEOMETRY:
        case MYSQL_TYPE_BIT:
        {
          flush_checksum(&row_crc, &checksum_start, &checksum_length);
          String tmp;
          f->val_str(&tmp);
          row_crc= my_checksum(row_crc, (uchar*) tmp.ptr(), tmp.length());
          break;
        }
        default:
          if (!checksum_start)
            checksum_start= f->ptr;
          DBUG_ASSERT(checksum_start + checksum_length == f->ptr);
          checksum_length+= f->pack_length();
          break;
      }
    }
    flush_checksum(&row_crc, &checksum_start, &checksum_length);

    stats.checksum+= row_crc;
  }
  table->file->ha_rnd_end();
  /* End-of-file is the normal loop termination, not an error */
  return error == HA_ERR_END_OF_FILE ? 0 : error;
}
5148
5149
5150 /****************************************************************************
** Some general functions that aren't in the handler class
5152 ****************************************************************************/
5153
5154 /**
5155 Initiates table-file and calls appropriate database-creator.
5156
5157 @retval
5158 0 ok
5159 @retval
5160 1 error
5161 */
int ha_create_table(THD *thd, const char *path,
                    const char *db, const char *table_name,
                    HA_CREATE_INFO *create_info, LEX_CUSTRING *frm)
{
  int error= 1;
  TABLE table;
  char name_buff[FN_REFLEN];
  const char *name;
  TABLE_SHARE share;
  /* Temporarily disable abort_on_warning for the duration of this call */
  Abort_on_warning_instant_set old_abort_on_warning(thd, 0);
  bool temp_table __attribute__((unused)) =
    create_info->options & (HA_LEX_CREATE_TMP_TABLE | HA_CREATE_TMP_ALTER);
  DBUG_ENTER("ha_create_table");

  init_tmp_table_share(thd, &share, db, 0, table_name, path);

  if (frm)
  {
    /*
      Write the frm to disk right away only if the engine cannot
      rediscover the table and this is not a temporary table.
    */
    bool write_frm_now= !create_info->db_type->discover_table &&
                        !create_info->tmp_table();

    share.frm_image= frm;

    // open an frm image
    if (share.init_from_binary_frm_image(thd, write_frm_now,
                                         frm->str, frm->length))
      goto err;
  }
  else
  {
    // open an frm file
    share.db_plugin= ha_lock_engine(thd, create_info->db_type);

    if (open_table_def(thd, &share))
      goto err;
  }

  /* Register the share with the performance schema instrumentation */
  share.m_psi= PSI_CALL_get_table_share(temp_table, &share);

  if (open_table_from_share(thd, &share, &empty_clex_str, 0, READ_ALL, 0,
                            &table, true))
    goto err;

  update_create_info_from_table(create_info, &table);

  name= get_canonical_filename(table.file, share.path.str, name_buff);

  /* Let the storage engine create the physical table */
  error= table.file->ha_create(name, &table, create_info);

  if (unlikely(error))
  {
    if (!thd->is_error())
      my_error(ER_CANT_CREATE_TABLE, MYF(0), db, table_name, error);
    table.file->print_error(error, MYF(ME_JUST_WARNING));
    /* Undo the PSI registration done above */
    PSI_CALL_drop_table_share(temp_table, share.db.str, (uint)share.db.length,
                              share.table_name.str, (uint)share.table_name.length);
  }

  (void) closefrm(&table);

err:
  free_table_share(&share);
  DBUG_RETURN(error != 0);
}
5226
init()5227 void st_ha_check_opt::init()
5228 {
5229 flags= sql_flags= 0;
5230 start_time= my_time(0);
5231 }
5232
5233
5234 /*****************************************************************************
5235 Key cache handling.
5236
5237 This code is only relevant for ISAM/MyISAM tables
5238
5239 key_cache->cache may be 0 only in the case where a key cache is not
  initialized or when we were not able to init the key cache in a previous
5241 call to ha_init_key_cache() (probably out of memory)
5242 *****************************************************************************/
5243
5244 /**
  Init a key cache if it has not been initialized before.
5246 */
ha_init_key_cache(const char * name,KEY_CACHE * key_cache,void * unused)5247 int ha_init_key_cache(const char *name, KEY_CACHE *key_cache, void *unused
5248 __attribute__((unused)))
5249 {
5250 DBUG_ENTER("ha_init_key_cache");
5251
5252 if (!key_cache->key_cache_inited)
5253 {
5254 mysql_mutex_lock(&LOCK_global_system_variables);
5255 size_t tmp_buff_size= (size_t) key_cache->param_buff_size;
5256 uint tmp_block_size= (uint) key_cache->param_block_size;
5257 uint division_limit= (uint)key_cache->param_division_limit;
5258 uint age_threshold= (uint)key_cache->param_age_threshold;
5259 uint partitions= (uint)key_cache->param_partitions;
5260 uint changed_blocks_hash_size= (uint)key_cache->changed_blocks_hash_size;
5261 mysql_mutex_unlock(&LOCK_global_system_variables);
5262 DBUG_RETURN(!init_key_cache(key_cache,
5263 tmp_block_size,
5264 tmp_buff_size,
5265 division_limit, age_threshold,
5266 changed_blocks_hash_size,
5267 partitions));
5268 }
5269 DBUG_RETURN(0);
5270 }
5271
5272
5273 /**
5274 Resize key cache.
5275 */
ha_resize_key_cache(KEY_CACHE * key_cache)5276 int ha_resize_key_cache(KEY_CACHE *key_cache)
5277 {
5278 DBUG_ENTER("ha_resize_key_cache");
5279
5280 if (key_cache->key_cache_inited)
5281 {
5282 mysql_mutex_lock(&LOCK_global_system_variables);
5283 size_t tmp_buff_size= (size_t) key_cache->param_buff_size;
5284 long tmp_block_size= (long) key_cache->param_block_size;
5285 uint division_limit= (uint)key_cache->param_division_limit;
5286 uint age_threshold= (uint)key_cache->param_age_threshold;
5287 uint changed_blocks_hash_size= (uint)key_cache->changed_blocks_hash_size;
5288 mysql_mutex_unlock(&LOCK_global_system_variables);
5289 DBUG_RETURN(!resize_key_cache(key_cache, tmp_block_size,
5290 tmp_buff_size,
5291 division_limit, age_threshold,
5292 changed_blocks_hash_size));
5293 }
5294 DBUG_RETURN(0);
5295 }
5296
5297
5298 /**
5299 Change parameters for key cache (like division_limit)
5300 */
ha_change_key_cache_param(KEY_CACHE * key_cache)5301 int ha_change_key_cache_param(KEY_CACHE *key_cache)
5302 {
5303 DBUG_ENTER("ha_change_key_cache_param");
5304
5305 if (key_cache->key_cache_inited)
5306 {
5307 mysql_mutex_lock(&LOCK_global_system_variables);
5308 uint division_limit= (uint)key_cache->param_division_limit;
5309 uint age_threshold= (uint)key_cache->param_age_threshold;
5310 mysql_mutex_unlock(&LOCK_global_system_variables);
5311 change_key_cache_param(key_cache, division_limit, age_threshold);
5312 }
5313 DBUG_RETURN(0);
5314 }
5315
5316
5317 /**
5318 Repartition key cache
5319 */
ha_repartition_key_cache(KEY_CACHE * key_cache)5320 int ha_repartition_key_cache(KEY_CACHE *key_cache)
5321 {
5322 DBUG_ENTER("ha_repartition_key_cache");
5323
5324 if (key_cache->key_cache_inited)
5325 {
5326 mysql_mutex_lock(&LOCK_global_system_variables);
5327 size_t tmp_buff_size= (size_t) key_cache->param_buff_size;
5328 long tmp_block_size= (long) key_cache->param_block_size;
5329 uint division_limit= (uint)key_cache->param_division_limit;
5330 uint age_threshold= (uint)key_cache->param_age_threshold;
5331 uint partitions= (uint)key_cache->param_partitions;
5332 uint changed_blocks_hash_size= (uint)key_cache->changed_blocks_hash_size;
5333 mysql_mutex_unlock(&LOCK_global_system_variables);
5334 DBUG_RETURN(!repartition_key_cache(key_cache, tmp_block_size,
5335 tmp_buff_size,
5336 division_limit, age_threshold,
5337 changed_blocks_hash_size,
5338 partitions));
5339 }
5340 DBUG_RETURN(0);
5341 }
5342
5343
5344 /**
5345 Move all tables from one key cache to another one.
5346 */
int ha_change_key_cache(KEY_CACHE *old_key_cache,
                        KEY_CACHE *new_key_cache)
{
  /* Delegate to MyISAM's key cache reassignment; always reports success */
  mi_change_key_cache(old_key_cache, new_key_cache);
  return 0;
}
5353
5354
/**
  plugin_foreach() callback: try to discover the table of 'arg'
  (a TABLE_SHARE) in one storage engine.

  @return TRUE to abort the search (the engine found the table, or a
          non-recoverable error occurred), FALSE to try the next engine.
*/
static my_bool discover_handlerton(THD *thd, plugin_ref plugin,
                                   void *arg)
{
  TABLE_SHARE *share= (TABLE_SHARE *)arg;
  handlerton *hton= plugin_hton(plugin);
  if (hton->state == SHOW_OPTION_YES && hton->discover_table)
  {
    share->db_plugin= plugin;
    int error= hton->discover_table(hton, thd, share);
    if (error != HA_ERR_NO_SUCH_TABLE)
    {
      if (unlikely(error))
      {
        /* Only unlock the plugin if the error wasn't reported earlier */
        if (!share->error)
        {
          share->error= OPEN_FRM_ERROR_ALREADY_ISSUED;
          plugin_unlock(0, share->db_plugin);
        }

        /*
          report an error, unless it is "generic" and a more
          specific one was already reported
        */
        if (error != HA_ERR_GENERIC || !thd->is_error())
          my_error(ER_GET_ERRNO, MYF(0), error, plugin_name(plugin)->str);
        share->db_plugin= 0;
      }
      else
        share->error= OPEN_FRM_OK;

      status_var_increment(thd->status_var.ha_discover_count);
      return TRUE; // abort the search
    }
    /* This engine doesn't have the table; detach it from the share */
    share->db_plugin= 0;
  }

  DBUG_ASSERT(share->error == OPEN_FRM_OPEN_ERROR);
  return FALSE; // continue with the next engine
}
5394
/**
  Ask storage engines to discover the table described by 'share'.

  If share->db_plugin is set, only that engine is asked; otherwise all
  storage engine plugins are iterated.

  @return 0 if the share was successfully discovered, non-zero otherwise.
*/
int ha_discover_table(THD *thd, TABLE_SHARE *share)
{
  DBUG_ENTER("ha_discover_table");
  int found;

  DBUG_ASSERT(share->error == OPEN_FRM_OPEN_ERROR); // share is not OK yet

  if (!engines_with_discover)
    found= FALSE;                 /* no engine supports discovery */
  else if (share->db_plugin)
    found= discover_handlerton(thd, share->db_plugin, share);
  else
    found= plugin_foreach(thd, discover_handlerton,
                          MYSQL_STORAGE_ENGINE_PLUGIN, share);

  if (!found)
    open_table_error(share, OPEN_FRM_OPEN_ERROR, ENOENT); // not found

  DBUG_RETURN(share->error != OPEN_FRM_OK);
}
5415
file_ext_exists(char * path,size_t path_len,const char * ext)5416 static my_bool file_ext_exists(char *path, size_t path_len, const char *ext)
5417 {
5418 strmake(path + path_len, ext, FN_REFLEN - path_len);
5419 return !access(path, F_OK);
5420 }
5421
/*
  Argument bundle for discover_existence(), used with plugin_foreach()
  from ha_table_exists().
*/
struct st_discover_existence_args
{
  char *path;                    /* table file path without extension */
  size_t path_len;               /* length of 'path' */
  const char *db, *table_name;   /* the table being checked */
  handlerton *hton;              /* out: engine that answered the check */
  bool frm_exists;               /* whether a .frm file was found */
};
5430
discover_existence(THD * thd,plugin_ref plugin,void * arg)5431 static my_bool discover_existence(THD *thd, plugin_ref plugin,
5432 void *arg)
5433 {
5434 st_discover_existence_args *args= (st_discover_existence_args*)arg;
5435 handlerton *ht= plugin_hton(plugin);
5436 if (ht->state != SHOW_OPTION_YES || !ht->discover_table_existence)
5437 return args->frm_exists;
5438
5439 args->hton= ht;
5440
5441 if (ht->discover_table_existence == ext_based_existence)
5442 return file_ext_exists(args->path, args->path_len,
5443 ht->tablefile_extensions[0]);
5444
5445 return ht->discover_table_existence(ht, args->db, args->table_name);
5446 }
5447
5448 class Table_exists_error_handler : public Internal_error_handler
5449 {
5450 public:
Table_exists_error_handler()5451 Table_exists_error_handler()
5452 : m_handled_errors(0), m_unhandled_errors(0)
5453 {}
5454
handle_condition(THD * thd,uint sql_errno,const char * sqlstate,Sql_condition::enum_warning_level * level,const char * msg,Sql_condition ** cond_hdl)5455 bool handle_condition(THD *thd,
5456 uint sql_errno,
5457 const char* sqlstate,
5458 Sql_condition::enum_warning_level *level,
5459 const char* msg,
5460 Sql_condition ** cond_hdl)
5461 {
5462 *cond_hdl= NULL;
5463 if (sql_errno == ER_NO_SUCH_TABLE ||
5464 sql_errno == ER_NO_SUCH_TABLE_IN_ENGINE ||
5465 sql_errno == ER_WRONG_OBJECT)
5466 {
5467 m_handled_errors++;
5468 return TRUE;
5469 }
5470
5471 if (*level == Sql_condition::WARN_LEVEL_ERROR)
5472 m_unhandled_errors++;
5473 return FALSE;
5474 }
5475
safely_trapped_errors()5476 bool safely_trapped_errors()
5477 {
5478 return ((m_handled_errors > 0) && (m_unhandled_errors == 0));
5479 }
5480
5481 private:
5482 int m_handled_errors;
5483 int m_unhandled_errors;
5484 };
5485
5486 /**
5487 Check if a given table exists, without doing a full discover, if possible
5488
5489 If the 'hton' is not NULL, it's set to the handlerton of the storage engine
5490 of this table, or to view_pseudo_hton if the frm belongs to a view.
5491
5492 This function takes discovery correctly into account. If frm is found,
5493 it discovers the table to make sure it really exists in the engine.
5494 If no frm is found it discovers the table, in case it still exists in
5495 the engine.
5496
5497 While it tries to cut corners (don't open .frm if no discovering engine is
5498 enabled, no full discovery if all discovering engines support
5499 discover_table_existence, etc), it still *may* be quite expensive
5500 and must be used sparingly.
5501
5502 @retval true Table exists (even if the error occurred, like bad frm)
5503 @retval false Table does not exist (one can do CREATE TABLE table_name)
5504
5505 @note if frm exists and the table in engine doesn't, *hton will be set,
5506 but the return value will be false.
5507
5508 @note if frm file exists, but the table cannot be opened (engine not
5509 loaded, frm is invalid), the return value will be true, but
5510 *hton will be NULL.
5511 */
5512
bool ha_table_exists(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table_name,
                     handlerton **hton, bool *is_sequence)
{
  handlerton *dummy;
  bool dummy2;
  DBUG_ENTER("ha_table_exists");

  if (hton)
    *hton= 0;
  else if (engines_with_discover)
    hton= &dummy;                 /* discovery needs a place to store it */
  if (!is_sequence)
    is_sequence= &dummy2;
  *is_sequence= 0;

  /* Fast path: the share may already be in the table definition cache */
  TDC_element *element= tdc_lock_share(thd, db->str, table_name->str);
  if (element && element != MY_ERRPTR)
  {
    if (hton)
      *hton= element->share->db_type();
    *is_sequence= element->share->table_type == TABLE_TYPE_SEQUENCE;
    tdc_unlock_share(element);
    DBUG_RETURN(TRUE);
  }

  char path[FN_REFLEN + 1];
  size_t path_len = build_table_filename(path, sizeof(path) - 1,
                                         db->str, table_name->str, "", 0);
  st_discover_existence_args args= {path, path_len, db->str, table_name->str, 0, true};

  if (file_ext_exists(path, path_len, reg_ext))
  {
    bool exists= true;
    if (hton)
    {
      char engine_buf[NAME_CHAR_LEN + 1];
      LEX_CSTRING engine= { engine_buf, 0 };
      Table_type type;

      /*
        NOTE(review): an unreadable frm returns "does not exist" here,
        while the function's doc comment says a bad frm still counts as
        existing -- confirm this is intended.
      */
      if ((type= dd_frm_type(thd, path, &engine, is_sequence)) ==
          TABLE_TYPE_UNKNOWN)
        DBUG_RETURN(0);

      if (type != TABLE_TYPE_VIEW)
      {
        plugin_ref p= plugin_lock_by_name(thd, &engine,
                                          MYSQL_STORAGE_ENGINE_PLUGIN);
        *hton= p ? plugin_hton(p) : NULL;
        if (*hton)
          // verify that the table really exists
          exists= discover_existence(thd, p, &args);
      }
      else
        *hton= view_pseudo_hton;
    }
    DBUG_RETURN(exists);
  }

  /* No frm file: ask engines that can answer existence cheaply */
  args.frm_exists= false;
  if (plugin_foreach(thd, discover_existence, MYSQL_STORAGE_ENGINE_PLUGIN,
                     &args))
  {
    if (hton)
      *hton= args.hton;
    DBUG_RETURN(TRUE);
  }

  if (need_full_discover_for_existence)
  {
    /* Last resort: full discovery by trying to acquire the share */
    TABLE_LIST table;
    uint flags = GTS_TABLE | GTS_VIEW;
    if (!hton)
      flags|= GTS_NOLOCK;

    Table_exists_error_handler no_such_table_handler;
    thd->push_internal_handler(&no_such_table_handler);
    table.init_one_table(db, table_name, 0, TL_READ);
    TABLE_SHARE *share= tdc_acquire_share(thd, &table, flags);
    thd->pop_internal_handler();

    if (hton && share)
    {
      *hton= share->db_type();
      tdc_release_share(share);
    }

    // the table doesn't exist if we've caught ER_NO_SUCH_TABLE and nothing else
    DBUG_RETURN(!no_such_table_handler.safely_trapped_errors());
  }

  DBUG_RETURN(FALSE);
}
5605
5606 /**
5607 Discover all table names in a given database
5608 */
5609 extern "C" {
5610
cmp_file_names(const void * a,const void * b)5611 static int cmp_file_names(const void *a, const void *b)
5612 {
5613 CHARSET_INFO *cs= character_set_filesystem;
5614 char *aa= ((FILEINFO *)a)->name;
5615 char *bb= ((FILEINFO *)b)->name;
5616 return my_strnncoll(cs, (uchar*)aa, strlen(aa), (uchar*)bb, strlen(bb));
5617 }
5618
cmp_table_names(LEX_CSTRING * const * a,LEX_CSTRING * const * b)5619 static int cmp_table_names(LEX_CSTRING * const *a, LEX_CSTRING * const *b)
5620 {
5621 return my_strnncoll(&my_charset_bin, (uchar*)((*a)->str), (*a)->length,
5622 (uchar*)((*b)->str), (*b)->length);
5623 }
5624
5625 #ifndef DBUG_OFF
cmp_table_names_desc(LEX_CSTRING * const * a,LEX_CSTRING * const * b)5626 static int cmp_table_names_desc(LEX_CSTRING * const *a, LEX_CSTRING * const *b)
5627 {
5628 return -cmp_table_names(a, b);
5629 }
5630 #endif
5631
5632 }
5633
Discovered_table_list(THD * thd_arg,Dynamic_array<LEX_CSTRING * > * tables_arg,const LEX_CSTRING * wild_arg)5634 Discovered_table_list::Discovered_table_list(THD *thd_arg,
5635 Dynamic_array<LEX_CSTRING*> *tables_arg,
5636 const LEX_CSTRING *wild_arg) :
5637 thd(thd_arg), with_temps(false), tables(tables_arg)
5638 {
5639 if (wild_arg->str && wild_arg->str[0])
5640 {
5641 wild= wild_arg->str;
5642 wend= wild + wild_arg->length;
5643 }
5644 else
5645 wild= 0;
5646 }
5647
add_table(const char * tname,size_t tlen)5648 bool Discovered_table_list::add_table(const char *tname, size_t tlen)
5649 {
5650 /*
5651 TODO Check with_temps and filter out temp tables.
5652 Implement the check, when we'll have at least one affected engine (with
5653 custom discover_table_names() method, that calls add_table() directly).
5654 Note: avoid comparing the same name twice (here and in add_file).
5655 */
5656 if (wild && my_wildcmp(table_alias_charset, tname, tname + tlen, wild, wend,
5657 wild_prefix, wild_one, wild_many))
5658 return 0;
5659
5660 LEX_CSTRING *name= thd->make_clex_string(tname, tlen);
5661 if (!name || tables->append(name))
5662 return 1;
5663 return 0;
5664 }
5665
add_file(const char * fname)5666 bool Discovered_table_list::add_file(const char *fname)
5667 {
5668 bool is_temp= strncmp(fname, STRING_WITH_LEN(tmp_file_prefix)) == 0;
5669
5670 if (is_temp && !with_temps)
5671 return 0;
5672
5673 char tname[SAFE_NAME_LEN + 1];
5674 size_t tlen= filename_to_tablename(fname, tname, sizeof(tname), is_temp);
5675 return add_table(tname, tlen);
5676 }
5677
5678
void Discovered_table_list::sort()
{
  /* Sort discovered table names in ascending binary order */
  tables->sort(cmp_table_names);
}
5683
5684
5685 #ifndef DBUG_OFF
void Discovered_table_list::sort_desc()
{
  /* Debug-only: sort table names in descending binary order */
  tables->sort(cmp_table_names_desc);
}
5690 #endif
5691
5692
/**
  Sort the list and remove duplicate names in place.

  After sort() duplicates are adjacent; 'src' points at the last kept
  element while 'dst' scans ahead, and unique names are compacted down.
*/
void Discovered_table_list::remove_duplicates()
{
  LEX_CSTRING **src= tables->front();
  LEX_CSTRING **dst= src;
  sort();
  while (++dst <= tables->back())
  {
    LEX_CSTRING *s= *src, *d= *dst;
    /* The list must be sorted at this point */
    DBUG_ASSERT(strncmp(s->str, d->str, MY_MIN(s->length, d->length)) <= 0);
    if ((s->length != d->length || strncmp(s->str, d->str, d->length)))
    {
      /* Different name: keep it by moving it up next to the kept run */
      src++;
      if (src != dst)
        *src= *dst;
    }
  }
  /* Shrink the array to the number of unique elements kept */
  tables->elements(src - tables->front() + 1);
}
5711
/* Argument bundle for discover_names(), used with plugin_foreach(). */
struct st_discover_names_args
{
  LEX_CSTRING *db;               /* database being listed */
  MY_DIR *dirp;                  /* directory listing (may be NULL) */
  Discovered_table_list *result; /* out: discovered table names */
  uint possible_duplicates;      /* out: names that may repeat across engines */
};
5719
discover_names(THD * thd,plugin_ref plugin,void * arg)5720 static my_bool discover_names(THD *thd, plugin_ref plugin,
5721 void *arg)
5722 {
5723 st_discover_names_args *args= (st_discover_names_args *)arg;
5724 handlerton *ht= plugin_hton(plugin);
5725
5726 if (ht->state == SHOW_OPTION_YES && ht->discover_table_names)
5727 {
5728 size_t old_elements= args->result->tables->elements();
5729 if (ht->discover_table_names(ht, args->db, args->dirp, args->result))
5730 return 1;
5731
5732 /*
5733 hton_ext_based_table_discovery never discovers a table that has
5734 a corresponding .frm file; but custom engine discover methods might
5735 */
5736 if (ht->discover_table_names != hton_ext_based_table_discovery)
5737 args->possible_duplicates+= (uint)(args->result->tables->elements() - old_elements);
5738 }
5739
5740 return 0;
5741 }
5742
5743 /**
5744 Return the list of tables
5745
5746 @param thd
5747 @param db database to look into
5748 @param dirp list of files in this database (as returned by my_dir())
5749 @param result the object to return the list of files in
5750 @param reusable if true, on return, 'dirp' will be a valid list of all
5751 non-table files. If false, discovery will work much faster,
5752 but it will leave 'dirp' corrupted and completely unusable,
5753 only good for my_dirend().
5754
5755 Normally, reusable=false for SHOW and INFORMATION_SCHEMA, and reusable=true
5756 for DROP DATABASE (as it needs to know and delete non-table files).
5757 */
5758
ha_discover_table_names(THD * thd,LEX_CSTRING * db,MY_DIR * dirp,Discovered_table_list * result,bool reusable)5759 int ha_discover_table_names(THD *thd, LEX_CSTRING *db, MY_DIR *dirp,
5760 Discovered_table_list *result, bool reusable)
5761 {
5762 int error;
5763 DBUG_ENTER("ha_discover_table_names");
5764
5765 if (engines_with_discover_file_names == 0 && !reusable)
5766 {
5767 st_discover_names_args args= {db, NULL, result, 0};
5768 error= ext_table_discovery_simple(dirp, result) ||
5769 plugin_foreach(thd, discover_names,
5770 MYSQL_STORAGE_ENGINE_PLUGIN, &args);
5771 }
5772 else
5773 {
5774 st_discover_names_args args= {db, dirp, result, 0};
5775
5776 /* extension_based_table_discovery relies on dirp being sorted */
5777 my_qsort(dirp->dir_entry, dirp->number_of_files,
5778 sizeof(FILEINFO), cmp_file_names);
5779
5780 error= extension_based_table_discovery(dirp, reg_ext, result) ||
5781 plugin_foreach(thd, discover_names,
5782 MYSQL_STORAGE_ENGINE_PLUGIN, &args);
5783 if (args.possible_duplicates > 0)
5784 result->remove_duplicates();
5785 }
5786
5787 DBUG_RETURN(error);
5788 }
5789
5790
5791 /*
5792 int handler::pre_read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
5793 KEY_MULTI_RANGE *ranges,
5794 uint range_count,
5795 bool sorted, HANDLER_BUFFER *buffer,
5796 bool use_parallel)
5797 {
5798 int result;
5799 DBUG_ENTER("handler::pre_read_multi_range_first");
5800 result = pre_read_range_first(ranges->start_key.keypart_map ?
5801 &ranges->start_key : 0,
5802 ranges->end_key.keypart_map ?
5803 &ranges->end_key : 0,
5804 test(ranges->range_flag & EQ_RANGE),
5805 sorted,
5806 use_parallel);
5807 DBUG_RETURN(result);
5808 }
5809 */
5810
5811
5812 /**
5813 Read first row between two ranges.
5814 Store ranges for future calls to read_range_next.
5815
5816 @param start_key Start key. Is 0 if no min range
5817 @param end_key End key. Is 0 if no max range
5818 @param eq_range_arg Set to 1 if start_key == end_key
5819 @param sorted Set to 1 if result should be sorted per key
5820
5821 @note
5822 Record is read into table->record[0]
5823
5824 @retval
5825 0 Found row
5826 @retval
5827 HA_ERR_END_OF_FILE No rows in range
5828 @retval
5829 \# Error code
5830 */
read_range_first(const key_range * start_key,const key_range * end_key,bool eq_range_arg,bool sorted)5831 int handler::read_range_first(const key_range *start_key,
5832 const key_range *end_key,
5833 bool eq_range_arg, bool sorted)
5834 {
5835 int result;
5836 DBUG_ENTER("handler::read_range_first");
5837
5838 eq_range= eq_range_arg;
5839 set_end_range(end_key);
5840 range_key_part= table->key_info[active_index].key_part;
5841
5842 if (!start_key) // Read first record
5843 result= ha_index_first(table->record[0]);
5844 else
5845 result= ha_index_read_map(table->record[0],
5846 start_key->key,
5847 start_key->keypart_map,
5848 start_key->flag);
5849 if (result)
5850 DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND)
5851 ? HA_ERR_END_OF_FILE
5852 : result);
5853
5854 if (compare_key(end_range) <= 0)
5855 {
5856 DBUG_RETURN(0);
5857 }
5858 else
5859 {
5860 /*
5861 The last read row does not fall in the range. So request
5862 storage engine to release row lock if possible.
5863 */
5864 unlock_row();
5865 DBUG_RETURN(HA_ERR_END_OF_FILE);
5866 }
5867 }
5868
5869
5870 /**
5871 Read next row between two ranges.
5872
5873 @note
5874 Record is read into table->record[0]
5875
5876 @retval
5877 0 Found row
5878 @retval
5879 HA_ERR_END_OF_FILE No rows in range
5880 @retval
5881 \# Error code
5882 */
read_range_next()5883 int handler::read_range_next()
5884 {
5885 int result;
5886 DBUG_ENTER("handler::read_range_next");
5887
5888 if (eq_range)
5889 {
5890 /* We trust that index_next_same always gives a row in range */
5891 DBUG_RETURN(ha_index_next_same(table->record[0],
5892 end_range->key,
5893 end_range->length));
5894 }
5895 result= ha_index_next(table->record[0]);
5896 if (result)
5897 DBUG_RETURN(result);
5898
5899 if (compare_key(end_range) <= 0)
5900 {
5901 DBUG_RETURN(0);
5902 }
5903 else
5904 {
5905 /*
5906 The last read row does not fall in the range. So request
5907 storage engine to release row lock if possible.
5908 */
5909 unlock_row();
5910 DBUG_RETURN(HA_ERR_END_OF_FILE);
5911 }
5912 }
5913
5914
set_end_range(const key_range * end_key)5915 void handler::set_end_range(const key_range *end_key)
5916 {
5917 end_range= 0;
5918 if (end_key)
5919 {
5920 end_range= &save_end_range;
5921 save_end_range= *end_key;
5922 key_compare_result_on_equal=
5923 ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
5924 (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
5925 }
5926 }
5927
5928
5929 /**
5930 Compare if found key (in row) is over max-value.
5931
5932 @param range range to compare to row. May be 0 for no range
5933
5934 @see also
5935 key.cc::key_cmp()
5936
5937 @return
5938 The return value is SIGN(key_in_row - range_key):
5939
5940 - 0 : Key is equal to range or 'range' == 0 (no range)
5941 - -1 : Key is less than range
5942 - 1 : Key is larger than range
5943 */
compare_key(key_range * range)5944 int handler::compare_key(key_range *range)
5945 {
5946 int cmp;
5947 if (!range || in_range_check_pushed_down)
5948 return 0; // No max range
5949 cmp= key_cmp(range_key_part, range->key, range->length);
5950 if (!cmp)
5951 cmp= key_compare_result_on_equal;
5952 return cmp;
5953 }
5954
5955
5956 /*
5957 Same as compare_key() but doesn't check have in_range_check_pushed_down.
5958 This is used by index condition pushdown implementation.
5959 */
5960
compare_key2(key_range * range) const5961 int handler::compare_key2(key_range *range) const
5962 {
5963 int cmp;
5964 if (!range)
5965 return 0; // no max range
5966 cmp= key_cmp(range_key_part, range->key, range->length);
5967 if (!cmp)
5968 cmp= key_compare_result_on_equal;
5969 return cmp;
5970 }
5971
5972
5973 /**
5974 ICP callback - to be called by an engine to check the pushed condition
5975 */
handler_index_cond_check(void * h_arg)5976 extern "C" enum icp_result handler_index_cond_check(void* h_arg)
5977 {
5978 handler *h= (handler*)h_arg;
5979 THD *thd= h->table->in_use;
5980 enum icp_result res;
5981
5982 DEBUG_SYNC(thd, "handler_index_cond_check");
5983 enum thd_kill_levels abort_at= h->has_transactions() ?
5984 THD_ABORT_SOFTLY : THD_ABORT_ASAP;
5985 if (thd_kill_level(thd) > abort_at)
5986 return ICP_ABORTED_BY_USER;
5987
5988 if (h->end_range && h->compare_key2(h->end_range) > 0)
5989 return ICP_OUT_OF_RANGE;
5990 h->increment_statistics(&SSV::ha_icp_attempts);
5991 if ((res= h->pushed_idx_cond->val_int()? ICP_MATCH : ICP_NO_MATCH) ==
5992 ICP_MATCH)
5993 h->increment_statistics(&SSV::ha_icp_match);
5994 return res;
5995 }
5996
index_read_idx_map(uchar * buf,uint index,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)5997 int handler::index_read_idx_map(uchar * buf, uint index, const uchar * key,
5998 key_part_map keypart_map,
5999 enum ha_rkey_function find_flag)
6000 {
6001 int error, UNINIT_VAR(error1);
6002
6003 error= ha_index_init(index, 0);
6004 if (likely(!error))
6005 {
6006 error= index_read_map(buf, key, keypart_map, find_flag);
6007 error1= ha_index_end();
6008 }
6009 return error ? error : error1;
6010 }
6011
6012
6013 /**
6014 Returns a list of all known extensions.
6015
6016 No mutexes, worst case race is a minor surplus memory allocation
6017 We have to recreate the extension map if mysqld is restarted (for example
6018 within libmysqld)
6019
6020 @retval
6021 pointer pointer to TYPELIB structure
6022 */
exts_handlerton(THD * unused,plugin_ref plugin,void * arg)6023 static my_bool exts_handlerton(THD *unused, plugin_ref plugin,
6024 void *arg)
6025 {
6026 List<char> *found_exts= (List<char> *) arg;
6027 handlerton *hton= plugin_hton(plugin);
6028 List_iterator_fast<char> it(*found_exts);
6029 const char **ext, *old_ext;
6030
6031 for (ext= hton->tablefile_extensions; *ext; ext++)
6032 {
6033 while ((old_ext= it++))
6034 {
6035 if (!strcmp(old_ext, *ext))
6036 break;
6037 }
6038 if (!old_ext)
6039 found_exts->push_back((char *) *ext);
6040
6041 it.rewind();
6042 }
6043 return FALSE;
6044 }
6045
TYPELIB *ha_known_exts(void)
{
  /*
    Rebuild the cached TYPELIB if it has never been built, or if mysys
    has been restarted since it was built (mysys_usage_id changes on
    restart, e.g. within libmysqld).
  */
  if (!known_extensions.type_names || mysys_usage_id != known_extensions_id)
  {
    List<char> found_exts;
    const char **ext, *old_ext;

    known_extensions_id= mysys_usage_id;
    /* These two extensions are always known, independent of any engine */
    found_exts.push_back((char*) TRG_EXT);
    found_exts.push_back((char*) TRN_EXT);

    /* Collect (de-duplicated) extensions from every storage engine */
    plugin_foreach(NULL, exts_handlerton,
                   MYSQL_STORAGE_ENGINE_PLUGIN, &found_exts);

    /* +1 for the terminating NULL entry; my_once_alloc memory is never
       freed individually, matching the lifetime of the cached TYPELIB */
    ext= (const char **) my_once_alloc(sizeof(char *)*
                                       (found_exts.elements+1),
                                       MYF(MY_WME | MY_FAE));

    DBUG_ASSERT(ext != 0);
    known_extensions.count= found_exts.elements;
    known_extensions.type_names= ext;

    /* Copy the list into the NULL-terminated array */
    List_iterator_fast<char> it(found_exts);
    while ((old_ext= it++))
      *ext++= old_ext;
    *ext= 0;
  }
  return &known_extensions;
}
6075
6076
stat_print(THD * thd,const char * type,size_t type_len,const char * file,size_t file_len,const char * status,size_t status_len)6077 static bool stat_print(THD *thd, const char *type, size_t type_len,
6078 const char *file, size_t file_len,
6079 const char *status, size_t status_len)
6080 {
6081 Protocol *protocol= thd->protocol;
6082 protocol->prepare_for_resend();
6083 protocol->store(type, type_len, system_charset_info);
6084 protocol->store(file, file_len, system_charset_info);
6085 protocol->store(status, status_len, system_charset_info);
6086 if (protocol->write())
6087 return TRUE;
6088 return FALSE;
6089 }
6090
6091
showstat_handlerton(THD * thd,plugin_ref plugin,void * arg)6092 static my_bool showstat_handlerton(THD *thd, plugin_ref plugin,
6093 void *arg)
6094 {
6095 enum ha_stat_type stat= *(enum ha_stat_type *) arg;
6096 handlerton *hton= plugin_hton(plugin);
6097 if (hton->state == SHOW_OPTION_YES && hton->show_status &&
6098 hton->show_status(hton, thd, stat_print, stat))
6099 return TRUE;
6100 return FALSE;
6101 }
6102
ha_show_status(THD * thd,handlerton * db_type,enum ha_stat_type stat)6103 bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
6104 {
6105 List<Item> field_list;
6106 Protocol *protocol= thd->protocol;
6107 MEM_ROOT *mem_root= thd->mem_root;
6108 bool result;
6109
6110 field_list.push_back(new (mem_root) Item_empty_string(thd, "Type", 10),
6111 mem_root);
6112 field_list.push_back(new (mem_root)
6113 Item_empty_string(thd, "Name", FN_REFLEN), mem_root);
6114 field_list.push_back(new (mem_root)
6115 Item_empty_string(thd, "Status", 10),
6116 mem_root);
6117
6118 if (protocol->send_result_set_metadata(&field_list,
6119 Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
6120 return TRUE;
6121
6122 if (db_type == NULL)
6123 {
6124 result= plugin_foreach(thd, showstat_handlerton,
6125 MYSQL_STORAGE_ENGINE_PLUGIN, &stat);
6126 }
6127 else
6128 {
6129 if (db_type->state != SHOW_OPTION_YES)
6130 {
6131 const LEX_CSTRING *name= hton_name(db_type);
6132 result= stat_print(thd, name->str, name->length,
6133 "", 0, "DISABLED", 8) ? 1 : 0;
6134 }
6135 else
6136 {
6137 result= db_type->show_status &&
6138 db_type->show_status(db_type, thd, stat_print, stat) ? 1 : 0;
6139 }
6140 }
6141
6142 /*
6143 We also check thd->is_error() as Innodb may return 0 even if
6144 there was an error.
6145 */
6146 if (likely(!result && !thd->is_error()))
6147 my_eof(thd);
6148 else if (!thd->is_error())
6149 my_error(ER_GET_ERRNO, MYF(0), errno, hton_name(db_type)->str);
6150 return result;
6151 }
6152
6153 /*
6154 Function to check if the conditions for row-based binlogging is
6155 correct for the table.
6156
6157 A row in the given table should be replicated if:
6158 - It's not called by partition engine
6159 - Row-based replication is enabled in the current thread
6160 - The binlog is enabled
6161 - It is not a temporary table
6162 - The binary log is open
6163 - The database the table resides in shall be binlogged (binlog_*_db rules)
6164 - table is not mysql.event
6165
6166 RETURN VALUE
6167 0 No binary logging in row format
6168 1 Row needs to be logged
6169 */
6170
check_table_binlog_row_based(bool binlog_row)6171 bool handler::check_table_binlog_row_based(bool binlog_row)
6172 {
6173 if (table->versioned(VERS_TRX_ID))
6174 return false;
6175 if (unlikely((table->in_use->variables.sql_log_bin_off)))
6176 return 0; /* Called by partitioning engine */
6177 if (unlikely((!check_table_binlog_row_based_done)))
6178 {
6179 check_table_binlog_row_based_done= 1;
6180 check_table_binlog_row_based_result=
6181 check_table_binlog_row_based_internal(binlog_row);
6182 }
6183 return check_table_binlog_row_based_result;
6184 }
6185
/*
  Compute whether row changes to this table must go to the binary log
  in row format: the table must be capable of row logging and the
  current statement must use row-based binlog format; the remaining
  (wsrep-dependent) conditions are explained inline below.

  NOTE(review): the binlog_row parameter is not used in this function's
  body — presumably kept for interface symmetry; confirm before removing.
*/
bool handler::check_table_binlog_row_based_internal(bool binlog_row)
{
  THD *thd= table->in_use;

  return (table->s->can_do_row_logging &&
          thd->is_current_stmt_binlog_format_row() &&
          /*
            Wsrep partially enables binary logging if it have not been
            explicitly turned on. As a result we return 'true' if we are in
            wsrep binlog emulation mode and the current thread is not a wsrep
            applier or replayer thread. This decision is not affected by
            @@sql_log_bin as we want the events to make into the binlog
            cache only to filter them later before they make into binary log
            file.

            However, we do return 'false' if binary logging was temporarily
            turned off (see tmp_disable_binlog(A)).

            Otherwise, return 'true' if binary logging is on.
          */
          IF_WSREP(((WSREP_EMULATE_BINLOG(thd) &&
                     (thd->wsrep_exec_mode != REPL_RECV)) ||
                    ((WSREP(thd) ||
                      (thd->variables.option_bits & OPTION_BIN_LOG)) &&
                     mysql_bin_log.is_open())),
                   (thd->variables.option_bits & OPTION_BIN_LOG) &&
                   mysql_bin_log.is_open()));
}
6214
6215
6216 /** @brief
6217 Write table maps for all (manually or automatically) locked tables
6218 to the binary log. Also, if binlog_annotate_row_events is ON,
6219 write Annotate_rows event before the first table map.
6220
6221 SYNOPSIS
6222 write_locked_table_maps()
6223 thd Pointer to THD structure
6224
6225 DESCRIPTION
6226 This function will generate and write table maps for all tables
6227 that are locked by the thread 'thd'.
6228
6229 RETURN VALUE
6230 0 All OK
6231 1 Failed to write all table maps
6232
6233 SEE ALSO
6234 THD::lock
6235 */
6236
static int write_locked_table_maps(THD *thd)
{
  DBUG_ENTER("write_locked_table_maps");
  DBUG_PRINT("enter", ("thd:%p thd->lock:%p "
                       "thd->extra_lock: %p",
                       thd, thd->lock, thd->extra_lock));

  DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps()));

  /* Examine both lock sets the thread may hold */
  MYSQL_LOCK *locks[2];
  locks[0]= thd->extra_lock;
  locks[1]= thd->lock;
  /*
    An Annotate_rows event (the original query text) is only written if
    the option is enabled and there is actual query text to annotate with.
    binlog_write_table_map() resets the flag through the pointer so the
    annotation is emitted at most once.
  */
  my_bool with_annotate= thd->variables.binlog_annotate_row_events &&
    thd->query() && thd->query_length();

  for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
  {
    MYSQL_LOCK const *const lock= locks[i];
    if (lock == NULL)
      continue;

    /* Walk every table covered by this lock */
    TABLE **const end_ptr= lock->table + lock->table_count;
    for (TABLE **table_ptr= lock->table ;
         table_ptr != end_ptr ;
         ++table_ptr)
    {
      TABLE *const table= *table_ptr;
      DBUG_PRINT("info", ("Checking table %s", table->s->table_name.str));
      /* Only write-locked tables that need row-based logging get a map */
      if (table->current_lock == F_WRLCK &&
          table->file->check_table_binlog_row_based(0))
      {
        /*
          We need to have a transactional behavior for SQLCOM_CREATE_TABLE
          (e.g. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
          compatible behavior with the STMT based replication even when
          the table is not transactional. In other words, if the operation
          fails while executing the insert phase nothing is written to the
          binlog.

          Note that at this point, we check the type of a set of tables to
          create the table map events. In the function binlog_log_row(),
          which calls the current function, we check the type of the table
          of the current row.
        */
        bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE ||
          table->file->has_transactions();
        int const error= thd->binlog_write_table_map(table, has_trans,
                                                     &with_annotate);
        /*
          If an error occurs, it is the responsibility of the caller to
          roll back the transaction.
        */
        if (unlikely(error))
          DBUG_RETURN(1);
      }
    }
  }
  DBUG_RETURN(0);
}
6296
6297
/*
  Write one row change to the binary log using the given row-logging
  function, first emitting table map events for all locked tables if
  this is the first row handled in the current statement.

  @param table          Table the row belongs to
  @param before_record  Old row image (NULL for inserts)
  @param after_record   New row image (NULL for deletes)
  @param log_func       Engine-independent row logging function

  @return 0 on success, HA_ERR_RBR_LOGGING_FAILED on any failure
*/
static int binlog_log_row_internal(TABLE* table,
                                   const uchar *before_record,
                                   const uchar *after_record,
                                   Log_func *log_func)
{
  bool error= 0;
  THD *const thd= table->in_use;

  /*
    If there are no table maps written to the binary log, this is
    the first row handled in this statement. In that case, we need
    to write table maps for all locked tables to the binary log.
  */
  if (likely(!(error= ((thd->get_binlog_table_maps() == 0 &&
                        write_locked_table_maps(thd))))))
  {
    /*
      We need to have a transactional behavior for SQLCOM_CREATE_TABLE
      (i.e. CREATE TABLE... SELECT * FROM TABLE) in order to keep a
      compatible behavior with the STMT based replication even when
      the table is not transactional. In other words, if the operation
      fails while executing the insert phase nothing is written to the
      binlog.
    */
    bool const has_trans= thd->lex->sql_command == SQLCOM_CREATE_TABLE ||
      table->file->has_transactions();
    error= (*log_func)(thd, table, has_trans, before_record, after_record);
  }
  return error ? HA_ERR_RBR_LOGGING_FAILED : 0;
}
6328
/*
  Log one row change to the binary log if required, applying the wsrep
  filters and limits first.

  @return 0 on success or when no logging is needed; an error code
          (ER_ERROR_DURING_COMMIT or HA_ERR_RBR_LOGGING_FAILED) otherwise.
*/
int binlog_log_row(TABLE* table, const uchar *before_record,
                   const uchar *after_record, Log_func *log_func)
{
#ifdef WITH_WSREP
  THD *const thd= table->in_use;

  /* only InnoDB tables will be replicated through binlog emulation */
  if ((WSREP_EMULATE_BINLOG(thd) &&
       table->file->partition_ht()->db_type != DB_TYPE_INNODB) ||
      (thd->wsrep_ignore_table == true))
    return 0;

  /* enforce wsrep_max_ws_rows */
  if (WSREP(thd) && table->s->tmp_table == NO_TMP_TABLE)
  {
    thd->wsrep_affected_rows++;
    if (wsrep_max_ws_rows &&
        thd->wsrep_exec_mode != REPL_RECV &&
        thd->wsrep_affected_rows > wsrep_max_ws_rows)
    {
      /* Abort the oversized writeset: statement rollback first, then
         full transaction rollback if the statement rollback failed */
      trans_rollback_stmt(thd) || trans_rollback(thd);
      my_message(ER_ERROR_DURING_COMMIT, "wsrep_max_ws_rows exceeded", MYF(0));
      return ER_ERROR_DURING_COMMIT;
    }
  }
#endif

  /* Skip logging entirely if this table/statement does not use RBR */
  if (!table->file->check_table_binlog_row_based(1))
    return 0;
  return binlog_log_row_internal(table, before_record, after_record, log_func);
}
6360
6361
/*
  Lock (F_RDLCK/F_WRLCK) or unlock (F_UNLCK) the table in the storage
  engine, with performance-schema / DTrace probe instrumentation and
  audit notification. On success the lock state and the engine's table
  flags are cached on the handler.

  @return 0 on success, or the engine's external_lock() error code.
*/
int handler::ha_external_lock(THD *thd, int lock_type)
{
  int error;
  DBUG_ENTER("handler::ha_external_lock");
  /*
    Whether this is lock or unlock, this should be true, and is to verify that
    if get_auto_increment() was called (thus may have reserved intervals or
    taken a table lock), ha_release_auto_increment() was too.
  */
  DBUG_ASSERT(next_insert_id == 0);
  /* Consecutive calls for lock without unlocking in between is not allowed */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              ((lock_type != F_UNLCK && m_lock_type == F_UNLCK) ||
               lock_type == F_UNLCK));
  /* SQL HANDLER call locks/unlock while scanning (RND/INDEX). */
  DBUG_ASSERT(inited == NONE || table->open_by_handler);

  /* Fire the matching "lock start" probe, if any probe is enabled */
  if (MYSQL_HANDLER_RDLOCK_START_ENABLED() ||
      MYSQL_HANDLER_WRLOCK_START_ENABLED() ||
      MYSQL_HANDLER_UNLOCK_START_ENABLED())
  {
    if (lock_type == F_RDLCK)
    {
      MYSQL_HANDLER_RDLOCK_START(table_share->db.str,
                                 table_share->table_name.str);
    }
    else if (lock_type == F_WRLCK)
    {
      MYSQL_HANDLER_WRLOCK_START(table_share->db.str,
                                 table_share->table_name.str);
    }
    else if (lock_type == F_UNLCK)
    {
      MYSQL_HANDLER_UNLOCK_START(table_share->db.str,
                                 table_share->table_name.str);
    }
  }

  /*
    We cache the table flags if the locking succeeded. Otherwise, we
    keep them as they were when they were fetched in ha_open().
  */
  MYSQL_TABLE_LOCK_WAIT(m_psi, PSI_TABLE_EXTERNAL_LOCK, lock_type,
    { error= external_lock(thd, lock_type); })

  DBUG_EXECUTE_IF("external_lock_failure", error= HA_ERR_GENERIC;);

  /* Unlock always updates state, even if the engine reported an error */
  if (likely(error == 0 || lock_type == F_UNLCK))
  {
    m_lock_type= lock_type;
    cached_table_flags= table_flags();
    if (table_share->tmp_table == NO_TMP_TABLE)
      mysql_audit_external_lock(thd, table_share, lock_type);
  }

  /* Fire the matching "lock done" probe, if any probe is enabled */
  if (MYSQL_HANDLER_RDLOCK_DONE_ENABLED() ||
      MYSQL_HANDLER_WRLOCK_DONE_ENABLED() ||
      MYSQL_HANDLER_UNLOCK_DONE_ENABLED())
  {
    if (lock_type == F_RDLCK)
    {
      MYSQL_HANDLER_RDLOCK_DONE(error);
    }
    else if (lock_type == F_WRLCK)
    {
      MYSQL_HANDLER_WRLOCK_DONE(error);
    }
    else if (lock_type == F_UNLCK)
    {
      MYSQL_HANDLER_UNLOCK_DONE(error);
    }
  }
  DBUG_RETURN(error);
}
6436
6437
6438 /** @brief
6439 Check handler usage and reset state of file to after 'open'
6440 */
int handler::ha_reset()
{
  DBUG_ENTER("ha_reset");
  /* Check that we have called all proper deallocation functions */
  DBUG_ASSERT((uchar*) table->def_read_set.bitmap +
              table->s->column_bitmap_size ==
              (uchar*) table->def_write_set.bitmap);
  DBUG_ASSERT(bitmap_is_set_all(&table->s->all_set));
  DBUG_ASSERT(!table->file->keyread_enabled());
  /* ensure that ha_index_end / ha_rnd_end has been called */
  DBUG_ASSERT(inited == NONE);
  /* reset the bitmaps to point to defaults */
  table->default_column_bitmaps();
  pushed_cond= NULL;
  tracker= NULL;
  mark_trx_read_write_done= 0;
  clear_cached_table_binlog_row_based_flag();
  /* Reset information about pushed index conditions */
  cancel_pushed_idx_cond();
  /* Clear references to outer-table fields (presumably set up for
     condition pushdown into this handler — confirm against callers) */
  clear_top_table_fields();
  DBUG_RETURN(reset());
}
6464
6465
/*
  Insert one row (from buf) into the table: engine write + statistics,
  probes, and — unless the caller already logged it — binary logging.

  @return 0 on success, or engine/binlog error code.
*/
int handler::ha_write_row(uchar *buf)
{
  int error;
  Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
  /* A write lock must be held, except on temporary tables */
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  DBUG_ENTER("handler::ha_write_row");
  DEBUG_SYNC_C("ha_write_row_start");

  MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
  mark_trx_read_write();
  increment_statistics(&SSV::ha_write_count);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
                { error= write_row(buf); })

  MYSQL_INSERT_ROW_DONE(error);
  /* row_already_logged is set by callers (e.g. sequences) that log
     the row themselves */
  if (likely(!error) && !row_already_logged)
  {
    rows_changed++;
    error= binlog_log_row(table, 0, buf, log_func);
  }
  DEBUG_SYNC_C("ha_write_row_end");
  DBUG_RETURN(error);
}
6491
6492
/*
  Update one row: engine update + statistics, probes, and — unless the
  caller already logged it — binary logging of the before/after images.

  @param old_data  Old row image; must be table->record[1]
  @param new_data  New row image; must be table->record[0]

  @return 0 on success, or engine/binlog error code.
*/
int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
{
  int error;
  Log_func *log_func= Update_rows_log_event::binlog_row_logging_function;
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);

  /*
    Some storage engines require that the new record is in record[0]
    (and the old record is in record[1]).
  */
  DBUG_ASSERT(new_data == table->record[0]);
  DBUG_ASSERT(old_data == table->record[1]);

  MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
  mark_trx_read_write();
  increment_statistics(&SSV::ha_update_count);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
                { error= update_row(old_data, new_data);})

  MYSQL_UPDATE_ROW_DONE(error);
  if (likely(!error) && !row_already_logged)
  {
    rows_changed++;
    error= binlog_log_row(table, old_data, new_data, log_func);
  }
  return error;
}
6522
6523 /*
6524 Update first row. Only used by sequence tables
6525 */
6526
/*
  Replace the first (and only) row of the table with new_data; used by
  sequence tables. The update is skipped when the stored row already
  equals new_data.

  @return 0 on success, or the first error from scan init/next/update/end.
*/
int handler::update_first_row(uchar *new_data)
{
  int error;
  if (likely(!(error= ha_rnd_init(1))))
  {
    int end_error;
    if (likely(!(error= ha_rnd_next(table->record[1]))))
    {
      /*
        We have to do the memcmp as otherwise we may get error 169 from InnoDB
      */
      if (memcmp(new_data, table->record[1], table->s->reclength))
        error= update_row(table->record[1], new_data);
    }
    /* The scan is always ended; its error only matters if all else worked */
    end_error= ha_rnd_end();
    if (likely(!error))
      error= end_error;
    /* Logging would be wrong if update_row works but ha_rnd_end fails */
    DBUG_ASSERT(!end_error || error != 0);
  }
  return error;
}
6549
6550
/*
  Delete one row: engine delete + statistics, probes, and binary
  logging of the deleted row image.

  @param buf  Row image to delete; table->record[0] or table->record[1]

  @return 0 on success, or engine/binlog error code.
*/
int handler::ha_delete_row(const uchar *buf)
{
  int error;
  Log_func *log_func= Delete_rows_log_event::binlog_row_logging_function;
  DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
              m_lock_type == F_WRLCK);
  /*
    Normally table->record[0] is used, but sometimes table->record[1] is used.
  */
  DBUG_ASSERT(buf == table->record[0] ||
              buf == table->record[1]);

  MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str);
  mark_trx_read_write();
  increment_statistics(&SSV::ha_delete_count);

  TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, active_index, 0,
    { error= delete_row(buf);})
  MYSQL_DELETE_ROW_DONE(error);
  if (likely(!error))
  {
    rows_changed++;
    error= binlog_log_row(table, buf, 0, log_func);
  }
  return error;
}
6577
6578
6579 /**
6580 Execute a direct update request. A direct update request updates all
6581 qualified rows in a single operation, rather than one row at a time.
6582 In a Spider cluster the direct update operation is pushed down to the
6583 child levels of the cluster.
6584
6585 Note that this can't be used in case of statment logging
6586
6587 @param update_rows Number of updated rows.
6588
6589 @retval 0 Success.
6590 @retval != 0 Failure.
6591 */
6592
int handler::ha_direct_update_rows(ha_rows *update_rows)
{
  int error;

  /* NOTE(review): unlike ha_direct_delete_rows() there is no assert here
     that row-based binlogging is off — confirm whether that is intended */
  MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
  mark_trx_read_write();

  /* Delegate the bulk update to the engine (e.g. Spider pushdown) */
  error = direct_update_rows(update_rows);
  MYSQL_UPDATE_ROW_DONE(error);
  return error;
}
6604
6605
6606 /**
6607 Execute a direct delete request. A direct delete request deletes all
6608 qualified rows in a single operation, rather than one row at a time.
6609 In a Spider cluster the direct delete operation is pushed down to the
6610 child levels of the cluster.
6611
6612 @param delete_rows Number of deleted rows.
6613
6614 @retval 0 Success.
6615 @retval != 0 Failure.
6616 */
6617
int handler::ha_direct_delete_rows(ha_rows *delete_rows)
{
  int error;
  /* Ensure we are not using binlog row */
  DBUG_ASSERT(!table->in_use->is_current_stmt_binlog_format_row());

  MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str);
  mark_trx_read_write();

  /* Delegate the bulk delete to the engine (e.g. Spider pushdown) */
  error = direct_delete_rows(delete_rows);
  MYSQL_DELETE_ROW_DONE(error);
  return error;
}
6631
6632
6633 /** @brief
6634 use_hidden_primary_key() is called in case of an update/delete when
6635 (table_flags() and HA_PRIMARY_KEY_REQUIRED_FOR_DELETE) is defined
6636 but we don't have a primary key
6637 */
void handler::use_hidden_primary_key()
{
  /* fallback to use all columns in the table to identify row */
  table->column_bitmaps_set(&table->s->all_set, table->write_set);
}
6643
6644
6645 /**
6646 Get an initialized ha_share.
6647
6648 @return Initialized ha_share
6649 @retval NULL ha_share is not yet initialized.
6650 @retval != NULL previous initialized ha_share.
6651
6652 @note
6653 If not a temp table, then LOCK_ha_data must be held.
6654 */
6655
Handler_share *handler::get_ha_share_ptr()
{
  DBUG_ENTER("handler::get_ha_share_ptr");
  DBUG_ASSERT(ha_share);
  DBUG_ASSERT(table_share);

#ifndef DBUG_OFF
  /* For shared (non-temporary) tables the caller must hold LOCK_ha_data */
  if (table_share->tmp_table == NO_TMP_TABLE)
    mysql_mutex_assert_owner(&table_share->LOCK_ha_data);
#endif

  /* ha_share points into the share; NULL contents mean "not initialized" */
  DBUG_RETURN(*ha_share);
}
6669
6670
6671 /**
6672 Set ha_share to be used by all instances of the same table/partition.
6673
6674 @param ha_share Handler_share to be shared.
6675
6676 @note
6677 If not a temp table, then LOCK_ha_data must be held.
6678 */
6679
void handler::set_ha_share_ptr(Handler_share *arg_ha_share)
{
  DBUG_ENTER("handler::set_ha_share_ptr");
  DBUG_ASSERT(ha_share);
#ifndef DBUG_OFF
  /* For shared (non-temporary) tables the caller must hold LOCK_ha_data */
  if (table_share->tmp_table == NO_TMP_TABLE)
    mysql_mutex_assert_owner(&table_share->LOCK_ha_data);
#endif

  /* Publish the share to all handler instances of this table/partition */
  *ha_share= arg_ha_share;
  DBUG_VOID_RETURN;
}
6692
6693
6694 /**
6695 Take a lock for protecting shared handler data.
6696 */
6697
lock_shared_ha_data()6698 void handler::lock_shared_ha_data()
6699 {
6700 DBUG_ASSERT(table_share);
6701 if (table_share->tmp_table == NO_TMP_TABLE)
6702 mysql_mutex_lock(&table_share->LOCK_ha_data);
6703 }
6704
6705
6706 /**
6707 Release lock for protecting ha_share.
6708 */
6709
unlock_shared_ha_data()6710 void handler::unlock_shared_ha_data()
6711 {
6712 DBUG_ASSERT(table_share);
6713 if (table_share->tmp_table == NO_TMP_TABLE)
6714 mysql_mutex_unlock(&table_share->LOCK_ha_data);
6715 }
6716
6717 /** @brief
6718 Dummy function which accept information about log files which is not need
6719 by handlers
6720 */
/* NOTE(review): the handlerton parameter is unnamed and passed by value;
   looks intentional for this dummy callback but worth confirming against
   the expected handlerton callback signature */
void signal_log_not_needed(struct handlerton, char *log_file)
{
  DBUG_ENTER("signal_log_not_needed");
  DBUG_PRINT("enter", ("logfile '%s'", log_file));
  DBUG_VOID_RETURN;
}
6727
/* Record the THR_LOCK lock type to be used for this table instance */
void handler::set_lock_type(enum thr_lock_type lock)
{
  table->reginfo.lock_type= lock;
}
6732
6733 #ifdef WITH_WSREP
6734 /**
6735 @details
6736 This function makes the storage engine to force the victim transaction
6737 to abort. Currently, only innodb has this functionality, but any SE
6738 implementing the wsrep API should provide this service to support
6739 multi-master operation.
6740
6741 @note Aborting the transaction does NOT end it, it still has to
6742 be rolled back with hton->rollback().
6743
6744 @note It is safe to abort from one thread (bf_thd) the transaction,
6745 running in another thread (victim_thd), because InnoDB's lock_sys and
6746 trx_mutex guarantee the necessary protection. However, its not safe
6747 to access victim_thd->transaction, because it's not protected from
6748 concurrent accesses. And it's an overkill to take LOCK_plugin and
6749 iterate the whole installed_htons[] array every time.
6750
6751 @param bf_thd brute force THD asking for the abort
6752 @param victim_thd victim THD to be aborted
6753
6754 @return
6755 always 0
6756 */
6757
int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
{
  DBUG_ENTER("ha_abort_transaction");
  /*
    Only act when the brute-force thread is a wsrep thread, or when it is
    executing RSU DDL in total order (which also may need to abort victims).
  */
  if (!WSREP(bf_thd) &&
      !(bf_thd->variables.wsrep_OSU_method == WSREP_OSU_RSU &&
        bf_thd->wsrep_exec_mode == TOTAL_ORDER)) {
    DBUG_RETURN(0);
  }

  /* Only InnoDB currently implements the abort_transaction service */
  handlerton *hton= installed_htons[DB_TYPE_INNODB];
  if (hton && hton->abort_transaction)
  {
    hton->abort_transaction(hton, bf_thd, victim_thd, signal);
  }
  else
  {
    WSREP_WARN("Cannot abort InnoDB transaction");
  }

  DBUG_RETURN(0);
}
6779
/*
  Ask a storage engine to assign a "fake" transaction id to the wsrep
  write-set handle of this THD, when no real one has been assigned yet.
  Only one engine's id is needed; the first engine in the transaction's
  ha_list that provides fake_trx_id is used.
*/
void ha_fake_trx_id(THD *thd)
{
  DBUG_ENTER("ha_fake_trx_id");

  bool no_fake_trx_id= true;

  if (!WSREP(thd))
  {
    DBUG_VOID_RETURN;
  }

  /* Nothing to do if the write-set handle already has a transaction id */
  if (thd->wsrep_ws_handle.trx_id != WSREP_UNDEFINED_TRX_ID)
  {
    WSREP_DEBUG("fake trx id skipped: %" PRIu64, thd->wsrep_ws_handle.trx_id);
    DBUG_VOID_RETURN;
  }

  /* Try statement transaction if standard one is not set. */
  THD_TRANS *trans= (thd->transaction.all.ha_list) ? &thd->transaction.all :
    &thd->transaction.stmt;

  Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;

  for (; ha_info; ha_info= ha_info_next)
  {
    handlerton *hton= ha_info->ht();
    if (hton->fake_trx_id)
    {
      hton->fake_trx_id(hton, thd);

      /* Got a fake trx id. */
      no_fake_trx_id= false;

      /*
        We need transaction ID from just one storage engine providing
        fake_trx_id (which will most likely be the case).
      */
      break;
    }
    ha_info_next= ha_info->next();
  }

  if (unlikely(no_fake_trx_id))
    WSREP_WARN("Cannot get fake transaction ID from storage engine.");

  DBUG_VOID_RETURN;
}
6827 #endif /* WITH_WSREP */
6828
6829
6830 #ifdef TRANS_LOG_MGM_EXAMPLE_CODE
6831 /*
6832 Example of transaction log management functions based on assumption that logs
6833 placed into a directory
6834 */
6835 #include <my_dir.h>
6836 #include <my_sys.h>
/*
  Example (TRANS_LOG_MGM_EXAMPLE_CODE only): iterate over an engine's
  transaction log files and delete the ones marked as free.

  @return 0 on success, 1 on error or if iteration is unsupported.
*/
int example_of_iterator_using_for_logs_cleanup(handlerton *hton)
{
  void *buffer;
  int res= 1;
  struct handler_iterator iterator;
  struct handler_log_file_data data;

  if (!hton->create_iterator)
    return 1; /* iterator creator is not supported */

  if ((*hton->create_iterator)(hton, HA_TRANSACTLOG_ITERATOR, &iterator) !=
      HA_ITERATOR_OK)
  {
    /* error during creation of log iterator or iterator is not supported */
    return 1;
  }
  /* Walk all log files; delete those the engine no longer needs */
  while((*iterator.next)(&iterator, (void*)&data) == 0)
  {
    printf("%s\n", data.filename.str);
    if (data.status == HA_LOG_STATUS_FREE &&
        mysql_file_delete(INSTRUMENT_ME,
                          data.filename.str, MYF(MY_WME)))
      goto err;
  }
  res= 0;
err:
  /* Always destroy the iterator, on both success and failure paths */
  (*iterator.destroy)(&iterator);
  return res;
}
6866
6867
6868 /*
6869 Here we should get info from handler where it save logs but here is
6870 just example, so we use constant.
6871 IMHO FN_ROOTDIR ("/") is safe enough for example, because nobody has
6872 rights on it except root and it consist of directories only at lest for
6873 *nix (sorry, can't find windows-safe solution here, but it is only example).
6874 */
6875 #define fl_dir FN_ROOTDIR
6876
6877
6878 /** @brief
6879 Dummy function to return log status should be replaced by function which
6880 really detect the log status and check that the file is a log of this
6881 handler.
6882 */
fl_get_log_status(char * log)6883 enum log_status fl_get_log_status(char *log)
6884 {
6885 MY_STAT stat_buff;
6886 if (mysql_file_stat(INSTRUMENT_ME, log, &stat_buff, MYF(0)))
6887 return HA_LOG_STATUS_INUSE;
6888 return HA_LOG_STATUS_NOSUCHLOG;
6889 }
6890
6891
/* Iterator state for the example log-file iterator: parallel arrays of
   file names and their statuses, plus a cursor. */
struct fl_buff
{
  LEX_STRING *names;            /* file names, entries elements */
  enum log_status *statuses;    /* status of each file, parallel to names */
  uint32 entries;               /* number of valid elements */
  uint32 current;               /* next element to return */
};
6899
6900
fl_log_iterator_next(struct handler_iterator * iterator,void * iterator_object)6901 int fl_log_iterator_next(struct handler_iterator *iterator,
6902 void *iterator_object)
6903 {
6904 struct fl_buff *buff= (struct fl_buff *)iterator->buffer;
6905 struct handler_log_file_data *data=
6906 (struct handler_log_file_data *) iterator_object;
6907 if (buff->current >= buff->entries)
6908 return 1;
6909 data->filename= buff->names[buff->current];
6910 data->status= buff->statuses[buff->current];
6911 buff->current++;
6912 return 0;
6913 }
6914
6915
/* Free the single allocation backing the example log iterator */
void fl_log_iterator_destroy(struct handler_iterator *iterator)
{
  my_free(iterator->buffer);
}
6920
6921
6922 /** @brief
6923 returns buffer, to be assigned in handler_iterator struct
6924 */
6925 enum handler_create_iterator_result
fl_log_iterator_buffer_init(struct handler_iterator * iterator)6926 fl_log_iterator_buffer_init(struct handler_iterator *iterator)
6927 {
6928 MY_DIR *dirp;
6929 struct fl_buff *buff;
6930 char *name_ptr;
6931 uchar *ptr;
6932 FILEINFO *file;
6933 uint32 i;
6934
6935 /* to be able to make my_free without crash in case of error */
6936 iterator->buffer= 0;
6937
6938 if (!(dirp = my_dir(fl_dir, MYF(MY_THREAD_SPECIFIC))))
6939 {
6940 return HA_ITERATOR_ERROR;
6941 }
6942 if ((ptr= (uchar*)my_malloc(ALIGN_SIZE(sizeof(fl_buff)) +
6943 ((ALIGN_SIZE(sizeof(LEX_STRING)) +
6944 sizeof(enum log_status) +
6945 + FN_REFLEN + 1) *
6946 (uint) dirp->number_off_files),
6947 MYF(MY_THREAD_SPECIFIC))) == 0)
6948 {
6949 return HA_ITERATOR_ERROR;
6950 }
6951 buff= (struct fl_buff *)ptr;
6952 buff->entries= buff->current= 0;
6953 ptr= ptr + (ALIGN_SIZE(sizeof(fl_buff)));
6954 buff->names= (LEX_STRING*) (ptr);
6955 ptr= ptr + ((ALIGN_SIZE(sizeof(LEX_STRING)) *
6956 (uint) dirp->number_off_files));
6957 buff->statuses= (enum log_status *)(ptr);
6958 name_ptr= (char *)(ptr + (sizeof(enum log_status) *
6959 (uint) dirp->number_off_files));
6960 for (i=0 ; i < (uint) dirp->number_off_files ; i++)
6961 {
6962 enum log_status st;
6963 file= dirp->dir_entry + i;
6964 if ((file->name[0] == '.' &&
6965 ((file->name[1] == '.' && file->name[2] == '\0') ||
6966 file->name[1] == '\0')))
6967 continue;
6968 if ((st= fl_get_log_status(file->name)) == HA_LOG_STATUS_NOSUCHLOG)
6969 continue;
6970 name_ptr= strxnmov(buff->names[buff->entries].str= name_ptr,
6971 FN_REFLEN, fl_dir, file->name, NullS);
6972 buff->names[buff->entries].length= (name_ptr -
6973 buff->names[buff->entries].str);
6974 buff->statuses[buff->entries]= st;
6975 buff->entries++;
6976 }
6977
6978 iterator->buffer= buff;
6979 iterator->next= &fl_log_iterator_next;
6980 iterator->destroy= &fl_log_iterator_destroy;
6981 my_dirend(dirp);
6982 return HA_ITERATOR_OK;
6983 }
6984
6985
6986 /* An example of a iterator creator */
6987 enum handler_create_iterator_result
fl_create_iterator(enum handler_iterator_type type,struct handler_iterator * iterator)6988 fl_create_iterator(enum handler_iterator_type type,
6989 struct handler_iterator *iterator)
6990 {
6991 switch(type) {
6992 case HA_TRANSACTLOG_ITERATOR:
6993 return fl_log_iterator_buffer_init(iterator);
6994 default:
6995 return HA_ITERATOR_UNSUPPORTED;
6996 }
6997 }
6998 #endif /*TRANS_LOG_MGM_EXAMPLE_CODE*/
6999
7000
check_conflicting_charset_declarations(CHARSET_INFO * cs)7001 bool HA_CREATE_INFO::check_conflicting_charset_declarations(CHARSET_INFO *cs)
7002 {
7003 if ((used_fields & HA_CREATE_USED_DEFAULT_CHARSET) &&
7004 /* DEFAULT vs explicit, or explicit vs DEFAULT */
7005 (((default_table_charset == NULL) != (cs == NULL)) ||
7006 /* Two different explicit character sets */
7007 (default_table_charset && cs &&
7008 !my_charset_same(default_table_charset, cs))))
7009 {
7010 my_error(ER_CONFLICTING_DECLARATIONS, MYF(0),
7011 "CHARACTER SET ", default_table_charset ?
7012 default_table_charset->csname : "DEFAULT",
7013 "CHARACTER SET ", cs ? cs->csname : "DEFAULT");
7014 return true;
7015 }
7016 return false;
7017 }
7018
7019 /* Remove all indexes for a given table from global index statistics */
7020
7021 static
del_global_index_stats_for_table(THD * thd,uchar * cache_key,size_t cache_key_length)7022 int del_global_index_stats_for_table(THD *thd, uchar* cache_key, size_t cache_key_length)
7023 {
7024 int res = 0;
7025 DBUG_ENTER("del_global_index_stats_for_table");
7026
7027 mysql_mutex_lock(&LOCK_global_index_stats);
7028
7029 for (uint i= 0; i < global_index_stats.records;)
7030 {
7031 INDEX_STATS *index_stats =
7032 (INDEX_STATS*) my_hash_element(&global_index_stats, i);
7033
7034 /* We search correct db\0table_name\0 string */
7035 if (index_stats &&
7036 index_stats->index_name_length >= cache_key_length &&
7037 !memcmp(index_stats->index, cache_key, cache_key_length))
7038 {
7039 res= my_hash_delete(&global_index_stats, (uchar*)index_stats);
7040 /*
7041 In our HASH implementation on deletion one elements
7042 is moved into a place where a deleted element was,
7043 and the last element is moved into the empty space.
7044 Thus we need to re-examine the current element, but
7045 we don't have to restart the search from the beginning.
7046 */
7047 }
7048 else
7049 i++;
7050 }
7051
7052 mysql_mutex_unlock(&LOCK_global_index_stats);
7053 DBUG_RETURN(res);
7054 }
7055
7056 /* Remove a table from global table statistics */
7057
del_global_table_stat(THD * thd,const LEX_CSTRING * db,const LEX_CSTRING * table)7058 int del_global_table_stat(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table)
7059 {
7060 TABLE_STATS *table_stats;
7061 int res = 0;
7062 uchar *cache_key;
7063 size_t cache_key_length;
7064 DBUG_ENTER("del_global_table_stat");
7065
7066 cache_key_length= db->length + 1 + table->length + 1;
7067
7068 if(!(cache_key= (uchar *)my_malloc(cache_key_length,
7069 MYF(MY_WME | MY_ZEROFILL))))
7070 {
7071 /* Out of memory error already given */
7072 res = 1;
7073 goto end;
7074 }
7075
7076 memcpy(cache_key, db->str, db->length);
7077 memcpy(cache_key + db->length + 1, table->str, table->length);
7078
7079 res= del_global_index_stats_for_table(thd, cache_key, cache_key_length);
7080
7081 mysql_mutex_lock(&LOCK_global_table_stats);
7082
7083 if((table_stats= (TABLE_STATS*) my_hash_search(&global_table_stats,
7084 cache_key,
7085 cache_key_length)))
7086 res= my_hash_delete(&global_table_stats, (uchar*)table_stats);
7087
7088 my_free(cache_key);
7089 mysql_mutex_unlock(&LOCK_global_table_stats);
7090
7091 end:
7092 DBUG_RETURN(res);
7093 }
7094
7095 /* Remove a index from global index statistics */
7096
del_global_index_stat(THD * thd,TABLE * table,KEY * key_info)7097 int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info)
7098 {
7099 INDEX_STATS *index_stats;
7100 size_t key_length= table->s->table_cache_key.length + key_info->name.length + 1;
7101 int res = 0;
7102 DBUG_ENTER("del_global_index_stat");
7103 mysql_mutex_lock(&LOCK_global_index_stats);
7104
7105 if((index_stats= (INDEX_STATS*) my_hash_search(&global_index_stats,
7106 key_info->cache_name,
7107 key_length)))
7108 res= my_hash_delete(&global_index_stats, (uchar*)index_stats);
7109
7110 mysql_mutex_unlock(&LOCK_global_index_stats);
7111 DBUG_RETURN(res);
7112 }
7113
/* True when 'name' matches the declared AS ROW START column name */
bool Vers_parse_info::is_start(const char *name) const
{
  DBUG_ASSERT(name);
  return as_row.start && as_row.start.streq(name);
}
is_end(const char * name) const7119 bool Vers_parse_info::is_end(const char *name) const
7120 {
7121 DBUG_ASSERT(name);
7122 return as_row.end && as_row.end.streq(name);
7123 }
/* True when the field is flagged as the ROW START system column */
bool Vers_parse_info::is_start(const Create_field &f) const
{
  return f.flags & VERS_ROW_START;
}
/* True when the field is flagged as the ROW END system column */
bool Vers_parse_info::is_end(const Create_field &f) const
{
  return f.flags & VERS_ROW_END;
}
7132
vers_init_sys_field(THD * thd,const char * field_name,int flags,bool integer)7133 static Create_field *vers_init_sys_field(THD *thd, const char *field_name, int flags, bool integer)
7134 {
7135 Create_field *f= new (thd->mem_root) Create_field();
7136 if (!f)
7137 return NULL;
7138
7139 f->field_name.str= field_name;
7140 f->field_name.length= strlen(field_name);
7141 f->charset= system_charset_info;
7142 f->flags= flags | NOT_NULL_FLAG;
7143 if (integer)
7144 {
7145 DBUG_ASSERT(0); // Not implemented yet
7146 f->set_handler(&type_handler_vers_trx_id);
7147 f->length= MY_INT64_NUM_DECIMAL_DIGITS - 1;
7148 f->flags|= UNSIGNED_FLAG;
7149 }
7150 else
7151 {
7152 f->set_handler(&type_handler_timestamp2);
7153 f->length= MAX_DATETIME_PRECISION;
7154 }
7155 f->invisible= DBUG_EVALUATE_IF("sysvers_show", VISIBLE, INVISIBLE_SYSTEM);
7156
7157 if (f->check(thd))
7158 return NULL;
7159
7160 return f;
7161 }
7162
vers_create_sys_field(THD * thd,const char * field_name,Alter_info * alter_info,int flags)7163 static bool vers_create_sys_field(THD *thd, const char *field_name,
7164 Alter_info *alter_info, int flags)
7165 {
7166 Create_field *f= vers_init_sys_field(thd, field_name, flags, false);
7167 if (!f)
7168 return true;
7169
7170 alter_info->flags|= ALTER_PARSER_ADD_COLUMN;
7171 alter_info->create_list.push_back(f);
7172
7173 return false;
7174 }
7175
/* Default column names generated for implicit system versioning fields */
const Lex_ident Vers_parse_info::default_start= "row_start";
const Lex_ident Vers_parse_info::default_end= "row_end";
7178
fix_implicit(THD * thd,Alter_info * alter_info)7179 bool Vers_parse_info::fix_implicit(THD *thd, Alter_info *alter_info)
7180 {
7181 // If user specified some of these he must specify the others too. Do nothing.
7182 if (*this)
7183 return false;
7184
7185 alter_info->flags|= ALTER_PARSER_ADD_COLUMN;
7186
7187 system_time= start_end_t(default_start, default_end);
7188 as_row= system_time;
7189
7190 if (vers_create_sys_field(thd, default_start, alter_info, VERS_ROW_START) ||
7191 vers_create_sys_field(thd, default_end, alter_info, VERS_ROW_END))
7192 {
7193 return true;
7194 }
7195 return false;
7196 }
7197
7198
/**
  Prepare system-versioning info at CREATE/ALTER time: decide whether the
  table stays versioned, mark columns that are excluded from versioning,
  and add the implicit row_start/row_end fields when needed.

  @return true on error (my_error() has been called), false on success
*/
bool Table_scope_and_contents_source_st::vers_fix_system_fields(
  THD *thd, Alter_info *alter_info, const TABLE_LIST &create_table)
{
  DBUG_ASSERT(!(alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING));

  /* Debug hook: force system versioning on every non-temporary table */
  DBUG_EXECUTE_IF("sysvers_force", if (!tmp_table()) {
    alter_info->flags|= ALTER_ADD_SYSTEM_VERSIONING;
    options|= HA_VERSIONED_TABLE; });

  if (!vers_info.need_check(alter_info))
    return false;

  const bool add_versioning= alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING;

  if (!vers_info.versioned_fields && vers_info.unversioned_fields && !add_versioning)
  {
    // All is correct but this table is not versioned.
    options&= ~HA_VERSIONED_TABLE;
    return false;
  }

  /* Versioning clauses were used but WITH SYSTEM VERSIONING is missing */
  if (!add_versioning && vers_info && !vers_info.versioned_fields)
  {
    my_error(ER_MISSING, MYF(0), create_table.table_name.str,
             "WITH SYSTEM VERSIONING");
    return true;
  }

  /* Mark columns that will not be versioned: either explicitly declared
     WITHOUT VERSIONING, or undeclared when the table as a whole is not
     being turned into a versioned table now */
  List_iterator<Create_field> it(alter_info->create_list);
  while (Create_field *f= it++)
  {
    if (f->vers_sys_field())
      continue;
    if ((f->versioning == Column_definition::VERSIONING_NOT_SET && !add_versioning) ||
        f->versioning == Column_definition::WITHOUT_VERSIONING)
    {
      f->flags|= VERS_UPDATE_UNVERSIONED_FLAG;
    }
  } // while (Create_field *f= it++)

  /* Add implicit row_start/row_end if the user declared none */
  if (vers_info.fix_implicit(thd, alter_info))
    return true;

  return false;
}
7244
7245
/**
  Count user-visible versioned columns and verify the row_start/row_end
  system columns of a table created/altered WITH SYSTEM VERSIONING.

  @param select_count  number of fields appended by the SELECT part of
                       CREATE ... SELECT (used to skip duplicate fields)

  @return true on error (my_error() has been called), false on success
*/
bool Table_scope_and_contents_source_st::vers_check_system_fields(
  THD *thd, Alter_info *alter_info, const Lex_table_name &table_name,
  const Lex_table_name &db, int select_count)
{
  if (!(options & HA_VERSIONED_TABLE))
    return false;

  uint versioned_fields= 0;

  if (!(alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING))
  {
    uint fieldnr= 0;
    List_iterator<Create_field> field_it(alter_info->create_list);
    while (Create_field *f= field_it++)
    {
      /*
        The field from the CREATE part can be duplicated in the SELECT part of
        CREATE...SELECT. In that case double counts should be avoided.
        select_create::create_table_from_items just pushes the fields back into
        the create_list, without additional manipulations, so the fields from
        SELECT go last there.
      */
      bool is_dup= false;
      if (fieldnr >= alter_info->create_list.elements - select_count)
      {
        /* f came from the SELECT part: check whether an identically named
           field already exists earlier in the list */
        List_iterator<Create_field> dup_it(alter_info->create_list);
        for (Create_field *dup= dup_it++; !is_dup && dup != f; dup= dup_it++)
          is_dup= Lex_ident(dup->field_name).streq(f->field_name);
      }

      if (!(f->flags & VERS_UPDATE_UNVERSIONED_FLAG) && !is_dup)
        versioned_fields++;
      fieldnr++;
    }
    /* No user column is versioned (VERSIONING_FIELDS is presumably the
       "zero versioned columns" sentinel — confirm) => reject the table */
    if (versioned_fields == VERSIONING_FIELDS)
    {
      my_error(ER_VERS_TABLE_MUST_HAVE_COLUMNS, MYF(0), table_name.str);
      return true;
    }
  }

  if (!(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING) && !versioned_fields)
    return false;

  /* Engines with native (transaction-id based) versioning support;
     partitioned tables delegate this to the underlying engine */
  bool can_native= ha_check_storage_engine_flag(db_type,
                                                HTON_NATIVE_SYS_VERSIONING)
                   || db_type->db_type == DB_TYPE_PARTITION_DB;

  return vers_info.check_sys_fields(table_name, db, alter_info, can_native);
}
7296
7297
/**
  Validate and adjust system-versioning settings for ALTER TABLE.

  Handles ADD/DROP SYSTEM VERSIONING, ADD/DROP PERIOD and per-column
  versioning attributes, inheriting the row_start/row_end column names
  from an already-versioned table.

  @return true on error (my_error() has been called), false on success
*/
bool Vers_parse_info::fix_alter_info(THD *thd, Alter_info *alter_info,
                                     HA_CREATE_INFO *create_info, TABLE *table)
{
  TABLE_SHARE *share= table->s;
  const char *table_name= share->table_name.str;

  /* Nothing versioning-related in this ALTER, and table is not versioned */
  if (!need_check(alter_info) && !share->versioned)
    return false;

  /* Temporary tables cannot be versioned (unless forced for debugging) */
  if (DBUG_EVALUATE_IF("sysvers_force", 0, share->tmp_table))
  {
    my_error(ER_VERS_TEMPORARY, MYF(0));
    return true;
  }

  if (alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING &&
      table->versioned())
  {
    my_error(ER_VERS_ALREADY_VERSIONED, MYF(0), table_name);
    return true;
  }

  if (alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING)
  {
    if (!share->versioned)
    {
      my_error(ER_VERS_NOT_VERSIONED, MYF(0), table_name);
      return true;
    }
#ifdef WITH_PARTITION_STORAGE_ENGINE
    /* SYSTEM_TIME partitioning depends on versioning: refuse the drop */
    if (table->part_info &&
        table->part_info->part_type == VERSIONING_PARTITION)
    {
      my_error(ER_DROP_VERSIONING_SYSTEM_TIME_PARTITION, MYF(0), table_name);
      return true;
    }
#endif

    return false;
  }

  if (!(alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING))
  {
    /* Explicit AS ROW START/END columns are only valid together with
       ADD SYSTEM VERSIONING; a versioned table already has them */
    List_iterator_fast<Create_field> it(alter_info->create_list);
    while (Create_field *f= it++)
    {
      if (f->flags & VERS_SYSTEM_FIELD)
      {
        if (!table->versioned())
        {
          my_error(ER_VERS_NOT_VERSIONED, MYF(0), table->s->table_name.str);
          return true;
        }
        my_error(ER_VERS_DUPLICATE_ROW_START_END, MYF(0),
                 f->flags & VERS_ROW_START ? "START" : "END", f->field_name.str);
        return true;
      }
    }
  }

  /* Versioning sub-clauses on an unversioned table are an error */
  if ((alter_info->flags & ALTER_DROP_PERIOD ||
       versioned_fields || unversioned_fields) && !share->versioned)
  {
    my_error(ER_VERS_NOT_VERSIONED, MYF(0), table_name);
    return true;
  }

  if (share->versioned)
  {
    if (alter_info->flags & ALTER_ADD_PERIOD)
    {
      my_error(ER_VERS_ALREADY_VERSIONED, MYF(0), table_name);
      return true;
    }

    // copy info from existing table
    create_info->options|= HA_VERSIONED_TABLE;

    DBUG_ASSERT(share->vers_start_field());
    DBUG_ASSERT(share->vers_end_field());
    Lex_ident start(share->vers_start_field()->field_name);
    Lex_ident end(share->vers_end_field()->field_name);
    DBUG_ASSERT(start.str);
    DBUG_ASSERT(end.str);

    as_row= start_end_t(start, end);
    system_time= as_row;

    if (alter_info->create_list.elements)
    {
      List_iterator_fast<Create_field> it(alter_info->create_list);
      while (Create_field *f= it++)
      {
        if (f->versioning == Column_definition::WITHOUT_VERSIONING)
          f->flags|= VERS_UPDATE_UNVERSIONED_FLAG;

        /* The row_start/row_end system columns may not be changed/renamed */
        if (f->change.str && (start.streq(f->change) || end.streq(f->change)))
        {
          my_error(ER_VERS_ALTER_SYSTEM_FIELD, MYF(0), f->change.str);
          return true;
        }
      }
    }

    return false;
  }

  /* ADD SYSTEM VERSIONING on an unversioned table: add implicit columns */
  return fix_implicit(thd, alter_info);
}
7407
/**
  Adjust CREATE TABLE ... LIKE <versioned table>.

  For temporary tables versioning is stripped: the two system columns and
  key parts referring to them are removed (with a warning). Otherwise the
  row_start/row_end column names are copied from the source table.

  @return true on error (my_error() has been called), false on success
*/
bool
Vers_parse_info::fix_create_like(Alter_info &alter_info, HA_CREATE_INFO &create_info,
                                 TABLE_LIST &src_table, TABLE_LIST &table)
{
  List_iterator<Create_field> it(alter_info.create_list);
  List_iterator<Key> key_it(alter_info.key_list);
  List_iterator<Key_part_spec> kp_it;
  Create_field *f, *f_start=NULL, *f_end= NULL;

  /* A versioned source table has at least one user column + 2 system ones */
  DBUG_ASSERT(alter_info.create_list.elements > 2);

  if (create_info.tmp_table())
  {
    int remove= 2;
    while (remove && (f= it++))
    {
      if (f->flags & VERS_SYSTEM_FIELD)
      {
        it.remove();
        remove--;
      }
      /*
        NOTE(review): this key-part stripping runs for EVERY field scanned,
        not only for the removed system fields, so key parts on ordinary
        columns encountered before both system fields would be dropped
        too — confirm this is intended (or should it be inside the
        VERS_SYSTEM_FIELD branch above?).
      */
      key_it.rewind();
      while (Key *key= key_it++)
      {
        kp_it.init(key->columns);
        while (Key_part_spec *kp= kp_it++)
        {
          if (0 == lex_string_cmp(system_charset_info, &kp->field_name,
                                  &f->field_name))
          {
            kp_it.remove();
          }
        }
        /* Drop keys that lost all their columns */
        if (0 == key->columns.elements)
        {
          key_it.remove();
        }
      }
    }
    DBUG_ASSERT(remove == 0);
    push_warning_printf(current_thd, Sql_condition::WARN_LEVEL_WARN,
                        ER_UNKNOWN_ERROR,
                        "System versioning is stripped from temporary `%s.%s`",
                        table.db.str, table.table_name.str);
    return false;
  }

  /* Find the ROW START and ROW END columns inherited from the source */
  while ((f= it++))
  {
    if (f->flags & VERS_ROW_START)
    {
      f_start= f;
      if (f_end)
        break;
    }
    else if (f->flags & VERS_ROW_END)
    {
      f_end= f;
      if (f_start)
        break;
    }
  }

  if (!f_start || !f_end)
  {
    my_error(ER_MISSING, MYF(0), src_table.table_name.str,
             f_start ? "AS ROW END" : "AS ROW START");
    return true;
  }

  as_row= start_end_t(f_start->field_name, f_end->field_name);
  system_time= as_row;

  create_info.options|= HA_VERSIONED_TABLE;
  return false;
}
7484
need_check(const Alter_info * alter_info) const7485 bool Vers_parse_info::need_check(const Alter_info *alter_info) const
7486 {
7487 return versioned_fields || unversioned_fields ||
7488 alter_info->flags & ALTER_ADD_PERIOD ||
7489 alter_info->flags & ALTER_DROP_PERIOD ||
7490 alter_info->flags & ALTER_ADD_SYSTEM_VERSIONING ||
7491 alter_info->flags & ALTER_DROP_SYSTEM_VERSIONING || *this;
7492 }
7493
/**
  Validate that the parsed versioning clauses are complete and consistent:
  both AS ROW START/END columns and PERIOD FOR SYSTEM_TIME must be present
  and must name the same columns; tables in the `mysql` schema cannot be
  versioned.

  @return true on error (my_error() has been called), false on success
*/
bool Vers_parse_info::check_conditions(const Lex_table_name &table_name,
                                       const Lex_table_name &db) const
{
  if (!as_row.start || !as_row.end)
  {
    my_error(ER_MISSING, MYF(0), table_name.str,
             as_row.start ? "AS ROW END" : "AS ROW START");
    return true;
  }

  if (!system_time.start || !system_time.end)
  {
    my_error(ER_MISSING, MYF(0), table_name.str, "PERIOD FOR SYSTEM_TIME");
    return true;
  }

  /* PERIOD FOR SYSTEM_TIME must name exactly the ROW START/END columns */
  if (!as_row.start.streq(system_time.start) ||
      !as_row.end.streq(system_time.end))
  {
    my_error(ER_VERS_PERIOD_COLUMNS, MYF(0), as_row.start.str, as_row.end.str);
    return true;
  }

  if (db.streq(MYSQL_SCHEMA_NAME))
  {
    my_error(ER_VERS_DB_NOT_SUPPORTED, MYF(0), MYSQL_SCHEMA_NAME.str);
    return true;
  }
  return false;
}
7524
7525
vers_check_timestamp(const Lex_table_name & table_name) const7526 bool Create_field::vers_check_timestamp(const Lex_table_name &table_name) const
7527 {
7528 if (type_handler() == &type_handler_timestamp2 &&
7529 length == MAX_DATETIME_FULL_WIDTH)
7530 return false;
7531
7532 my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), field_name.str, "TIMESTAMP(6)",
7533 table_name.str);
7534 return true;
7535 }
7536
7537
vers_check_bigint(const Lex_table_name & table_name) const7538 bool Create_field::vers_check_bigint(const Lex_table_name &table_name) const
7539 {
7540 if (is_some_bigint() && flags & UNSIGNED_FLAG &&
7541 length == MY_INT64_NUM_DECIMAL_DIGITS - 1)
7542 return false;
7543
7544 my_error(ER_VERS_FIELD_WRONG_TYPE, MYF(0), field_name.str,
7545 "BIGINT(20) UNSIGNED", table_name.str);
7546 return true;
7547 }
7548
7549
check_sys_fields(const Lex_table_name & table_name,const Lex_table_name & db,Alter_info * alter_info,bool can_native) const7550 bool Vers_parse_info::check_sys_fields(const Lex_table_name &table_name,
7551 const Lex_table_name &db,
7552 Alter_info *alter_info,
7553 bool can_native) const
7554 {
7555 if (check_conditions(table_name, db))
7556 return true;
7557
7558 const Create_field *row_start= NULL;
7559 const Create_field *row_end= NULL;
7560
7561 List_iterator<Create_field> it(alter_info->create_list);
7562 while (Create_field *f= it++)
7563 {
7564 if (!row_start && f->flags & VERS_ROW_START)
7565 row_start= f;
7566 else if (!row_end && f->flags & VERS_ROW_END)
7567 row_end= f;
7568 }
7569
7570 if (!row_start || !row_end)
7571 {
7572 my_error(ER_VERS_PERIOD_COLUMNS, MYF(0), as_row.start.str, as_row.end.str);
7573 return true;
7574 }
7575
7576 if (!can_native ||
7577 !row_start->is_some_bigint() ||
7578 !row_end->is_some_bigint())
7579 {
7580 if (row_start->vers_check_timestamp(table_name) ||
7581 row_end->vers_check_timestamp(table_name))
7582 return true;
7583 }
7584 else
7585 {
7586 if (row_start->vers_check_bigint(table_name) ||
7587 row_end->vers_check_bigint(table_name))
7588 return true;
7589
7590 if (!TR_table::use_transaction_registry)
7591 {
7592 my_error(ER_VERS_TRT_IS_DISABLED, MYF(0));
7593 return true;
7594 }
7595 }
7596
7597 return false;
7598 }
7599