1 /* Copyright (c) 2010, 2014, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "rpl_info_table_access.h"
24 #include "rpl_utility.h"
25 #include "handler.h"
26 #include "sql_parse.h"
27 
28 /**
29   Opens and locks a table.
30 
31   It's assumed that the caller knows what they are doing:
32   - whether it was necessary to reset-and-backup the open tables state
33   - whether the requested lock does not lead to a deadlock
34   - whether this open mode would work under LOCK TABLES, or inside a
35   stored function or trigger.
36 
37   Note that if the table can't be locked successfully this operation will
38   close it. Therefore it provides guarantee that it either opens and locks
39   table or fails without leaving any tables open.
40 
41   @param[in]  thd           Thread requesting to open the table
42   @param[in]  dbstr         Database where the table resides
43   @param[in]  tbstr         Table to be openned
44   @param[in]  max_num_field Maximum number of fields
45   @param[in]  lock_type     How to lock the table
46   @param[out] table         We will store the open table here
47   @param[out] backup        Save the lock info. here
48 
49   @return
50     @retval TRUE open and lock failed - an error message is pushed into the
51                                         stack
52     @retval FALSE success
53 */
open_table(THD * thd,const LEX_STRING dbstr,const LEX_STRING tbstr,uint max_num_field,enum thr_lock_type lock_type,TABLE ** table,Open_tables_backup * backup)54 bool Rpl_info_table_access::open_table(THD* thd, const LEX_STRING dbstr,
55                                        const LEX_STRING tbstr,
56                                        uint max_num_field,
57                                        enum thr_lock_type lock_type,
58                                        TABLE** table,
59                                        Open_tables_backup* backup)
60 {
61   TABLE_LIST tables;
62   Query_tables_list query_tables_list_backup;
63 
64   uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
65                MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
66                MYSQL_OPEN_IGNORE_FLUSH |
67                MYSQL_LOCK_IGNORE_TIMEOUT |
68                MYSQL_LOCK_RPL_INFO_TABLE);
69 
70   DBUG_ENTER("Rpl_info_table_access::open_table");
71 
72   /*
73     This is equivalent to a new "statement". For that reason, we call both
74     lex_start() and mysql_reset_thd_for_next_command.
75   */
76   if (thd->slave_thread || !current_thd)
77   {
78     lex_start(thd);
79     mysql_reset_thd_for_next_command(thd);
80   }
81 
82   /*
83     We need to use new Open_tables_state in order not to be affected
84     by LOCK TABLES/prelocked mode.
85     Also in order not to break execution of current statement we also
86     have to backup/reset/restore Query_tables_list part of LEX, which
87     is accessed and updated in the process of opening and locking
88     tables.
89   */
90   thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
91   thd->reset_n_backup_open_tables_state(backup);
92 
93   tables.init_one_table(dbstr.str, dbstr.length, tbstr.str, tbstr.length,
94                         tbstr.str, lock_type);
95 
96   if (!open_n_lock_single_table(thd, &tables, tables.lock_type, flags))
97   {
98     close_thread_tables(thd);
99     thd->restore_backup_open_tables_state(backup);
100     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
101     my_error(ER_NO_SUCH_TABLE, MYF(0), dbstr.str, tbstr.str);
102     DBUG_RETURN(TRUE);
103   }
104 
105   DBUG_ASSERT(tables.table->s->table_category == TABLE_CATEGORY_RPL_INFO);
106 
107   if (tables.table->s->fields < max_num_field)
108   {
109     /*
110       Safety: this can only happen if someone started the server and then
111       altered the table.
112     */
113     ha_rollback_trans(thd, FALSE);
114     close_thread_tables(thd);
115     thd->restore_backup_open_tables_state(backup);
116     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
117     my_error(ER_COL_COUNT_DOESNT_MATCH_CORRUPTED_V2, MYF(0),
118              tables.table->s->db.str, tables.table->s->table_name.str,
119              max_num_field, tables.table->s->fields);
120     DBUG_RETURN(TRUE);
121   }
122 
123   thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
124 
125   *table= tables.table;
126   tables.table->use_all_columns();
127   DBUG_RETURN(FALSE);
128 }
129 
130 /**
131   Commits the changes, unlocks the table and closes it. This method
132   needs to be called even if the open_table fails, in order to ensure
133   the lock info is properly restored.
134 
135   @param[in] thd    Thread requesting to close the table
136   @param[in] table  Table to be closed
137   @param[in] backup Restore the lock info from here
138   @param[in] error  If there was an error while updating
139                     the table
140 
141   If there is an error, rolls back the current statement. Otherwise,
142   commits it. However, if a new thread was created and there is an
143   error, the transaction must be rolled back. Otherwise, it must be
144   committed. In this case, the changes were not done on behalf of
145   any user transaction and if not finished, there would be pending
146   changes.
147 
148   @return
149     @retval FALSE No error
150     @retval TRUE  Failure
151 */
close_table(THD * thd,TABLE * table,Open_tables_backup * backup,bool error)152 bool Rpl_info_table_access::close_table(THD *thd, TABLE* table,
153                                         Open_tables_backup *backup,
154                                         bool error)
155 {
156   Query_tables_list query_tables_list_backup;
157 
158   DBUG_ENTER("Rpl_info_table_access::close_table");
159 
160   if (table)
161   {
162     if (error)
163       ha_rollback_trans(thd, FALSE);
164     else
165     {
166       /*
167         To make the commit not to block with global read lock set
168         "ignore_global_read_lock" flag to true.
169        */
170       ha_commit_trans(thd, FALSE, TRUE);
171     }
172     if (thd_created)
173     {
174       if (error)
175         ha_rollback_trans(thd, TRUE);
176       else
177       {
178         /*
179           To make the commit not to block with global read lock set
180           "ignore_global_read_lock" flag to true.
181          */
182         ha_commit_trans(thd, TRUE, TRUE);
183       }
184     }
185     /*
186       In order not to break execution of current statement we have to
187       backup/reset/restore Query_tables_list part of LEX, which is
188       accessed and updated in the process of closing tables.
189     */
190     thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
191     close_thread_tables(thd);
192     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
193     thd->restore_backup_open_tables_state(backup);
194   }
195 
196   DBUG_RETURN(FALSE);
197 }
198 
199 /**
200   Positions the internal pointer of `table` according to the primary
201   key.
202 
203   If the search succeeds, the table cursor points to the found row.
204 
205   @param[in,out]  field_values The sequence of values
206   @param[in,out]  table        Table
207 
208   @return
209     @retval FOUND     The row was found.
210     @retval NOT_FOUND The row was not found.
211     @retval ERROR     There was a failure.
212 */
find_info(Rpl_info_values * field_values,TABLE * table)213 enum enum_return_id Rpl_info_table_access::find_info(Rpl_info_values *field_values,
214                                                      TABLE *table)
215 {
216   KEY* keyinfo= NULL;
217   uchar key[MAX_KEY_LENGTH];
218 
219   DBUG_ENTER("Rpl_info_table_access::find_info");
220 
221   /*
222     Checks if the table has a primary key as expected.
223   */
224   if (table->s->primary_key >= MAX_KEY ||
225       !table->s->keys_in_use.is_set(table->s->primary_key))
226   {
227     /*
228       This is not supposed to happen and means that someone
229       has changed the table or disabled the keys.
230     */
231     DBUG_RETURN(ERROR_ID);
232   }
233 
234   keyinfo= table->s->key_info + (uint) table->s->primary_key;
235   for (uint idx= 0; idx < keyinfo->user_defined_key_parts; idx++)
236   {
237     uint fieldnr= keyinfo->key_part[idx].fieldnr - 1;
238 
239     /*
240       The size of the field must be great to store data.
241     */
242     if (field_values->value[fieldnr].length() >
243         table->field[fieldnr]->field_length)
244       DBUG_RETURN(ERROR_ID);
245 
246     table->field[fieldnr]->store(field_values->value[fieldnr].c_ptr_safe(),
247                                  field_values->value[fieldnr].length(),
248                                  &my_charset_bin);
249   }
250   key_copy(key, table->record[0], table->key_info, table->key_info->key_length);
251 
252   if (table->file->ha_index_read_idx_map(table->record[0], 0, key, HA_WHOLE_KEY,
253                                          HA_READ_KEY_EXACT))
254     DBUG_RETURN(NOT_FOUND_ID);
255 
256   DBUG_RETURN(FOUND_ID);
257 }
258 
259 /**
260   Positions the internal pointer of `table` to the n-instance row.
261 
262   @param[in]  table Reference to a table object.
263   @param[in]  instance n-instance row.
264 
265   The code built on top of this function needs to ensure there is
266   no concurrent threads trying to update the table. So if an error
267   different from HA_ERR_END_OF_FILE is returned, we abort with an
268   error because this implies that someone has manualy and
269   concurrently changed something.
270 
271   @return
272     @retval FOUND     The row was found.
273     @retval NOT_FOUND The row was not found.
274     @retval ERROR     There was a failure.
275 */
scan_info(TABLE * table,uint instance)276 enum enum_return_id Rpl_info_table_access::scan_info(TABLE* table,
277                                                      uint instance)
278 {
279   int error= 0;
280   uint counter= 0;
281   enum enum_return_id ret= NOT_FOUND_ID;
282 
283   DBUG_ENTER("Rpl_info_table_access::scan_info");
284 
285   if ((error= table->file->ha_rnd_init(TRUE)))
286     DBUG_RETURN(ERROR_ID);
287 
288   do
289   {
290     error= table->file->ha_rnd_next(table->record[0]);
291     switch (error)
292     {
293       case 0:
294         counter++;
295         if (counter == instance)
296         {
297           ret= FOUND_ID;
298           error= HA_ERR_END_OF_FILE;
299         }
300       break;
301 
302       case HA_ERR_END_OF_FILE:
303         ret= NOT_FOUND_ID;
304       break;
305 
306       default:
307         DBUG_PRINT("info", ("Failed to get next record"
308                             " (ha_rnd_next returns %d)", error));
309         ret= ERROR_ID;
310       break;
311     }
312   }
313   while (!error);
314 
315   table->file->ha_rnd_end();
316 
317   DBUG_RETURN(ret);
318 }
319 
320 /**
321   Returns the number of entries in table.
322 
323   The code built on top of this function needs to ensure there is
324   no concurrent threads trying to update the table. So if an error
325   different from HA_ERR_END_OF_FILE is returned, we abort with an
326   error because this implies that someone has manualy and
327   concurrently changed something.
328 
329   @param[in]  table   Table
330   @param[out] counter Registers the number of entries.
331 
332   @return
333     @retval false No error
334     @retval true  Failure
335 */
count_info(TABLE * table,uint * counter)336 bool Rpl_info_table_access::count_info(TABLE* table, uint* counter)
337 {
338   bool end= false;
339   int error= 0;
340 
341   DBUG_ENTER("Rpl_info_table_access::count_info");
342 
343   if ((error= table->file->ha_rnd_init(true)))
344     DBUG_RETURN(true);
345 
346   do
347   {
348     error= table->file->ha_rnd_next(table->record[0]);
349     switch (error)
350     {
351       case 0:
352         (*counter)++;
353       break;
354 
355       case HA_ERR_END_OF_FILE:
356         end= true;
357       break;
358 
359       default:
360         DBUG_PRINT("info", ("Failed to get next record"
361                             " (ha_rnd_next returns %d)", error));
362       break;
363     }
364   }
365   while (!error);
366 
367   table->file->ha_rnd_end();
368 
369   DBUG_RETURN(end ? false : true);
370 }
371 
372 /**
373   Reads information from a sequence of fields into a set of LEX_STRING
374   structures, where the sequence of values is specified through the object
375   Rpl_info_values.
376 
377   @param[in] max_num_field Maximum number of fields
378   @param[in] fields        The sequence of fields
379   @param[in] field_values  The sequence of values
380 
381   @return
382     @retval FALSE No error
383     @retval TRUE  Failure
384 */
load_info_values(uint max_num_field,Field ** fields,Rpl_info_values * field_values)385 bool Rpl_info_table_access::load_info_values(uint max_num_field, Field **fields,
386                                              Rpl_info_values *field_values)
387 {
388   DBUG_ENTER("Rpl_info_table_access::load_info_values");
389   char buff[MAX_FIELD_WIDTH];
390   String str(buff, sizeof(buff), &my_charset_bin);
391 
392   uint field_idx= 0;
393   while (field_idx < max_num_field)
394   {
395     fields[field_idx]->val_str(&str);
396     field_values->value[field_idx].copy(str.c_ptr_safe(), str.length(),
397                                         &my_charset_bin);
398     field_idx++;
399   }
400 
401   DBUG_RETURN(FALSE);
402 }
403 
404 /**
405   Stores information from a sequence of fields into a set of LEX_STRING
406   structures, where the sequence of values is specified through the object
407   Rpl_info_values.
408 
409   @param[in] max_num_field Maximum number of fields
410   @param[in] fields        The sequence of fields
411   @param[in] field_values  The sequence of values
412 
413   @return
414     @retval FALSE No error
415     @retval TRUE  Failure
416  */
store_info_values(uint max_num_field,Field ** fields,Rpl_info_values * field_values)417 bool Rpl_info_table_access::store_info_values(uint max_num_field, Field **fields,
418                                               Rpl_info_values *field_values)
419 {
420   DBUG_ENTER("Rpl_info_table_access::store_info_values");
421   uint field_idx= 0;
422 
423   while (field_idx < max_num_field)
424   {
425     fields[field_idx]->set_notnull();
426 
427     if (fields[field_idx]->store(field_values->value[field_idx].c_ptr_safe(),
428                                  field_values->value[field_idx].length(),
429                                  &my_charset_bin))
430     {
431       my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0),
432                fields[field_idx]->field_name);
433       DBUG_RETURN(TRUE);
434     }
435     field_idx++;
436   }
437 
438   DBUG_RETURN(FALSE);
439 }
440 
441 /**
442   Creates a new thread if necessary. In the bootstrap process or in
443   the mysqld startup, a thread is created in order to be able to
444   access a table. Otherwise, the current thread is used.
445 
446   @return
447     @retval THD* Pointer to thread structure
448 */
create_thd()449 THD *Rpl_info_table_access::create_thd()
450 {
451   THD *thd= current_thd;
452 
453   if (!thd)
454   {
455     thd= new THD;
456     thd->thread_stack= (char*) &thd;
457     thd->store_globals();
458     thd->security_ctx->skip_grants();
459     thd->system_thread= SYSTEM_THREAD_INFO_REPOSITORY;
460     thd_created= true;
461   }
462 
463   return(thd);
464 }
465 
466 /**
467   Destroys the created thread if necessary and restores the
468   system_thread information.
469 
470   @param[in] thd Thread requesting to be destroyed
471 
472   @return
473     @retval FALSE No error
474     @retval TRUE  Failure
475 */
drop_thd(THD * thd)476 bool Rpl_info_table_access::drop_thd(THD *thd)
477 {
478   DBUG_ENTER("Rpl_info::drop_thd");
479 
480   if (thd_created)
481   {
482     delete thd;
483     my_pthread_setspecific_ptr(THR_THD,  NULL);
484     thd_created= false;
485   }
486 
487   DBUG_RETURN(FALSE);
488 }
489