1 /* Copyright (c) 2010, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "rpl_info_table.h"
24 
25 #include "dynamic_ids.h"            // Server_ids
26 #include "log.h"                    // sql_print_error
27 #include "rpl_info_table_access.h"  // Rpl_info_table_access
28 #include "rpl_info_values.h"        // Rpl_info_values
29 #include "sql_class.h"              // THD
30 
31 
Rpl_info_table(uint nparam,const char * param_schema,const char * param_table,const uint param_n_pk_fields,const uint * param_pk_field_indexes)32 Rpl_info_table::Rpl_info_table(uint nparam,
33                                const char* param_schema,
34                                const char *param_table,
35                                const uint param_n_pk_fields,
36                                const uint *param_pk_field_indexes)
37 :Rpl_info_handler(nparam), is_transactional(FALSE)
38 {
39   str_schema.str= str_table.str= NULL;
40   str_schema.length= str_table.length= 0;
41 
42   size_t schema_length= strlen(param_schema);
43   if ((str_schema.str= (char *) my_malloc(key_memory_Rpl_info_table,
44                                           schema_length + 1, MYF(0))))
45   {
46     str_schema.length= schema_length;
47     strmake(str_schema.str, param_schema, schema_length);
48   }
49 
50   size_t table_length= strlen(param_table);
51   if ((str_table.str= (char *) my_malloc(key_memory_Rpl_info_table,
52                                          table_length + 1, MYF(0))))
53   {
54     str_table.length= table_length;
55     strmake(str_table.str, param_table, table_length);
56   }
57 
58   if ((description= (char *)
59       my_malloc(key_memory_Rpl_info_table,
60                 str_schema.length + str_table.length + 2, MYF(0))))
61   {
62     char *pos= my_stpcpy(description, param_schema);
63     pos= my_stpcpy(pos, ".");
64     pos= my_stpcpy(pos, param_table);
65   }
66 
67   m_n_pk_fields= param_n_pk_fields;
68   m_pk_field_indexes= param_pk_field_indexes;
69 
70   access= new Rpl_info_table_access();
71 }
72 
~Rpl_info_table()73 Rpl_info_table::~Rpl_info_table()
74 {
75   delete access;
76 
77   my_free(description);
78 
79   my_free(str_table.str);
80 
81   my_free(str_schema.str);
82 }
83 
do_init_info()84 int Rpl_info_table::do_init_info()
85 {
86   return do_init_info(FIND_KEY, 0);
87 }
88 
do_init_info(uint instance)89 int Rpl_info_table::do_init_info(uint instance)
90 {
91   return do_init_info(FIND_KEY, instance);
92 }
93 
do_init_info(enum_find_method method,uint instance)94 int Rpl_info_table::do_init_info(enum_find_method method, uint instance)
95 {
96   int error= 1;
97   enum enum_return_id res= FOUND_ID;
98   TABLE *table= NULL;
99   sql_mode_t saved_mode;
100   Open_tables_backup backup;
101 
102   DBUG_ENTER("Rlp_info_table::do_init_info");
103 
104   THD *thd= access->create_thd();
105 
106   saved_mode= thd->variables.sql_mode;
107   tmp_disable_binlog(thd);
108 
109   /*
110     Opens and locks the rpl_info table before accessing it.
111   */
112   if (access->open_table(thd, str_schema, str_table,
113                          get_number_info(), TL_WRITE,
114                          &table, &backup))
115     goto end;
116 
117   if (verify_table_primary_key_fields(table))
118     goto end;
119 
120   /*
121     Points the cursor at the row to be read according to the
122     keys.
123   */
124   switch (method)
125   {
126     case FIND_KEY:
127       res= access->find_info(field_values, table);
128     break;
129 
130     case FIND_SCAN:
131       res= access->scan_info(table, instance);
132     break;
133 
134     default:
135       assert(0);
136     break;
137   }
138 
139   if (res == FOUND_ID)
140   {
141     /*
142       Reads the information stored in the rpl_info table into a
143       set of variables. If there is a failure, an error is returned.
144     */
145     if (access->load_info_values(get_number_info(), table->field,
146                                  field_values))
147       goto end;
148   }
149   error= (res == ERROR_ID);
150 end:
151   /*
152     Unlocks and closes the rpl_info table.
153   */
154   error= access->close_table(thd, table, &backup, error) || error;
155   reenable_binlog(thd);
156   thd->variables.sql_mode= saved_mode;
157   access->drop_thd(thd);
158   DBUG_RETURN(error);
159 }
160 
do_flush_info(const bool force)161 int Rpl_info_table::do_flush_info(const bool force)
162 {
163   int error= 1;
164   enum enum_return_id res= FOUND_ID;
165   TABLE *table= NULL;
166   sql_mode_t saved_mode;
167   Open_tables_backup backup;
168 
169   DBUG_ENTER("Rpl_info_table::do_flush_info");
170 
171   if (!(force || (sync_period &&
172       ++(sync_counter) >= sync_period)))
173     DBUG_RETURN(0);
174 
175   THD *thd= access->create_thd();
176 
177   sync_counter= 0;
178   saved_mode= thd->variables.sql_mode;
179   tmp_disable_binlog(thd);
180   thd->is_operating_substatement_implicitly= true;
181 
182   /*
183     Opens and locks the rpl_info table before accessing it.
184   */
185   if (access->open_table(thd, str_schema, str_table,
186                          get_number_info(), TL_WRITE,
187                          &table, &backup))
188     goto end;
189 
190   /*
191     Points the cursor at the row to be read according to the
192     keys. If the row is not found an error is reported.
193   */
194   if ((res= access->find_info(field_values, table)) == NOT_FOUND_ID)
195   {
196     /*
197       Prepares the information to be stored before calling ha_write_row.
198     */
199     empty_record(table);
200     if (access->store_info_values(get_number_info(), table->field,
201                                   field_values))
202       goto end;
203 
204     /*
205       Inserts a new row into rpl_info table.
206     */
207     if ((error= table->file->ha_write_row(table->record[0])))
208     {
209       table->file->print_error(error, MYF(0));
210       /*
211         This makes sure that the error is 1 and not the status returned
212         by the handler.
213       */
214       error= 1;
215       goto end;
216     }
217     error= 0;
218   }
219   else if (res == FOUND_ID)
220   {
221     /*
222       Prepares the information to be stored before calling ha_update_row.
223     */
224     store_record(table, record[1]);
225     if (access->store_info_values(get_number_info(), table->field,
226                                   field_values))
227       goto end;
228 
229     /*
230       Updates a row in the rpl_info table.
231     */
232     if ((error= table->file->ha_update_row(table->record[1], table->record[0])) &&
233         error != HA_ERR_RECORD_IS_THE_SAME)
234     {
235       table->file->print_error(error, MYF(0));
236       /*
237         This makes sure that the error is 1 and not the status returned
238         by the handler.
239       */
240       error= 1;
241       goto end;
242     }
243     error= 0;
244   }
245 
246 end:
247   DBUG_EXECUTE_IF("mts_debug_concurrent_access",
248     {
249       while (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER &&
250              mts_debug_concurrent_access < 2 && mts_debug_concurrent_access >  0)
251       {
252         DBUG_PRINT("mts", ("Waiting while locks are acquired to show "
253                            "concurrency in mts: %u %u\n",
254                            mts_debug_concurrent_access,
255                            thd->thread_id()));
256         my_sleep(6000000);
257       }
258     };
259   );
260 
261   /*
262     Unlocks and closes the rpl_info table.
263   */
264   error= access->close_table(thd, table, &backup, error) || error;
265   thd->is_operating_substatement_implicitly= false;
266   reenable_binlog(thd);
267   thd->variables.sql_mode= saved_mode;
268   access->drop_thd(thd);
269   DBUG_RETURN(error);
270 }
271 
do_remove_info()272 int Rpl_info_table::do_remove_info()
273 {
274   return do_clean_info();
275 }
276 
do_clean_info()277 int Rpl_info_table::do_clean_info()
278 {
279   int error= 1;
280   enum enum_return_id res= FOUND_ID;
281   TABLE *table= NULL;
282   sql_mode_t saved_mode;
283   Open_tables_backup backup;
284 
285   DBUG_ENTER("Rpl_info_table::do_remove_info");
286 
287   THD *thd= access->create_thd();
288 
289   saved_mode= thd->variables.sql_mode;
290   tmp_disable_binlog(thd);
291 
292   /*
293     Opens and locks the rpl_info table before accessing it.
294   */
295   if (access->open_table(thd, str_schema, str_table,
296                          get_number_info(), TL_WRITE,
297                          &table, &backup))
298     goto end;
299 
300   /*
301     Points the cursor at the row to be deleted according to the
302     keys. If the row is not found, the execution proceeds normally.
303   */
304   if ((res= access->find_info(field_values, table)) == FOUND_ID)
305   {
306     /*
307       Deletes a row in the rpl_info table.
308     */
309     if ((error= table->file->ha_delete_row(table->record[0])))
310     {
311       table->file->print_error(error, MYF(0));
312       goto end;
313     }
314   }
315   error= (res == ERROR_ID);
316 end:
317   /*
318     Unlocks and closes the rpl_info table.
319   */
320   error= access->close_table(thd, table, &backup, error) || error;
321   reenable_binlog(thd);
322   thd->variables.sql_mode= saved_mode;
323   access->drop_thd(thd);
324   DBUG_RETURN(error);
325 }
326 
327 /**
328    Removes records belonging to the channel_name parameter's channel.
329 
330    @param nparam             number of fields in the table
331    @param param_schema       schema name
332    @param param_table        table name
333    @param channel_name       channel name
334    @param channel_field_idx  channel name field index
335 
336    @return 0   on success
337            1   when a failure happens
338 */
do_reset_info(uint nparam,const char * param_schema,const char * param_table,const char * channel_name,uint channel_field_idx)339 int Rpl_info_table::do_reset_info(uint nparam,
340                                   const char* param_schema,
341                                   const char *param_table,
342                                   const char *channel_name,
343                                   uint  channel_field_idx)
344 {
345   int error= 0;
346   TABLE *table= NULL;
347   sql_mode_t saved_mode;
348   Open_tables_backup backup;
349   Rpl_info_table *info= NULL;
350   THD *thd= NULL;
351   int handler_error= 0;
352 
353   DBUG_ENTER("Rpl_info_table::do_reset_info");
354 
355   if (!(info= new Rpl_info_table(nparam, param_schema,
356                                  param_table)))
357     DBUG_RETURN(1);
358 
359   thd= info->access->create_thd();
360   saved_mode= thd->variables.sql_mode;
361   tmp_disable_binlog(thd);
362 
363   /*
364     Opens and locks the rpl_info table before accessing it.
365   */
366   if (info->access->open_table(thd, info->str_schema, info->str_table,
367                                info->get_number_info(), TL_WRITE,
368                                &table, &backup))
369   {
370     error= 1;
371     goto end;
372   }
373 
374   if (!(handler_error= table->file->ha_index_init(0, 1)))
375   {
376     KEY *key_info= table->key_info;
377 
378     /*
379       Currently this method is used only for Worker info table
380       resetting.
381       todo: for another table in future, consider to make use of the
382       passed parameter to locate the lookup key.
383     */
384     assert(strcmp(info->str_table.str, "slave_worker_info") == 0);
385 
386     if (info->verify_table_primary_key_fields(table))
387     {
388       error= 1;
389       table->file->ha_index_end();
390       goto end;
391     }
392 
393     uint fieldnr= key_info->key_part[0].fieldnr - 1;
394     table->field[fieldnr]->store(channel_name,
395                                  strlen(channel_name),
396                                  &my_charset_bin);
397     uint key_len= key_info->key_part[0].store_length;
398     uchar *key_buf= table->field[fieldnr]->ptr;
399 
400     if (!(handler_error= table->file->ha_index_read_map(table->record[0],
401                                                         key_buf,
402                                                         (key_part_map) 1,
403                                                         HA_READ_KEY_EXACT)))
404     {
405       do
406       {
407         if ((handler_error= table->file->ha_delete_row(table->record[0])))
408           break;
409       }
410       while (!(handler_error= table->file->ha_index_next_same(table->record[0],
411                                                            key_buf,
412                                                            key_len)));
413       if (handler_error != HA_ERR_END_OF_FILE)
414         error= 1;
415     }
416     else
417     {
418       /*
419         Being reset table can be even empty, and that's benign.
420       */
421       if (handler_error != HA_ERR_KEY_NOT_FOUND)
422         error= 1;
423     }
424 
425     if (error)
426       table->file->print_error(handler_error, MYF(0));
427     table->file->ha_index_end();
428   }
429 end:
430   /*
431     Unlocks and closes the rpl_info table.
432   */
433   error= info->access->close_table(thd, table, &backup, error) || error;
434   reenable_binlog(thd);
435   thd->variables.sql_mode= saved_mode;
436   info->access->drop_thd(thd);
437   delete info;
438   DBUG_RETURN(error);
439 }
440 
do_check_info()441 enum_return_check Rpl_info_table::do_check_info()
442 {
443   TABLE *table= NULL;
444   sql_mode_t saved_mode;
445   Open_tables_backup backup;
446   enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
447 
448   DBUG_ENTER("Rpl_info_table::do_check_info");
449 
450   THD *thd= access->create_thd();
451   saved_mode= thd->variables.sql_mode;
452 
453   /*
454     Opens and locks the rpl_info table before accessing it.
455   */
456   if (access->open_table(thd, str_schema, str_table,
457                          get_number_info(), TL_READ,
458                          &table, &backup))
459   {
460     sql_print_warning("Info table is not ready to be used. Table "
461                       "'%s.%s' cannot be opened.", str_schema.str,
462                       str_table.str);
463 
464     return_check= ERROR_CHECKING_REPOSITORY;
465     goto end;
466   }
467 
468   /*
469     Points the cursor at the row to be read according to the
470     keys.
471   */
472   if (access->find_info(field_values, table) != FOUND_ID)
473   {
474     /*
475        We cannot simply call my_error here because it does not
476        really means that there was a failure but only that the
477        record was not found.
478     */
479     return_check= REPOSITORY_DOES_NOT_EXIST;
480     goto end;
481   }
482   return_check= REPOSITORY_EXISTS;
483 
484 
485 end:
486   /*
487     Unlocks and closes the rpl_info table.
488   */
489   access->close_table(thd, table, &backup,
490                       return_check == ERROR_CHECKING_REPOSITORY);
491   thd->variables.sql_mode= saved_mode;
492   access->drop_thd(thd);
493   DBUG_RETURN(return_check);
494 }
495 
do_check_info(uint instance)496 enum_return_check Rpl_info_table::do_check_info(uint instance)
497 {
498   TABLE *table= NULL;
499   sql_mode_t saved_mode;
500   Open_tables_backup backup;
501   enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
502 
503   DBUG_ENTER("Rpl_info_table::do_check_info");
504 
505   THD *thd= access->create_thd();
506   saved_mode= thd->variables.sql_mode;
507 
508   /*
509     Opens and locks the rpl_info table before accessing it.
510   */
511   if (access->open_table(thd, str_schema, str_table,
512                          get_number_info(), TL_READ,
513                          &table, &backup))
514   {
515     sql_print_warning("Info table is not ready to be used. Table "
516                       "'%s.%s' cannot be opened.", str_schema.str,
517                       str_table.str);
518 
519     return_check= ERROR_CHECKING_REPOSITORY;
520     goto end;
521   }
522 
523   if (verify_table_primary_key_fields(table))
524   {
525     return_check= ERROR_CHECKING_REPOSITORY;
526     goto end;
527   }
528 
529   /*
530     Points the cursor at the row to be read according to the
531     keys.
532   */
533   if (access->scan_info(table, instance) != FOUND_ID)
534   {
535     /*
536        We cannot simply call my_error here because it does not
537        really means that there was a failure but only that the
538        record was not found.
539     */
540     return_check= REPOSITORY_DOES_NOT_EXIST;
541     goto end;
542   }
543   return_check= REPOSITORY_EXISTS;
544 
545 
546 end:
547   /*
548     Unlocks and closes the rpl_info table.
549   */
550   access->close_table(thd, table, &backup,
551                       return_check == ERROR_CHECKING_REPOSITORY);
552   thd->variables.sql_mode= saved_mode;
553   access->drop_thd(thd);
554   DBUG_RETURN(return_check);
555 }
556 
do_count_info(uint nparam,const char * param_schema,const char * param_table,uint * counter)557 bool Rpl_info_table::do_count_info(uint nparam,
558                                    const char* param_schema,
559                                    const char *param_table,
560                                    uint* counter)
561 {
562   int error= 1;
563   TABLE *table= NULL;
564   sql_mode_t saved_mode;
565   Open_tables_backup backup;
566   Rpl_info_table *info= NULL;
567   THD *thd= NULL;
568 
569   DBUG_ENTER("Rpl_info_table::do_count_info");
570 
571   if (!(info= new Rpl_info_table(nparam, param_schema, param_table)))
572     DBUG_RETURN(true);
573 
574   thd= info->access->create_thd();
575   saved_mode= thd->variables.sql_mode;
576 
577   /*
578     Opens and locks the rpl_info table before accessing it.
579   */
580   if (info->access->open_table(thd, info->str_schema, info->str_table,
581                                info->get_number_info(), TL_READ,
582                                &table, &backup))
583   {
584     /*
585       We cannot simply print out a warning message at this
586       point because this may represent a bootstrap.
587     */
588     error= 0;
589     goto end;
590   }
591 
592   /*
593     Counts entries in the rpl_info table.
594   */
595   if (info->access->count_info(table, counter))
596   {
597     sql_print_warning("Info table is not ready to be used. Table "
598                       "'%s.%s' cannot be scanned.", info->str_schema.str,
599                       info->str_table.str);
600     goto end;
601   }
602   error= 0;
603 
604 end:
605   /*
606     Unlocks and closes the rpl_info table.
607   */
608   error= info->access->close_table(thd, table, &backup, error) || error;
609   thd->variables.sql_mode= saved_mode;
610   info->access->drop_thd(thd);
611   delete info;
612   DBUG_RETURN(error);
613 }
614 
/**
  Intentionally empty: this implementation performs no action here.
*/
void Rpl_info_table::do_end_info()
{
}
618 
do_prepare_info_for_read()619 int Rpl_info_table::do_prepare_info_for_read()
620 {
621   if (!field_values)
622     return TRUE;
623 
624   cursor= 0;
625   prv_error= FALSE;
626 
627   return FALSE;
628 }
629 
do_prepare_info_for_write()630 int Rpl_info_table::do_prepare_info_for_write()
631 {
632   return(do_prepare_info_for_read());
633 }
634 
do_get_rpl_info_type()635 uint Rpl_info_table::do_get_rpl_info_type()
636 {
637   return INFO_REPOSITORY_TABLE;
638 }
639 
do_set_info(const int pos,const char * value)640 bool Rpl_info_table::do_set_info(const int pos, const char *value)
641 {
642   return (field_values->value[pos].copy(value, strlen(value),
643                                         &my_charset_bin));
644 }
645 
do_set_info(const int pos,const uchar * value,const size_t size)646 bool Rpl_info_table::do_set_info(const int pos, const uchar *value,
647                                  const size_t size)
648 {
649   return (field_values->value[pos].copy((char *) value, size,
650                                         &my_charset_bin));
651 }
652 
do_set_info(const int pos,const ulong value)653 bool Rpl_info_table::do_set_info(const int pos, const ulong value)
654 {
655   return (field_values->value[pos].set_int(value, TRUE,
656                                            &my_charset_bin));
657 }
658 
do_set_info(const int pos,const int value)659 bool Rpl_info_table::do_set_info(const int pos, const int value)
660 {
661   return (field_values->value[pos].set_int(value, FALSE,
662                                            &my_charset_bin));
663 }
664 
do_set_info(const int pos,const float value)665 bool Rpl_info_table::do_set_info(const int pos, const float value)
666 {
667   return (field_values->value[pos].set_real(value, NOT_FIXED_DEC,
668                                             &my_charset_bin));
669 }
670 
do_set_info(const int pos,const Server_ids * value)671 bool Rpl_info_table::do_set_info(const int pos, const Server_ids *value)
672 {
673   if (const_cast<Server_ids*>(value)->pack_dynamic_ids(&field_values->value[pos]))
674     return TRUE;
675 
676   return FALSE;
677 }
678 
do_get_info(const int pos,char * value,const size_t size,const char * default_value)679 bool Rpl_info_table::do_get_info(const int pos, char *value, const size_t size,
680                                  const char *default_value)
681 {
682   if (field_values->value[pos].length())
683     strmake(value, field_values->value[pos].c_ptr_safe(),
684             field_values->value[pos].length());
685   else if (default_value)
686     strmake(value, default_value, strlen(default_value));
687   else
688     *value= '\0';
689 
690   return FALSE;
691 }
692 
do_get_info(const int pos,uchar * value,const size_t size,const uchar * default_value MY_ATTRIBUTE ((unused)))693 bool Rpl_info_table::do_get_info(const int pos, uchar *value, const size_t size,
694                                  const uchar *default_value MY_ATTRIBUTE((unused)))
695 {
696   if (field_values->value[pos].length() == size)
697     return (!memcpy((char *) value,
698             field_values->value[pos].c_ptr_safe(), size));
699   return TRUE;
700 }
701 
do_get_info(const int pos,ulong * value,const ulong default_value)702 bool Rpl_info_table::do_get_info(const int pos, ulong *value,
703                                  const ulong default_value)
704 {
705   if (field_values->value[pos].length())
706   {
707     *value= strtoul(field_values->value[pos].c_ptr_safe(), 0, 10);
708     return FALSE;
709   }
710   else if (default_value)
711   {
712     *value= default_value;
713     return FALSE;
714   }
715 
716   return TRUE;
717 }
718 
do_get_info(const int pos,int * value,const int default_value)719 bool Rpl_info_table::do_get_info(const int pos, int *value,
720                                  const int default_value)
721 {
722   if (field_values->value[pos].length())
723   {
724     *value=  atoi(field_values->value[pos].c_ptr_safe());
725     return FALSE;
726   }
727   else if (default_value)
728   {
729     *value= default_value;
730     return FALSE;
731   }
732 
733   return TRUE;
734 }
735 
do_get_info(const int pos,float * value,const float default_value)736 bool Rpl_info_table::do_get_info(const int pos, float *value,
737                                  const float default_value)
738 {
739   if (field_values->value[pos].length())
740   {
741     if (sscanf(field_values->value[pos].c_ptr_safe(), "%f", value) != 1)
742       return TRUE;
743     return FALSE;
744   }
745   else if (default_value != 0.0)
746   {
747     *value= default_value;
748     return FALSE;
749   }
750 
751   return TRUE;
752 }
753 
do_get_info(const int pos,Server_ids * value,const Server_ids * default_value MY_ATTRIBUTE ((unused)))754 bool Rpl_info_table::do_get_info(const int pos, Server_ids *value,
755                                  const Server_ids *default_value MY_ATTRIBUTE((unused)))
756 {
757   if (value->unpack_dynamic_ids(field_values->value[pos].c_ptr_safe()))
758     return TRUE;
759 
760   return FALSE;
761 }
762 
do_get_description_info()763 char* Rpl_info_table::do_get_description_info()
764 {
765   return description;
766 }
767 
do_is_transactional()768 bool Rpl_info_table::do_is_transactional()
769 {
770   return is_transactional;
771 }
772 
do_update_is_transactional()773 bool Rpl_info_table::do_update_is_transactional()
774 {
775   bool error= TRUE;
776   sql_mode_t saved_mode;
777   TABLE *table= NULL;
778   Open_tables_backup backup;
779 
780   DBUG_ENTER("Rpl_info_table::do_update_is_transactional");
781   DBUG_EXECUTE_IF("simulate_update_is_transactional_error",
782                   {
783                     DBUG_RETURN(TRUE);
784                   });
785 
786   THD *thd= access->create_thd();
787   saved_mode= thd->variables.sql_mode;
788   tmp_disable_binlog(thd);
789 
790   /*
791     Opens and locks the rpl_info table before accessing it.
792   */
793   if (access->open_table(thd, str_schema, str_table,
794                          get_number_info(), TL_READ,
795                          &table, &backup))
796     goto end;
797 
798   is_transactional= table->file->has_transactions();
799   error= FALSE;
800 
801 end:
802   error= access->close_table(thd, table, &backup, 0) || error;
803   reenable_binlog(thd);
804   thd->variables.sql_mode= saved_mode;
805   access->drop_thd(thd);
806   DBUG_RETURN(error);
807 }
808 
verify_table_primary_key_fields(TABLE * table)809 bool Rpl_info_table::verify_table_primary_key_fields(TABLE *table)
810 {
811   DBUG_ENTER("Rpl_info_table::verify_table_primary_key_fields");
812   KEY *key_info= table->key_info;
813   bool error;
814 
815   /*
816     If the table has no keys or has less key fields than expected,
817     it must be corrupted.
818   */
819   if ((error= !key_info ||
820               key_info->user_defined_key_parts == 0 ||
821               (m_n_pk_fields > 0 &&
822                key_info->user_defined_key_parts != m_n_pk_fields)))
823   {
824     sql_print_error("Corrupted table %s.%s. Check out table definition.",
825                     str_schema.str, str_table.str);
826   }
827 
828   if (!error && m_n_pk_fields && m_pk_field_indexes)
829   {
830     /*
831       If any of its primary key fields are not at the expected position,
832       the table must be corrupted.
833     */
834     for (uint idx= 0; idx < m_n_pk_fields; idx++)
835     {
836       if (key_info->key_part[idx].field != table->field[m_pk_field_indexes[idx]])
837       {
838         const char *key_field_name= key_info->key_part[idx].field->field_name;
839         const char *table_field_name= table->field[m_pk_field_indexes[idx]]->field_name;
840         sql_print_error("Info table has a problem with its key field(s). "
841                         "Table '%s.%s' expected field #%u to be '%s' but "
842                         "found '%s' instead.",
843                         str_schema.str, str_table.str,
844                         m_pk_field_indexes[idx],
845                         key_field_name,
846                         table_field_name);
847         error= true;
848         break;
849       }
850     }
851   }
852 
853   DBUG_RETURN(error);
854 }
855