1 /* Copyright (c) 2010, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_info_table_access.h"
24
25 #include <stddef.h>
26
27 #include "libbinlogevents/include/binlog_event.h"
28 #include "m_ctype.h"
29 #include "my_base.h"
30 #include "my_dbug.h"
31 #include "my_inttypes.h"
32 #include "my_sqlcommand.h"
33 #include "my_sys.h"
34 #include "mysql/thread_type.h"
35 #include "mysqld_error.h"
36 #include "sql/current_thd.h"
37 #include "sql/field.h"
38 #include "sql/handler.h"
39 #include "sql/key.h"
40 #include "sql/log_event.h"
41 #include "sql/rpl_info_values.h" // Rpl_info_values
42 #include "sql/rpl_rli.h"
43 #include "sql/sql_base.h" // MYSQL_OPEN_IGNORE_FLUSH
44 #include "sql/sql_bitmap.h"
45 #include "sql/sql_class.h" // THD
46 #include "sql/sql_const.h"
47 #include "sql/sql_lex.h"
48 #include "sql/sql_parse.h" // mysql_reset_thd_for_next_command
49 #include "sql/table.h" // TABLE
50 #include "sql_string.h"
51
before_open(THD * thd)52 void Rpl_info_table_access::before_open(THD *thd) {
53 DBUG_TRACE;
54
55 m_flags = (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
56 MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY | MYSQL_OPEN_IGNORE_FLUSH |
57 MYSQL_LOCK_IGNORE_TIMEOUT | MYSQL_LOCK_RPL_INFO_TABLE);
58
59 /*
60 This is equivalent to a new "statement". For that reason, we call both
61 lex_start() and mysql_reset_thd_for_next_command.
62 Notice in case of atomic DDL the new statement has to be initiated
63 with a special care to preserve LEX of the top level statement.
64 On the other hand non-atomic DDL behaves "natively" to reset.
65 */
66 if (!current_thd ||
67 (thd->slave_thread &&
68 ((!thd->rli_slave || !thd->rli_slave->current_event ||
69 thd->rli_slave->current_event->get_type_code() !=
70 binary_log::QUERY_EVENT) ||
71 !static_cast<Query_log_event *>(thd->rli_slave->current_event)
72 ->has_ddl_committed))) {
73 lex_start(thd);
74 mysql_reset_thd_for_next_command(thd);
75 }
76 }
77
78 /**
79 Commits the changes, unlocks the table and closes it. This method
80 needs to be called even if the open_table fails, in order to ensure
81 the lock info is properly restored.
82
83 @param[in] thd Thread requesting to close the table
84 @param[in] table Table to be closed
85 @param[in] backup Restore the lock info from here
86 @param[in] error If there was an error while updating
87 the table
88
89 If there is an error, rolls back the current statement. Otherwise,
90 commits it. However, if a new thread was created and there is an
91 error, the transaction must be rolled back. Otherwise, it must be
92 committed. In this case, the changes were not done on behalf of
93 any user transaction and if not finished, there would be pending
94 changes.
95
96 @retval false Success
97 @retval true Failure
98 */
close_table(THD * thd,TABLE * table,Open_tables_backup * backup,bool error)99 bool Rpl_info_table_access::close_table(THD *thd, TABLE *table,
100 Open_tables_backup *backup,
101 bool error) {
102 DBUG_TRACE;
103 bool res =
104 System_table_access::close_table(thd, table, backup, error, thd_created);
105
106 DBUG_EXECUTE_IF("slave_crash_after_commit_no_atomic_ddl", {
107 if (thd->slave_thread && thd->rli_slave && thd->rli_slave->current_event &&
108 thd->rli_slave->current_event->get_type_code() ==
109 binary_log::QUERY_EVENT &&
110 !static_cast<Query_log_event *>(thd->rli_slave->current_event)
111 ->has_ddl_committed) {
112 DBUG_ASSERT(thd->lex->sql_command == SQLCOM_END);
113 DBUG_SUICIDE();
114 }
115 });
116
117 return res;
118 }
119
120 /**
121 Positions the internal pointer of `table` according to the primary
122 key.
123
124 If the search succeeds, the table cursor points to the found row.
125
126 @param[in,out] field_values The sequence of values
127 @param[in,out] table Table
128
129 @retval FOUND The row was found.
130 @retval NOT_FOUND The row was not found.
131 @retval ERROR There was a failure.
132 */
find_info(Rpl_info_values * field_values,TABLE * table)133 enum enum_return_id Rpl_info_table_access::find_info(
134 Rpl_info_values *field_values, TABLE *table) {
135 KEY *keyinfo = nullptr;
136 uchar key[MAX_KEY_LENGTH];
137
138 DBUG_TRACE;
139
140 /*
141 Checks if the table has a primary key as expected.
142 */
143 if (table->s->primary_key >= MAX_KEY ||
144 !table->s->keys_in_use.is_set(table->s->primary_key)) {
145 /*
146 This is not supposed to happen and means that someone
147 has changed the table or disabled the keys.
148 */
149 return ERROR_ID;
150 }
151
152 keyinfo = table->s->key_info + table->s->primary_key;
153 for (uint idx = 0; idx < keyinfo->user_defined_key_parts; idx++) {
154 uint fieldnr = keyinfo->key_part[idx].fieldnr - 1;
155
156 /*
157 The size of the field must be great to store data.
158 */
159 if (field_values->value[fieldnr].length() >
160 table->field[fieldnr]->field_length)
161 return ERROR_ID;
162
163 table->field[fieldnr]->store(field_values->value[fieldnr].c_ptr_safe(),
164 field_values->value[fieldnr].length(),
165 &my_charset_bin);
166 }
167 key_copy(key, table->record[0], table->key_info, table->key_info->key_length);
168
169 if (table->file->ha_index_read_idx_map(table->record[0], 0, key, HA_WHOLE_KEY,
170 HA_READ_KEY_EXACT))
171 return NOT_FOUND_ID;
172
173 return FOUND_ID;
174 }
175
176 /**
177 Positions the internal pointer of `table` to the n-instance row.
178
179 @param[in] table Reference to a table object.
180 @param[in] instance n-instance row.
181
182 The code built on top of this function needs to ensure there is
183 no concurrent threads trying to update the table. So if an error
184 different from HA_ERR_END_OF_FILE is returned, we abort with an
185 error because this implies that someone has manualy and
186 concurrently changed something.
187
188 @retval FOUND The row was found.
189 @retval NOT_FOUND The row was not found.
190 @retval ERROR There was a failure.
191 */
scan_info(TABLE * table,uint instance)192 enum enum_return_id Rpl_info_table_access::scan_info(TABLE *table,
193 uint instance) {
194 int error = 0;
195 uint counter = 0;
196 enum enum_return_id ret = NOT_FOUND_ID;
197
198 DBUG_TRACE;
199
200 if ((error = table->file->ha_rnd_init(true))) return ERROR_ID;
201
202 do {
203 error = table->file->ha_rnd_next(table->record[0]);
204 switch (error) {
205 case 0:
206 counter++;
207 if (counter == instance) {
208 ret = FOUND_ID;
209 error = HA_ERR_END_OF_FILE;
210 }
211 break;
212
213 case HA_ERR_END_OF_FILE:
214 ret = NOT_FOUND_ID;
215 break;
216
217 default:
218 DBUG_PRINT("info", ("Failed to get next record"
219 " (ha_rnd_next returns %d)",
220 error));
221 ret = ERROR_ID;
222 break;
223 }
224 } while (!error);
225
226 table->file->ha_rnd_end();
227
228 return ret;
229 }
230
231 /**
232 Returns the number of entries in table.
233
234 The code built on top of this function needs to ensure there is
235 no concurrent threads trying to update the table. So if an error
236 different from HA_ERR_END_OF_FILE is returned, we abort with an
237 error because this implies that someone has manualy and
238 concurrently changed something.
239
240 @param[in] table Table
241 @param[out] counter Registers the number of entries.
242
243 @retval false No error
244 @retval true Failure
245 */
count_info(TABLE * table,uint * counter)246 bool Rpl_info_table_access::count_info(TABLE *table, uint *counter) {
247 bool end = false;
248 int error = 0;
249
250 DBUG_TRACE;
251
252 if ((error = table->file->ha_rnd_init(true))) return true;
253
254 do {
255 error = table->file->ha_rnd_next(table->record[0]);
256 switch (error) {
257 case 0:
258 (*counter)++;
259 break;
260
261 case HA_ERR_END_OF_FILE:
262 end = true;
263 break;
264
265 default:
266 DBUG_PRINT("info", ("Failed to get next record"
267 " (ha_rnd_next returns %d)",
268 error));
269 break;
270 }
271 } while (!error);
272
273 table->file->ha_rnd_end();
274
275 return end ? false : true;
276 }
277
278 /**
279 Reads information from a sequence of fields into a set of LEX_STRING
280 structures, where the sequence of values is specified through the object
281 Rpl_info_values.
282
283 @param[in] max_num_field Maximum number of fields
284 @param[in] fields The sequence of fields
285 @param[in] field_values The sequence of values
286
287 @retval false No error
288 @retval true Failure
289 */
load_info_values(uint max_num_field,Field ** fields,Rpl_info_values * field_values)290 bool Rpl_info_table_access::load_info_values(uint max_num_field, Field **fields,
291 Rpl_info_values *field_values) {
292 DBUG_TRACE;
293 char buff[MAX_FIELD_WIDTH];
294 String str(buff, sizeof(buff), &my_charset_bin);
295
296 uint field_idx = 0;
297 while (field_idx < max_num_field) {
298 if (fields[field_idx]->is_null()) {
299 bitmap_set_bit(&field_values->is_null, field_idx);
300 } else {
301 if (fields[field_idx]->real_type() == MYSQL_TYPE_ENUM) {
302 longlong enum_value = fields[field_idx]->val_int();
303 str.set_int(enum_value, false, &my_charset_bin);
304 } else
305 fields[field_idx]->val_str(&str);
306 field_values->value[field_idx].copy(str.c_ptr_safe(), str.length(),
307 &my_charset_bin);
308 bitmap_clear_bit(&field_values->is_null, field_idx);
309 }
310 field_idx++;
311 }
312
313 return false;
314 }
315
316 /**
317 Stores information from a sequence of fields into a set of LEX_STRING
318 structures, where the sequence of values is specified through the object
319 Rpl_info_values.
320
321 @param[in] max_num_field Maximum number of fields
322 @param[in] fields The sequence of fields
323 @param[in] field_values The sequence of values
324
325 @retval false No error
326 @retval true Failure
327 */
store_info_values(uint max_num_field,Field ** fields,Rpl_info_values * field_values)328 bool Rpl_info_table_access::store_info_values(uint max_num_field,
329 Field **fields,
330 Rpl_info_values *field_values) {
331 DBUG_TRACE;
332 uint field_idx = 0;
333
334 while (field_idx < max_num_field) {
335 if (bitmap_is_set(&field_values->is_null, field_idx)) {
336 fields[field_idx]->set_null();
337 } else {
338 fields[field_idx]->set_notnull();
339
340 if (fields[field_idx]->store(field_values->value[field_idx].c_ptr_safe(),
341 field_values->value[field_idx].length(),
342 &my_charset_bin)) {
343 my_error(ER_RPL_INFO_DATA_TOO_LONG, MYF(0),
344 fields[field_idx]->field_name);
345 return true;
346 }
347 }
348 field_idx++;
349 }
350
351 return false;
352 }
353
354 /**
355 Creates a new thread if necessary. In the bootstrap process or in
356 the mysqld startup, a thread is created in order to be able to
357 access a table. Otherwise, the current thread is used.
358
359 @returns THD* Pointer to thread structure
360 */
create_thd()361 THD *Rpl_info_table_access::create_thd() {
362 THD *thd = current_thd;
363
364 if (!thd) {
365 thd = System_table_access::create_thd();
366 thd->system_thread = SYSTEM_THREAD_INFO_REPOSITORY;
367 /*
368 Set the skip_readonly_check flag as this thread should not be
369 blocked by super_read_only check during ha_commit_trans.
370 */
371 thd->set_skip_readonly_check();
372 thd_created = true;
373 }
374
375 return (thd);
376 }
377
378 /**
379 Destroys the created thread if necessary and restores the
380 system_thread information.
381
382 @param[in] thd Thread requesting to be destroyed
383 */
drop_thd(THD * thd)384 void Rpl_info_table_access::drop_thd(THD *thd) {
385 DBUG_TRACE;
386
387 if (thd_created) {
388 thd->reset_skip_readonly_check();
389 System_table_access::drop_thd(thd);
390 thd_created = false;
391 }
392 }
393