1 /* Copyright (c) 2010, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "rpl_info_table.h"
24
25 #include "dynamic_ids.h" // Server_ids
26 #include "log.h" // sql_print_error
27 #include "rpl_info_table_access.h" // Rpl_info_table_access
28 #include "rpl_info_values.h" // Rpl_info_values
29 #include "sql_class.h" // THD
30
31
Rpl_info_table(uint nparam,const char * param_schema,const char * param_table,const uint param_n_pk_fields,const uint * param_pk_field_indexes)32 Rpl_info_table::Rpl_info_table(uint nparam,
33 const char* param_schema,
34 const char *param_table,
35 const uint param_n_pk_fields,
36 const uint *param_pk_field_indexes)
37 :Rpl_info_handler(nparam), is_transactional(FALSE)
38 {
39 str_schema.str= str_table.str= NULL;
40 str_schema.length= str_table.length= 0;
41
42 size_t schema_length= strlen(param_schema);
43 if ((str_schema.str= (char *) my_malloc(key_memory_Rpl_info_table,
44 schema_length + 1, MYF(0))))
45 {
46 str_schema.length= schema_length;
47 strmake(str_schema.str, param_schema, schema_length);
48 }
49
50 size_t table_length= strlen(param_table);
51 if ((str_table.str= (char *) my_malloc(key_memory_Rpl_info_table,
52 table_length + 1, MYF(0))))
53 {
54 str_table.length= table_length;
55 strmake(str_table.str, param_table, table_length);
56 }
57
58 if ((description= (char *)
59 my_malloc(key_memory_Rpl_info_table,
60 str_schema.length + str_table.length + 2, MYF(0))))
61 {
62 char *pos= my_stpcpy(description, param_schema);
63 pos= my_stpcpy(pos, ".");
64 pos= my_stpcpy(pos, param_table);
65 }
66
67 m_n_pk_fields= param_n_pk_fields;
68 m_pk_field_indexes= param_pk_field_indexes;
69
70 access= new Rpl_info_table_access();
71 }
72
~Rpl_info_table()73 Rpl_info_table::~Rpl_info_table()
74 {
75 delete access;
76
77 my_free(description);
78
79 my_free(str_table.str);
80
81 my_free(str_schema.str);
82 }
83
do_init_info()84 int Rpl_info_table::do_init_info()
85 {
86 return do_init_info(FIND_KEY, 0);
87 }
88
do_init_info(uint instance)89 int Rpl_info_table::do_init_info(uint instance)
90 {
91 return do_init_info(FIND_KEY, instance);
92 }
93
do_init_info(enum_find_method method,uint instance)94 int Rpl_info_table::do_init_info(enum_find_method method, uint instance)
95 {
96 int error= 1;
97 enum enum_return_id res= FOUND_ID;
98 TABLE *table= NULL;
99 sql_mode_t saved_mode;
100 Open_tables_backup backup;
101
102 DBUG_ENTER("Rlp_info_table::do_init_info");
103
104 THD *thd= access->create_thd();
105
106 saved_mode= thd->variables.sql_mode;
107 tmp_disable_binlog(thd);
108
109 /*
110 Opens and locks the rpl_info table before accessing it.
111 */
112 if (access->open_table(thd, str_schema, str_table,
113 get_number_info(), TL_WRITE,
114 &table, &backup))
115 goto end;
116
117 if (verify_table_primary_key_fields(table))
118 goto end;
119
120 /*
121 Points the cursor at the row to be read according to the
122 keys.
123 */
124 switch (method)
125 {
126 case FIND_KEY:
127 res= access->find_info(field_values, table);
128 break;
129
130 case FIND_SCAN:
131 res= access->scan_info(table, instance);
132 break;
133
134 default:
135 assert(0);
136 break;
137 }
138
139 if (res == FOUND_ID)
140 {
141 /*
142 Reads the information stored in the rpl_info table into a
143 set of variables. If there is a failure, an error is returned.
144 */
145 if (access->load_info_values(get_number_info(), table->field,
146 field_values))
147 goto end;
148 }
149 error= (res == ERROR_ID);
150 end:
151 /*
152 Unlocks and closes the rpl_info table.
153 */
154 error= access->close_table(thd, table, &backup, error) || error;
155 reenable_binlog(thd);
156 thd->variables.sql_mode= saved_mode;
157 access->drop_thd(thd);
158 DBUG_RETURN(error);
159 }
160
do_flush_info(const bool force)161 int Rpl_info_table::do_flush_info(const bool force)
162 {
163 int error= 1;
164 enum enum_return_id res= FOUND_ID;
165 TABLE *table= NULL;
166 sql_mode_t saved_mode;
167 Open_tables_backup backup;
168
169 DBUG_ENTER("Rpl_info_table::do_flush_info");
170
171 if (!(force || (sync_period &&
172 ++(sync_counter) >= sync_period)))
173 DBUG_RETURN(0);
174
175 THD *thd= access->create_thd();
176
177 sync_counter= 0;
178 saved_mode= thd->variables.sql_mode;
179 tmp_disable_binlog(thd);
180 thd->is_operating_substatement_implicitly= true;
181
182 /*
183 Opens and locks the rpl_info table before accessing it.
184 */
185 if (access->open_table(thd, str_schema, str_table,
186 get_number_info(), TL_WRITE,
187 &table, &backup))
188 goto end;
189
190 /*
191 Points the cursor at the row to be read according to the
192 keys. If the row is not found an error is reported.
193 */
194 if ((res= access->find_info(field_values, table)) == NOT_FOUND_ID)
195 {
196 /*
197 Prepares the information to be stored before calling ha_write_row.
198 */
199 empty_record(table);
200 if (access->store_info_values(get_number_info(), table->field,
201 field_values))
202 goto end;
203
204 /*
205 Inserts a new row into rpl_info table.
206 */
207 if ((error= table->file->ha_write_row(table->record[0])))
208 {
209 table->file->print_error(error, MYF(0));
210 /*
211 This makes sure that the error is 1 and not the status returned
212 by the handler.
213 */
214 error= 1;
215 goto end;
216 }
217 error= 0;
218 }
219 else if (res == FOUND_ID)
220 {
221 /*
222 Prepares the information to be stored before calling ha_update_row.
223 */
224 store_record(table, record[1]);
225 if (access->store_info_values(get_number_info(), table->field,
226 field_values))
227 goto end;
228
229 /*
230 Updates a row in the rpl_info table.
231 */
232 if ((error= table->file->ha_update_row(table->record[1], table->record[0])) &&
233 error != HA_ERR_RECORD_IS_THE_SAME)
234 {
235 table->file->print_error(error, MYF(0));
236 /*
237 This makes sure that the error is 1 and not the status returned
238 by the handler.
239 */
240 error= 1;
241 goto end;
242 }
243 error= 0;
244 }
245
246 end:
247 DBUG_EXECUTE_IF("mts_debug_concurrent_access",
248 {
249 while (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER &&
250 mts_debug_concurrent_access < 2 && mts_debug_concurrent_access > 0)
251 {
252 DBUG_PRINT("mts", ("Waiting while locks are acquired to show "
253 "concurrency in mts: %u %u\n",
254 mts_debug_concurrent_access,
255 thd->thread_id()));
256 my_sleep(6000000);
257 }
258 };
259 );
260
261 /*
262 Unlocks and closes the rpl_info table.
263 */
264 error= access->close_table(thd, table, &backup, error) || error;
265 thd->is_operating_substatement_implicitly= false;
266 reenable_binlog(thd);
267 thd->variables.sql_mode= saved_mode;
268 access->drop_thd(thd);
269 DBUG_RETURN(error);
270 }
271
do_remove_info()272 int Rpl_info_table::do_remove_info()
273 {
274 return do_clean_info();
275 }
276
do_clean_info()277 int Rpl_info_table::do_clean_info()
278 {
279 int error= 1;
280 enum enum_return_id res= FOUND_ID;
281 TABLE *table= NULL;
282 sql_mode_t saved_mode;
283 Open_tables_backup backup;
284
285 DBUG_ENTER("Rpl_info_table::do_remove_info");
286
287 THD *thd= access->create_thd();
288
289 saved_mode= thd->variables.sql_mode;
290 tmp_disable_binlog(thd);
291
292 /*
293 Opens and locks the rpl_info table before accessing it.
294 */
295 if (access->open_table(thd, str_schema, str_table,
296 get_number_info(), TL_WRITE,
297 &table, &backup))
298 goto end;
299
300 /*
301 Points the cursor at the row to be deleted according to the
302 keys. If the row is not found, the execution proceeds normally.
303 */
304 if ((res= access->find_info(field_values, table)) == FOUND_ID)
305 {
306 /*
307 Deletes a row in the rpl_info table.
308 */
309 if ((error= table->file->ha_delete_row(table->record[0])))
310 {
311 table->file->print_error(error, MYF(0));
312 goto end;
313 }
314 }
315 error= (res == ERROR_ID);
316 end:
317 /*
318 Unlocks and closes the rpl_info table.
319 */
320 error= access->close_table(thd, table, &backup, error) || error;
321 reenable_binlog(thd);
322 thd->variables.sql_mode= saved_mode;
323 access->drop_thd(thd);
324 DBUG_RETURN(error);
325 }
326
327 /**
328 Removes records belonging to the channel_name parameter's channel.
329
330 @param nparam number of fields in the table
331 @param param_schema schema name
332 @param param_table table name
333 @param channel_name channel name
334 @param channel_field_idx channel name field index
335
336 @return 0 on success
337 1 when a failure happens
338 */
do_reset_info(uint nparam,const char * param_schema,const char * param_table,const char * channel_name,uint channel_field_idx)339 int Rpl_info_table::do_reset_info(uint nparam,
340 const char* param_schema,
341 const char *param_table,
342 const char *channel_name,
343 uint channel_field_idx)
344 {
345 int error= 0;
346 TABLE *table= NULL;
347 sql_mode_t saved_mode;
348 Open_tables_backup backup;
349 Rpl_info_table *info= NULL;
350 THD *thd= NULL;
351 int handler_error= 0;
352
353 DBUG_ENTER("Rpl_info_table::do_reset_info");
354
355 if (!(info= new Rpl_info_table(nparam, param_schema,
356 param_table)))
357 DBUG_RETURN(1);
358
359 thd= info->access->create_thd();
360 saved_mode= thd->variables.sql_mode;
361 tmp_disable_binlog(thd);
362
363 /*
364 Opens and locks the rpl_info table before accessing it.
365 */
366 if (info->access->open_table(thd, info->str_schema, info->str_table,
367 info->get_number_info(), TL_WRITE,
368 &table, &backup))
369 {
370 error= 1;
371 goto end;
372 }
373
374 if (!(handler_error= table->file->ha_index_init(0, 1)))
375 {
376 KEY *key_info= table->key_info;
377
378 /*
379 Currently this method is used only for Worker info table
380 resetting.
381 todo: for another table in future, consider to make use of the
382 passed parameter to locate the lookup key.
383 */
384 assert(strcmp(info->str_table.str, "slave_worker_info") == 0);
385
386 if (info->verify_table_primary_key_fields(table))
387 {
388 error= 1;
389 table->file->ha_index_end();
390 goto end;
391 }
392
393 uint fieldnr= key_info->key_part[0].fieldnr - 1;
394 table->field[fieldnr]->store(channel_name,
395 strlen(channel_name),
396 &my_charset_bin);
397 uint key_len= key_info->key_part[0].store_length;
398 uchar *key_buf= table->field[fieldnr]->ptr;
399
400 if (!(handler_error= table->file->ha_index_read_map(table->record[0],
401 key_buf,
402 (key_part_map) 1,
403 HA_READ_KEY_EXACT)))
404 {
405 do
406 {
407 if ((handler_error= table->file->ha_delete_row(table->record[0])))
408 break;
409 }
410 while (!(handler_error= table->file->ha_index_next_same(table->record[0],
411 key_buf,
412 key_len)));
413 if (handler_error != HA_ERR_END_OF_FILE)
414 error= 1;
415 }
416 else
417 {
418 /*
419 Being reset table can be even empty, and that's benign.
420 */
421 if (handler_error != HA_ERR_KEY_NOT_FOUND)
422 error= 1;
423 }
424
425 if (error)
426 table->file->print_error(handler_error, MYF(0));
427 table->file->ha_index_end();
428 }
429 end:
430 /*
431 Unlocks and closes the rpl_info table.
432 */
433 error= info->access->close_table(thd, table, &backup, error) || error;
434 reenable_binlog(thd);
435 thd->variables.sql_mode= saved_mode;
436 info->access->drop_thd(thd);
437 delete info;
438 DBUG_RETURN(error);
439 }
440
do_check_info()441 enum_return_check Rpl_info_table::do_check_info()
442 {
443 TABLE *table= NULL;
444 sql_mode_t saved_mode;
445 Open_tables_backup backup;
446 enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
447
448 DBUG_ENTER("Rpl_info_table::do_check_info");
449
450 THD *thd= access->create_thd();
451 saved_mode= thd->variables.sql_mode;
452
453 /*
454 Opens and locks the rpl_info table before accessing it.
455 */
456 if (access->open_table(thd, str_schema, str_table,
457 get_number_info(), TL_READ,
458 &table, &backup))
459 {
460 sql_print_warning("Info table is not ready to be used. Table "
461 "'%s.%s' cannot be opened.", str_schema.str,
462 str_table.str);
463
464 return_check= ERROR_CHECKING_REPOSITORY;
465 goto end;
466 }
467
468 /*
469 Points the cursor at the row to be read according to the
470 keys.
471 */
472 if (access->find_info(field_values, table) != FOUND_ID)
473 {
474 /*
475 We cannot simply call my_error here because it does not
476 really means that there was a failure but only that the
477 record was not found.
478 */
479 return_check= REPOSITORY_DOES_NOT_EXIST;
480 goto end;
481 }
482 return_check= REPOSITORY_EXISTS;
483
484
485 end:
486 /*
487 Unlocks and closes the rpl_info table.
488 */
489 access->close_table(thd, table, &backup,
490 return_check == ERROR_CHECKING_REPOSITORY);
491 thd->variables.sql_mode= saved_mode;
492 access->drop_thd(thd);
493 DBUG_RETURN(return_check);
494 }
495
do_check_info(uint instance)496 enum_return_check Rpl_info_table::do_check_info(uint instance)
497 {
498 TABLE *table= NULL;
499 sql_mode_t saved_mode;
500 Open_tables_backup backup;
501 enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
502
503 DBUG_ENTER("Rpl_info_table::do_check_info");
504
505 THD *thd= access->create_thd();
506 saved_mode= thd->variables.sql_mode;
507
508 /*
509 Opens and locks the rpl_info table before accessing it.
510 */
511 if (access->open_table(thd, str_schema, str_table,
512 get_number_info(), TL_READ,
513 &table, &backup))
514 {
515 sql_print_warning("Info table is not ready to be used. Table "
516 "'%s.%s' cannot be opened.", str_schema.str,
517 str_table.str);
518
519 return_check= ERROR_CHECKING_REPOSITORY;
520 goto end;
521 }
522
523 if (verify_table_primary_key_fields(table))
524 {
525 return_check= ERROR_CHECKING_REPOSITORY;
526 goto end;
527 }
528
529 /*
530 Points the cursor at the row to be read according to the
531 keys.
532 */
533 if (access->scan_info(table, instance) != FOUND_ID)
534 {
535 /*
536 We cannot simply call my_error here because it does not
537 really means that there was a failure but only that the
538 record was not found.
539 */
540 return_check= REPOSITORY_DOES_NOT_EXIST;
541 goto end;
542 }
543 return_check= REPOSITORY_EXISTS;
544
545
546 end:
547 /*
548 Unlocks and closes the rpl_info table.
549 */
550 access->close_table(thd, table, &backup,
551 return_check == ERROR_CHECKING_REPOSITORY);
552 thd->variables.sql_mode= saved_mode;
553 access->drop_thd(thd);
554 DBUG_RETURN(return_check);
555 }
556
do_count_info(uint nparam,const char * param_schema,const char * param_table,uint * counter)557 bool Rpl_info_table::do_count_info(uint nparam,
558 const char* param_schema,
559 const char *param_table,
560 uint* counter)
561 {
562 int error= 1;
563 TABLE *table= NULL;
564 sql_mode_t saved_mode;
565 Open_tables_backup backup;
566 Rpl_info_table *info= NULL;
567 THD *thd= NULL;
568
569 DBUG_ENTER("Rpl_info_table::do_count_info");
570
571 if (!(info= new Rpl_info_table(nparam, param_schema, param_table)))
572 DBUG_RETURN(true);
573
574 thd= info->access->create_thd();
575 saved_mode= thd->variables.sql_mode;
576
577 /*
578 Opens and locks the rpl_info table before accessing it.
579 */
580 if (info->access->open_table(thd, info->str_schema, info->str_table,
581 info->get_number_info(), TL_READ,
582 &table, &backup))
583 {
584 /*
585 We cannot simply print out a warning message at this
586 point because this may represent a bootstrap.
587 */
588 error= 0;
589 goto end;
590 }
591
592 /*
593 Counts entries in the rpl_info table.
594 */
595 if (info->access->count_info(table, counter))
596 {
597 sql_print_warning("Info table is not ready to be used. Table "
598 "'%s.%s' cannot be scanned.", info->str_schema.str,
599 info->str_table.str);
600 goto end;
601 }
602 error= 0;
603
604 end:
605 /*
606 Unlocks and closes the rpl_info table.
607 */
608 error= info->access->close_table(thd, table, &backup, error) || error;
609 thd->variables.sql_mode= saved_mode;
610 info->access->drop_thd(thd);
611 delete info;
612 DBUG_RETURN(error);
613 }
614
do_end_info()615 void Rpl_info_table::do_end_info()
616 {
617 }
618
do_prepare_info_for_read()619 int Rpl_info_table::do_prepare_info_for_read()
620 {
621 if (!field_values)
622 return TRUE;
623
624 cursor= 0;
625 prv_error= FALSE;
626
627 return FALSE;
628 }
629
do_prepare_info_for_write()630 int Rpl_info_table::do_prepare_info_for_write()
631 {
632 return(do_prepare_info_for_read());
633 }
634
do_get_rpl_info_type()635 uint Rpl_info_table::do_get_rpl_info_type()
636 {
637 return INFO_REPOSITORY_TABLE;
638 }
639
do_set_info(const int pos,const char * value)640 bool Rpl_info_table::do_set_info(const int pos, const char *value)
641 {
642 return (field_values->value[pos].copy(value, strlen(value),
643 &my_charset_bin));
644 }
645
do_set_info(const int pos,const uchar * value,const size_t size)646 bool Rpl_info_table::do_set_info(const int pos, const uchar *value,
647 const size_t size)
648 {
649 return (field_values->value[pos].copy((char *) value, size,
650 &my_charset_bin));
651 }
652
do_set_info(const int pos,const ulong value)653 bool Rpl_info_table::do_set_info(const int pos, const ulong value)
654 {
655 return (field_values->value[pos].set_int(value, TRUE,
656 &my_charset_bin));
657 }
658
do_set_info(const int pos,const int value)659 bool Rpl_info_table::do_set_info(const int pos, const int value)
660 {
661 return (field_values->value[pos].set_int(value, FALSE,
662 &my_charset_bin));
663 }
664
do_set_info(const int pos,const float value)665 bool Rpl_info_table::do_set_info(const int pos, const float value)
666 {
667 return (field_values->value[pos].set_real(value, NOT_FIXED_DEC,
668 &my_charset_bin));
669 }
670
do_set_info(const int pos,const Server_ids * value)671 bool Rpl_info_table::do_set_info(const int pos, const Server_ids *value)
672 {
673 if (const_cast<Server_ids*>(value)->pack_dynamic_ids(&field_values->value[pos]))
674 return TRUE;
675
676 return FALSE;
677 }
678
do_get_info(const int pos,char * value,const size_t size,const char * default_value)679 bool Rpl_info_table::do_get_info(const int pos, char *value, const size_t size,
680 const char *default_value)
681 {
682 if (field_values->value[pos].length())
683 strmake(value, field_values->value[pos].c_ptr_safe(),
684 field_values->value[pos].length());
685 else if (default_value)
686 strmake(value, default_value, strlen(default_value));
687 else
688 *value= '\0';
689
690 return FALSE;
691 }
692
do_get_info(const int pos,uchar * value,const size_t size,const uchar * default_value MY_ATTRIBUTE ((unused)))693 bool Rpl_info_table::do_get_info(const int pos, uchar *value, const size_t size,
694 const uchar *default_value MY_ATTRIBUTE((unused)))
695 {
696 if (field_values->value[pos].length() == size)
697 return (!memcpy((char *) value,
698 field_values->value[pos].c_ptr_safe(), size));
699 return TRUE;
700 }
701
do_get_info(const int pos,ulong * value,const ulong default_value)702 bool Rpl_info_table::do_get_info(const int pos, ulong *value,
703 const ulong default_value)
704 {
705 if (field_values->value[pos].length())
706 {
707 *value= strtoul(field_values->value[pos].c_ptr_safe(), 0, 10);
708 return FALSE;
709 }
710 else if (default_value)
711 {
712 *value= default_value;
713 return FALSE;
714 }
715
716 return TRUE;
717 }
718
do_get_info(const int pos,int * value,const int default_value)719 bool Rpl_info_table::do_get_info(const int pos, int *value,
720 const int default_value)
721 {
722 if (field_values->value[pos].length())
723 {
724 *value= atoi(field_values->value[pos].c_ptr_safe());
725 return FALSE;
726 }
727 else if (default_value)
728 {
729 *value= default_value;
730 return FALSE;
731 }
732
733 return TRUE;
734 }
735
do_get_info(const int pos,float * value,const float default_value)736 bool Rpl_info_table::do_get_info(const int pos, float *value,
737 const float default_value)
738 {
739 if (field_values->value[pos].length())
740 {
741 if (sscanf(field_values->value[pos].c_ptr_safe(), "%f", value) != 1)
742 return TRUE;
743 return FALSE;
744 }
745 else if (default_value != 0.0)
746 {
747 *value= default_value;
748 return FALSE;
749 }
750
751 return TRUE;
752 }
753
do_get_info(const int pos,Server_ids * value,const Server_ids * default_value MY_ATTRIBUTE ((unused)))754 bool Rpl_info_table::do_get_info(const int pos, Server_ids *value,
755 const Server_ids *default_value MY_ATTRIBUTE((unused)))
756 {
757 if (value->unpack_dynamic_ids(field_values->value[pos].c_ptr_safe()))
758 return TRUE;
759
760 return FALSE;
761 }
762
do_get_description_info()763 char* Rpl_info_table::do_get_description_info()
764 {
765 return description;
766 }
767
do_is_transactional()768 bool Rpl_info_table::do_is_transactional()
769 {
770 return is_transactional;
771 }
772
do_update_is_transactional()773 bool Rpl_info_table::do_update_is_transactional()
774 {
775 bool error= TRUE;
776 sql_mode_t saved_mode;
777 TABLE *table= NULL;
778 Open_tables_backup backup;
779
780 DBUG_ENTER("Rpl_info_table::do_update_is_transactional");
781 DBUG_EXECUTE_IF("simulate_update_is_transactional_error",
782 {
783 DBUG_RETURN(TRUE);
784 });
785
786 THD *thd= access->create_thd();
787 saved_mode= thd->variables.sql_mode;
788 tmp_disable_binlog(thd);
789
790 /*
791 Opens and locks the rpl_info table before accessing it.
792 */
793 if (access->open_table(thd, str_schema, str_table,
794 get_number_info(), TL_READ,
795 &table, &backup))
796 goto end;
797
798 is_transactional= table->file->has_transactions();
799 error= FALSE;
800
801 end:
802 error= access->close_table(thd, table, &backup, 0) || error;
803 reenable_binlog(thd);
804 thd->variables.sql_mode= saved_mode;
805 access->drop_thd(thd);
806 DBUG_RETURN(error);
807 }
808
verify_table_primary_key_fields(TABLE * table)809 bool Rpl_info_table::verify_table_primary_key_fields(TABLE *table)
810 {
811 DBUG_ENTER("Rpl_info_table::verify_table_primary_key_fields");
812 KEY *key_info= table->key_info;
813 bool error;
814
815 /*
816 If the table has no keys or has less key fields than expected,
817 it must be corrupted.
818 */
819 if ((error= !key_info ||
820 key_info->user_defined_key_parts == 0 ||
821 (m_n_pk_fields > 0 &&
822 key_info->user_defined_key_parts != m_n_pk_fields)))
823 {
824 sql_print_error("Corrupted table %s.%s. Check out table definition.",
825 str_schema.str, str_table.str);
826 }
827
828 if (!error && m_n_pk_fields && m_pk_field_indexes)
829 {
830 /*
831 If any of its primary key fields are not at the expected position,
832 the table must be corrupted.
833 */
834 for (uint idx= 0; idx < m_n_pk_fields; idx++)
835 {
836 if (key_info->key_part[idx].field != table->field[m_pk_field_indexes[idx]])
837 {
838 const char *key_field_name= key_info->key_part[idx].field->field_name;
839 const char *table_field_name= table->field[m_pk_field_indexes[idx]]->field_name;
840 sql_print_error("Info table has a problem with its key field(s). "
841 "Table '%s.%s' expected field #%u to be '%s' but "
842 "found '%s' instead.",
843 str_schema.str, str_table.str,
844 m_pk_field_indexes[idx],
845 key_field_name,
846 table_field_name);
847 error= true;
848 break;
849 }
850 }
851 }
852
853 DBUG_RETURN(error);
854 }
855