1 /* Copyright (c) 2010, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "sql/rpl_info_table_access.h"
24 
25 #include <stddef.h>
26 
27 #include "libbinlogevents/include/binlog_event.h"
28 #include "m_ctype.h"
29 #include "my_base.h"
30 #include "my_dbug.h"
31 #include "my_inttypes.h"
32 #include "my_sqlcommand.h"
33 #include "my_sys.h"
34 #include "mysql/thread_type.h"
35 #include "mysqld_error.h"
36 #include "sql/current_thd.h"
37 #include "sql/field.h"
38 #include "sql/handler.h"
39 #include "sql/key.h"
40 #include "sql/log_event.h"
41 #include "sql/rpl_info_values.h"  // Rpl_info_values
42 #include "sql/rpl_rli.h"
43 #include "sql/sql_base.h"  // MYSQL_OPEN_IGNORE_FLUSH
44 #include "sql/sql_bitmap.h"
45 #include "sql/sql_class.h"  // THD
46 #include "sql/sql_const.h"
47 #include "sql/sql_lex.h"
48 #include "sql/sql_parse.h"  // mysql_reset_thd_for_next_command
49 #include "sql/table.h"      // TABLE
50 #include "sql_string.h"
51 
before_open(THD * thd)52 void Rpl_info_table_access::before_open(THD *thd) {
53   DBUG_TRACE;
54 
55   m_flags = (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
56              MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY | MYSQL_OPEN_IGNORE_FLUSH |
57              MYSQL_LOCK_IGNORE_TIMEOUT | MYSQL_LOCK_RPL_INFO_TABLE);
58 
59   /*
60     This is equivalent to a new "statement". For that reason, we call both
61     lex_start() and mysql_reset_thd_for_next_command.
62     Notice in case of atomic DDL the new statement has to be initiated
63     with a special care to preserve LEX of the top level statement.
64     On the other hand non-atomic DDL behaves "natively" to reset.
65   */
66   if (!current_thd ||
67       (thd->slave_thread &&
68        ((!thd->rli_slave || !thd->rli_slave->current_event ||
69          thd->rli_slave->current_event->get_type_code() !=
70              binary_log::QUERY_EVENT) ||
71         !static_cast<Query_log_event *>(thd->rli_slave->current_event)
72              ->has_ddl_committed))) {
73     lex_start(thd);
74     mysql_reset_thd_for_next_command(thd);
75   }
76 }
77 
78 /**
79   Commits the changes, unlocks the table and closes it. This method
80   needs to be called even if the open_table fails, in order to ensure
81   the lock info is properly restored.
82 
83   @param[in] thd    Thread requesting to close the table
84   @param[in] table  Table to be closed
85   @param[in] backup Restore the lock info from here
86   @param[in] error  If there was an error while updating
87                     the table
88 
89   If there is an error, rolls back the current statement. Otherwise,
90   commits it. However, if a new thread was created and there is an
91   error, the transaction must be rolled back. Otherwise, it must be
92   committed. In this case, the changes were not done on behalf of
93   any user transaction and if not finished, there would be pending
94   changes.
95 
96   @retval false Success
97   @retval true  Failure
98 */
close_table(THD * thd,TABLE * table,Open_tables_backup * backup,bool error)99 bool Rpl_info_table_access::close_table(THD *thd, TABLE *table,
100                                         Open_tables_backup *backup,
101                                         bool error) {
102   DBUG_TRACE;
103   bool res =
104       System_table_access::close_table(thd, table, backup, error, thd_created);
105 
106   DBUG_EXECUTE_IF("slave_crash_after_commit_no_atomic_ddl", {
107     if (thd->slave_thread && thd->rli_slave && thd->rli_slave->current_event &&
108         thd->rli_slave->current_event->get_type_code() ==
109             binary_log::QUERY_EVENT &&
110         !static_cast<Query_log_event *>(thd->rli_slave->current_event)
111              ->has_ddl_committed) {
112       DBUG_ASSERT(thd->lex->sql_command == SQLCOM_END);
113       DBUG_SUICIDE();
114     }
115   });
116 
117   return res;
118 }
119 
120 /**
121   Positions the internal pointer of `table` according to the primary
122   key.
123 
124   If the search succeeds, the table cursor points to the found row.
125 
126   @param[in,out]  field_values The sequence of values
127   @param[in,out]  table        Table
128 
129   @retval FOUND     The row was found.
130   @retval NOT_FOUND The row was not found.
131   @retval ERROR     There was a failure.
132 */
find_info(Rpl_info_values * field_values,TABLE * table)133 enum enum_return_id Rpl_info_table_access::find_info(
134     Rpl_info_values *field_values, TABLE *table) {
135   KEY *keyinfo = nullptr;
136   uchar key[MAX_KEY_LENGTH];
137 
138   DBUG_TRACE;
139 
140   /*
141     Checks if the table has a primary key as expected.
142   */
143   if (table->s->primary_key >= MAX_KEY ||
144       !table->s->keys_in_use.is_set(table->s->primary_key)) {
145     /*
146       This is not supposed to happen and means that someone
147       has changed the table or disabled the keys.
148     */
149     return ERROR_ID;
150   }
151 
152   keyinfo = table->s->key_info + table->s->primary_key;
153   for (uint idx = 0; idx < keyinfo->user_defined_key_parts; idx++) {
154     uint fieldnr = keyinfo->key_part[idx].fieldnr - 1;
155 
156     /*
157       The size of the field must be great to store data.
158     */
159     if (field_values->value[fieldnr].length() >
160         table->field[fieldnr]->field_length)
161       return ERROR_ID;
162 
163     table->field[fieldnr]->store(field_values->value[fieldnr].c_ptr_safe(),
164                                  field_values->value[fieldnr].length(),
165                                  &my_charset_bin);
166   }
167   key_copy(key, table->record[0], table->key_info, table->key_info->key_length);
168 
169   if (table->file->ha_index_read_idx_map(table->record[0], 0, key, HA_WHOLE_KEY,
170                                          HA_READ_KEY_EXACT))
171     return NOT_FOUND_ID;
172 
173   return FOUND_ID;
174 }
175 
176 /**
177   Positions the internal pointer of `table` to the n-instance row.
178 
179   @param[in]  table Reference to a table object.
180   @param[in]  instance n-instance row.
181 
182   The code built on top of this function needs to ensure there is
183   no concurrent threads trying to update the table. So if an error
184   different from HA_ERR_END_OF_FILE is returned, we abort with an
185   error because this implies that someone has manualy and
186   concurrently changed something.
187 
188   @retval FOUND     The row was found.
189   @retval NOT_FOUND The row was not found.
190   @retval ERROR     There was a failure.
191 */
scan_info(TABLE * table,uint instance)192 enum enum_return_id Rpl_info_table_access::scan_info(TABLE *table,
193                                                      uint instance) {
194   int error = 0;
195   uint counter = 0;
196   enum enum_return_id ret = NOT_FOUND_ID;
197 
198   DBUG_TRACE;
199 
200   if ((error = table->file->ha_rnd_init(true))) return ERROR_ID;
201 
202   do {
203     error = table->file->ha_rnd_next(table->record[0]);
204     switch (error) {
205       case 0:
206         counter++;
207         if (counter == instance) {
208           ret = FOUND_ID;
209           error = HA_ERR_END_OF_FILE;
210         }
211         break;
212 
213       case HA_ERR_END_OF_FILE:
214         ret = NOT_FOUND_ID;
215         break;
216 
217       default:
218         DBUG_PRINT("info", ("Failed to get next record"
219                             " (ha_rnd_next returns %d)",
220                             error));
221         ret = ERROR_ID;
222         break;
223     }
224   } while (!error);
225 
226   table->file->ha_rnd_end();
227 
228   return ret;
229 }
230 
231 /**
232   Returns the number of entries in table.
233 
234   The code built on top of this function needs to ensure there is
235   no concurrent threads trying to update the table. So if an error
236   different from HA_ERR_END_OF_FILE is returned, we abort with an
237   error because this implies that someone has manualy and
238   concurrently changed something.
239 
240   @param[in]  table   Table
241   @param[out] counter Registers the number of entries.
242 
243   @retval false No error
244   @retval true  Failure
245 */
count_info(TABLE * table,uint * counter)246 bool Rpl_info_table_access::count_info(TABLE *table, uint *counter) {
247   bool end = false;
248   int error = 0;
249 
250   DBUG_TRACE;
251 
252   if ((error = table->file->ha_rnd_init(true))) return true;
253 
254   do {
255     error = table->file->ha_rnd_next(table->record[0]);
256     switch (error) {
257       case 0:
258         (*counter)++;
259         break;
260 
261       case HA_ERR_END_OF_FILE:
262         end = true;
263         break;
264 
265       default:
266         DBUG_PRINT("info", ("Failed to get next record"
267                             " (ha_rnd_next returns %d)",
268                             error));
269         break;
270     }
271   } while (!error);
272 
273   table->file->ha_rnd_end();
274 
275   return end ? false : true;
276 }
277 
278 /**
279   Reads information from a sequence of fields into a set of LEX_STRING
280   structures, where the sequence of values is specified through the object
281   Rpl_info_values.
282 
283   @param[in] max_num_field Maximum number of fields
284   @param[in] fields        The sequence of fields
285   @param[in] field_values  The sequence of values
286 
287   @retval false No error
288   @retval true  Failure
289 */
load_info_values(uint max_num_field,Field ** fields,Rpl_info_values * field_values)290 bool Rpl_info_table_access::load_info_values(uint max_num_field, Field **fields,
291                                              Rpl_info_values *field_values) {
292   DBUG_TRACE;
293   char buff[MAX_FIELD_WIDTH];
294   String str(buff, sizeof(buff), &my_charset_bin);
295 
296   uint field_idx = 0;
297   while (field_idx < max_num_field) {
298     if (fields[field_idx]->is_null()) {
299       bitmap_set_bit(&field_values->is_null, field_idx);
300     } else {
301       if (fields[field_idx]->real_type() == MYSQL_TYPE_ENUM) {
302         longlong enum_value = fields[field_idx]->val_int();
303         str.set_int(enum_value, false, &my_charset_bin);
304       } else
305         fields[field_idx]->val_str(&str);
306       field_values->value[field_idx].copy(str.c_ptr_safe(), str.length(),
307                                           &my_charset_bin);
308       bitmap_clear_bit(&field_values->is_null, field_idx);
309     }
310     field_idx++;
311   }
312 
313   return false;
314 }
315 
316 /**
317   Stores information from a sequence of fields into a set of LEX_STRING
318   structures, where the sequence of values is specified through the object
319   Rpl_info_values.
320 
321   @param[in] max_num_field Maximum number of fields
322   @param[in] fields        The sequence of fields
323   @param[in] field_values  The sequence of values
324 
325   @retval false No error
326   @retval true  Failure
327  */
store_info_values(uint max_num_field,Field ** fields,Rpl_info_values * field_values)328 bool Rpl_info_table_access::store_info_values(uint max_num_field,
329                                               Field **fields,
330                                               Rpl_info_values *field_values) {
331   DBUG_TRACE;
332   uint field_idx = 0;
333 
334   while (field_idx < max_num_field) {
335     if (bitmap_is_set(&field_values->is_null, field_idx)) {
336       fields[field_idx]->set_null();
337     } else {
338       fields[field_idx]->set_notnull();
339 
340       if (fields[field_idx]->store(field_values->value[field_idx].c_ptr_safe(),
341                                    field_values->value[field_idx].length(),
342                                    &my_charset_bin)) {
343         my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0),
344                  fields[field_idx]->field_name);
345         return true;
346       }
347     }
348     field_idx++;
349   }
350 
351   return false;
352 }
353 
354 /**
355   Creates a new thread if necessary. In the bootstrap process or in
356   the mysqld startup, a thread is created in order to be able to
357   access a table. Otherwise, the current thread is used.
358 
359   @returns THD* Pointer to thread structure
360 */
create_thd()361 THD *Rpl_info_table_access::create_thd() {
362   THD *thd = current_thd;
363 
364   if (!thd) {
365     thd = System_table_access::create_thd();
366     thd->system_thread = SYSTEM_THREAD_INFO_REPOSITORY;
367     /*
368        Set the skip_readonly_check flag as this thread should not be
369        blocked by super_read_only check during ha_commit_trans.
370     */
371     thd->set_skip_readonly_check();
372     thd_created = true;
373   }
374 
375   return (thd);
376 }
377 
378 /**
379   Destroys the created thread if necessary and restores the
380   system_thread information.
381 
382   @param[in] thd Thread requesting to be destroyed
383 */
drop_thd(THD * thd)384 void Rpl_info_table_access::drop_thd(THD *thd) {
385   DBUG_TRACE;
386 
387   if (thd_created) {
388     thd->reset_skip_readonly_check();
389     System_table_access::drop_thd(thd);
390     thd_created = false;
391   }
392 }
393