1 /* Copyright (C) 2015-2021 Codership Oy <info@codership.com>
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License along
13 with this program; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15
16 #include "mariadb.h"
17
18 #include "table.h"
19 #include "key.h"
20 #include "sql_base.h"
21 #include "sql_parse.h"
22 #include "sql_update.h"
23 #include "transaction.h"
24
25 #include "mysql/service_wsrep.h"
26 #include "wsrep_schema.h"
27 #include "wsrep_applier.h"
28 #include "wsrep_xid.h"
29 #include "wsrep_binlog.h"
30 #include "wsrep_high_priority_service.h"
31 #include "wsrep_storage_service.h"
32 #include "wsrep_thd.h"
33
34 #include <string>
35 #include <sstream>
36
/* Schema and table names used by the wsrep system tables. */
#define WSREP_SCHEMA "mysql"
#define WSREP_STREAMING_TABLE "wsrep_streaming_log"
#define WSREP_CLUSTER_TABLE "wsrep_cluster"
#define WSREP_MEMBERS_TABLE "wsrep_cluster_members"

/* Streaming log table name in "schema/table" form. */
const char* wsrep_sr_table_name_full= WSREP_SCHEMA "/" WSREP_STREAMING_TABLE;

/* The same names as std::string, used to compose the statements below. */
static const std::string wsrep_schema_str= WSREP_SCHEMA;
static const std::string sr_table_str= WSREP_STREAMING_TABLE;
static const std::string cluster_table_str= WSREP_CLUSTER_TABLE;
static const std::string members_table_str= WSREP_MEMBERS_TABLE;

/* CREATE statement for the cluster table. */
static const std::string create_cluster_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_table_str +
  "("
  "cluster_uuid CHAR(36) PRIMARY KEY,"
  "view_id BIGINT NOT NULL,"
  "view_seqno BIGINT NOT NULL,"
  "protocol_version INT NOT NULL,"
  "capabilities INT NOT NULL"
  ") ENGINE=InnoDB STATS_PERSISTENT=0";

/* CREATE statement for the cluster members table. */
static const std::string create_members_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + members_table_str +
  "("
  "node_uuid CHAR(36) PRIMARY KEY,"
  "cluster_uuid CHAR(36) NOT NULL,"
  "node_name CHAR(32) NOT NULL,"
  "node_incoming_address VARCHAR(256) NOT NULL"
  ") ENGINE=InnoDB STATS_PERSISTENT=0";

#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/* Name and CREATE statement for the optional members history table. */
static const std::string cluster_member_history_table_str= "wsrep_cluster_member_history";
static const std::string create_members_history_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_member_history_table_str +
  "("
  "node_uuid CHAR(36) PRIMARY KEY,"
  "cluster_uuid CHAR(36) NOT NULL,"
  "last_view_id BIGINT NOT NULL,"
  "last_view_seqno BIGINT NOT NULL,"
  "node_name CHAR(32) NOT NULL,"
  "node_incoming_address VARCHAR(256) NOT NULL"
  ") ENGINE=InnoDB STATS_PERSISTENT=0";
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */

/* CREATE statement for the streaming replication fragment table. */
static const std::string create_frag_table_str=
  "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + sr_table_str +
  "("
  "node_uuid CHAR(36), "
  "trx_id BIGINT, "
  "seqno BIGINT, "
  "flags INT NOT NULL, "
  "frag LONGBLOB NOT NULL, "
  "PRIMARY KEY (node_uuid, trx_id, seqno)"
  ") ENGINE=InnoDB STATS_PERSISTENT=0";

/* Statements which empty the cluster and members tables. */
static const std::string delete_from_cluster_table=
  "DELETE FROM " + wsrep_schema_str + "." + cluster_table_str;

static const std::string delete_from_members_table=
  "DELETE FROM " + wsrep_schema_str + "." + members_table_str;

/* For rolling upgrade we need to use ALTER. We do not want
   persistent statistics to be collected from these tables. */
static const std::string alter_cluster_table=
  "ALTER TABLE " + wsrep_schema_str + "." + cluster_table_str +
  " STATS_PERSISTENT=0";

static const std::string alter_members_table=
  "ALTER TABLE " + wsrep_schema_str + "." + members_table_str +
  " STATS_PERSISTENT=0";

#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/* Uses the history table name declared together with its CREATE statement. */
static const std::string alter_members_history_table=
  "ALTER TABLE " + wsrep_schema_str + "." + cluster_member_history_table_str +
  " STATS_PERSISTENT=0";
#endif

static const std::string alter_frag_table=
  "ALTER TABLE " + wsrep_schema_str + "." + sr_table_str +
  " STATS_PERSISTENT=0";
118
119 namespace Wsrep_schema_impl
120 {
121
122 class binlog_off
123 {
124 public:
binlog_off(THD * thd)125 binlog_off(THD* thd)
126 : m_thd(thd)
127 , m_option_bits(thd->variables.option_bits)
128 , m_sql_log_bin(thd->variables.sql_log_bin)
129 {
130 thd->variables.option_bits&= ~OPTION_BIN_LOG;
131 thd->variables.sql_log_bin= 0;
132 }
~binlog_off()133 ~binlog_off()
134 {
135 m_thd->variables.option_bits= m_option_bits;
136 m_thd->variables.sql_log_bin= m_sql_log_bin;
137 }
138 private:
139 THD* m_thd;
140 ulonglong m_option_bits;
141 my_bool m_sql_log_bin;
142 };
143
144 class wsrep_off
145 {
146 public:
wsrep_off(THD * thd)147 wsrep_off(THD* thd)
148 : m_thd(thd)
149 , m_wsrep_on(thd->variables.wsrep_on)
150 {
151 thd->variables.wsrep_on= 0;
152 }
~wsrep_off()153 ~wsrep_off()
154 {
155 m_thd->variables.wsrep_on= m_wsrep_on;
156 }
157 private:
158 THD* m_thd;
159 my_bool m_wsrep_on;
160 };
161
162 class thd_context_switch
163 {
164 public:
thd_context_switch(THD * orig_thd,THD * cur_thd)165 thd_context_switch(THD *orig_thd, THD *cur_thd)
166 : m_orig_thd(orig_thd)
167 , m_cur_thd(cur_thd)
168 {
169 wsrep_reset_threadvars(m_orig_thd);
170 wsrep_store_threadvars(m_cur_thd);
171 }
~thd_context_switch()172 ~thd_context_switch()
173 {
174 wsrep_reset_threadvars(m_cur_thd);
175 wsrep_store_threadvars(m_orig_thd);
176 }
177 private:
178 THD *m_orig_thd;
179 THD *m_cur_thd;
180 };
181
182 class sql_safe_updates
183 {
184 public:
sql_safe_updates(THD * thd)185 sql_safe_updates(THD* thd)
186 : m_thd(thd)
187 , m_option_bits(thd->variables.option_bits)
188 {
189 thd->variables.option_bits&= ~OPTION_SAFE_UPDATES;
190 }
~sql_safe_updates()191 ~sql_safe_updates()
192 {
193 m_thd->variables.option_bits= m_option_bits;
194 }
195 private:
196 THD* m_thd;
197 ulonglong m_option_bits;
198 };
199
execute_SQL(THD * thd,const char * sql,uint length)200 static int execute_SQL(THD* thd, const char* sql, uint length) {
201 DBUG_ENTER("Wsrep_schema::execute_SQL()");
202 int err= 0;
203
204 PSI_statement_locker *parent_locker= thd->m_statement_psi;
205 Parser_state parser_state;
206
207 WSREP_DEBUG("SQL: %d %s thd: %lld", length, sql, (long long)thd->thread_id);
208
209 if (parser_state.init(thd, (char*)sql, length) == 0) {
210 thd->reset_for_next_command();
211 lex_start(thd);
212
213 thd->m_statement_psi= NULL;
214
215 thd->set_query((char*)sql, length);
216 thd->set_query_id(next_query_id());
217
218 mysql_parse(thd, (char*)sql, length, & parser_state, FALSE, FALSE);
219
220 if (thd->is_error()) {
221 WSREP_WARN("Wsrep_schema::execute_sql() failed, %d %s\nSQL: %s",
222 thd->get_stmt_da()->sql_errno(),
223 thd->get_stmt_da()->message(),
224 sql);
225 err= 1;
226 }
227 thd->m_statement_psi= parent_locker;
228 thd->end_statement();
229 thd->reset_query();
230 close_thread_tables(thd);
231 delete_explain_query(thd->lex);
232 }
233 else {
234 WSREP_WARN("SR init failure");
235 }
236 thd->cleanup_after_query();
237 DBUG_RETURN(err);
238 }
239
240 /*
241 Initialize thd for next "statement"
242 */
init_stmt(THD * thd)243 static void init_stmt(THD* thd) {
244 thd->reset_for_next_command();
245 }
246
finish_stmt(THD * thd)247 static void finish_stmt(THD* thd) {
248 trans_commit_stmt(thd);
249 close_thread_tables(thd);
250 }
251
open_table(THD * thd,const LEX_CSTRING * schema_name,const LEX_CSTRING * table_name,enum thr_lock_type const lock_type,TABLE ** table)252 static int open_table(THD* thd,
253 const LEX_CSTRING *schema_name,
254 const LEX_CSTRING *table_name,
255 enum thr_lock_type const lock_type,
256 TABLE** table) {
257 assert(table);
258 *table= NULL;
259
260 DBUG_ENTER("Wsrep_schema::open_table()");
261
262 TABLE_LIST tables;
263 uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
264 MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
265 MYSQL_OPEN_IGNORE_FLUSH |
266 MYSQL_LOCK_IGNORE_TIMEOUT);
267
268 tables.init_one_table(schema_name,
269 table_name,
270 NULL, lock_type);
271 thd->lex->query_tables_own_last= 0;
272
273 if (!open_n_lock_single_table(thd, &tables, tables.lock_type, flags)) {
274 close_thread_tables(thd);
275 DBUG_RETURN(1);
276 }
277
278 *table= tables.table;
279 (*table)->use_all_columns();
280
281 DBUG_RETURN(0);
282 }
283
284
open_for_write(THD * thd,const char * table_name,TABLE ** table)285 static int open_for_write(THD* thd, const char* table_name, TABLE** table) {
286 LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
287 LEX_CSTRING table_str= { table_name, strlen(table_name) };
288 if (Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_WRITE,
289 table)) {
290 // No need to log an error if the query was bf aborted,
291 // thd client will get ER_LOCK_DEADLOCK in the end.
292 const bool interrupted= thd->killed ||
293 (thd->is_error() &&
294 (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED));
295 if (!interrupted) {
296 WSREP_ERROR("Failed to open table %s.%s for writing",
297 schema_str.str, table_name);
298 }
299 return 1;
300 }
301 empty_record(*table);
302 (*table)->use_all_columns();
303 restore_record(*table, s->default_values);
304 return 0;
305 }
306
store(TABLE * table,uint field,const Wsrep_id & id)307 static void store(TABLE* table, uint field, const Wsrep_id& id) {
308 assert(field < table->s->fields);
309 std::ostringstream os;
310 os << id;
311 table->field[field]->store(os.str().c_str(),
312 os.str().size(),
313 &my_charset_bin);
314 }
315
316
317 template <typename INTTYPE>
store(TABLE * table,uint field,const INTTYPE val)318 static void store(TABLE* table, uint field, const INTTYPE val) {
319 assert(field < table->s->fields);
320 table->field[field]->store(val);
321 }
322
323 template <typename CHARTYPE>
store(TABLE * table,uint field,const CHARTYPE * str,size_t str_len)324 static void store(TABLE* table, uint field, const CHARTYPE* str, size_t str_len) {
325 assert(field < table->s->fields);
326 table->field[field]->store((const char*)str,
327 str_len,
328 &my_charset_bin);
329 }
330
store(TABLE * table,uint field,const std::string & str)331 static void store(TABLE* table, uint field, const std::string& str)
332 {
333 store(table, field, str.c_str(), str.size());
334 }
335
update_or_insert(TABLE * table)336 static int update_or_insert(TABLE* table) {
337 DBUG_ENTER("Wsrep_schema::update_or_insert()");
338 int ret= 0;
339 char* key;
340 int error;
341
342 /*
343 Verify that the table has primary key defined.
344 */
345 if (table->s->primary_key >= MAX_KEY ||
346 !table->s->keys_in_use.is_set(table->s->primary_key)) {
347 WSREP_ERROR("No primary key for %s.%s",
348 table->s->db.str, table->s->table_name.str);
349 DBUG_RETURN(1);
350 }
351
352 /*
353 Find the record and update or insert a new one if not found.
354 */
355 if (!(key= (char*) my_safe_alloca(table->s->max_unique_length))) {
356 WSREP_ERROR("Error allocating %ud bytes for key",
357 table->s->max_unique_length);
358 DBUG_RETURN(1);
359 }
360
361 key_copy((uchar*) key, table->record[0],
362 table->key_info + table->s->primary_key, 0);
363
364 if ((error= table->file->ha_index_read_idx_map(table->record[1],
365 table->s->primary_key,
366 (uchar*) key,
367 HA_WHOLE_KEY,
368 HA_READ_KEY_EXACT))) {
369 /*
370 Row not found, insert a new one.
371 */
372 if ((error= table->file->ha_write_row(table->record[0]))) {
373 WSREP_ERROR("Error writing into %s.%s: %d",
374 table->s->db.str,
375 table->s->table_name.str,
376 error);
377 ret= 1;
378 }
379 }
380 else if (!records_are_comparable(table) || compare_record(table)) {
381 /*
382 Record has changed
383 */
384 if ((error= table->file->ha_update_row(table->record[1],
385 table->record[0])) &&
386 error != HA_ERR_RECORD_IS_THE_SAME) {
387 WSREP_ERROR("Error updating record in %s.%s: %d",
388 table->s->db.str,
389 table->s->table_name.str,
390 error);
391 ret= 1;
392 }
393 }
394
395 my_safe_afree(key, table->s->max_unique_length);
396
397 DBUG_RETURN(ret);
398 }
399
insert(TABLE * table)400 static int insert(TABLE* table) {
401 DBUG_ENTER("Wsrep_schema::insert()");
402 int ret= 0;
403 int error;
404
405 /*
406 Verify that the table has primary key defined.
407 */
408 if (table->s->primary_key >= MAX_KEY ||
409 !table->s->keys_in_use.is_set(table->s->primary_key)) {
410 WSREP_ERROR("No primary key for %s.%s",
411 table->s->db.str, table->s->table_name.str);
412 DBUG_RETURN(1);
413 }
414
415 if ((error= table->file->ha_write_row(table->record[0]))) {
416 WSREP_ERROR("Error writing into %s.%s: %d",
417 table->s->db.str,
418 table->s->table_name.str,
419 error);
420 ret= 1;
421 }
422
423 DBUG_RETURN(ret);
424 }
425
delete_row(TABLE * table)426 static int delete_row(TABLE* table) {
427 int error;
428 int retry= 3;
429
430 do {
431 error= table->file->ha_delete_row(table->record[0]);
432 retry--;
433 } while (error && retry);
434
435 if (error) {
436 WSREP_ERROR("Error deleting row from %s.%s: %d",
437 table->s->db.str,
438 table->s->table_name.str,
439 error);
440 return 1;
441 }
442 return 0;
443 }
444
open_for_read(THD * thd,const char * table_name,TABLE ** table)445 static int open_for_read(THD* thd, const char* table_name, TABLE** table) {
446
447 LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
448 LEX_CSTRING table_str= { table_name, strlen(table_name) };
449 if (Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_READ,
450 table)) {
451 WSREP_ERROR("Failed to open table %s.%s for reading",
452 schema_str.str, table_name);
453 return 1;
454 }
455 empty_record(*table);
456 (*table)->use_all_columns();
457 restore_record(*table, s->default_values);
458 return 0;
459 }
460
461 /*
462 Init table for sequential scan.
463
464 @return 0 in case of success, 1 in case of error.
465 */
init_for_scan(TABLE * table)466 static int init_for_scan(TABLE* table) {
467 int error;
468 if ((error= table->file->ha_rnd_init(TRUE))) {
469 WSREP_ERROR("Failed to init table for scan: %d", error);
470 return 1;
471 }
472 return 0;
473 }
474 /*
475 Scan next record. For return codes see handler::ha_rnd_next()
476
477 @return 0 in case of success, error code in case of error
478 */
next_record(TABLE * table)479 static int next_record(TABLE* table) {
480 int error;
481 if ((error= table->file->ha_rnd_next(table->record[0])) &&
482 error != HA_ERR_END_OF_FILE) {
483 WSREP_ERROR("Failed to read next record: %d", error);
484 }
485 return error;
486 }
487
488 /*
489 End scan.
490
491 @return 0 in case of success, 1 in case of error.
492 */
end_scan(TABLE * table)493 static int end_scan(TABLE* table) {
494 int error;
495 if ((error= table->file->ha_rnd_end())) {
496 WSREP_ERROR("Failed to end scan: %d", error);
497 return 1;
498 }
499 return 0;
500 }
501
scan(TABLE * table,uint field,wsrep::id & id)502 static int scan(TABLE* table, uint field, wsrep::id& id)
503 {
504 assert(field < table->s->fields);
505 String uuid_str;
506 (void)table->field[field]->val_str(&uuid_str);
507 id= wsrep::id(std::string(uuid_str.c_ptr(), uuid_str.length()));
508 return 0;
509 }
510
511 template <typename INTTYPE>
scan(TABLE * table,uint field,INTTYPE & val)512 static int scan(TABLE* table, uint field, INTTYPE& val)
513 {
514 assert(field < table->s->fields);
515 val= table->field[field]->val_int();
516 return 0;
517 }
518
scan(TABLE * table,uint field,char * strbuf,uint strbuf_len)519 static int scan(TABLE* table, uint field, char* strbuf, uint strbuf_len)
520 {
521 String str;
522 (void)table->field[field]->val_str(&str);
523 LEX_CSTRING tmp= str.lex_cstring();
524 uint len = tmp.length;
525 strncpy(strbuf, tmp.str, std::min(len, strbuf_len));
526 strbuf[strbuf_len - 1]= '\0';
527 return 0;
528 }
529
530 /*
531 Scan member
532 TODO: filter members by cluster UUID
533 */
scan_member(TABLE * table,const Wsrep_id & cluster_uuid,std::vector<Wsrep_view::member> & members)534 static int scan_member(TABLE* table,
535 const Wsrep_id& cluster_uuid,
536 std::vector<Wsrep_view::member>& members)
537 {
538 Wsrep_id member_id;
539 char member_name[128]= { 0, };
540 char member_incoming[128]= { 0, };
541
542 if (scan(table, 0, member_id) ||
543 scan(table, 2, member_name, sizeof(member_name)) ||
544 scan(table, 3, member_incoming, sizeof(member_incoming))) {
545 return 1;
546 }
547
548 if (members.empty() == false) {
549 assert(members.rbegin()->id() < member_id);
550 }
551
552 try {
553 members.push_back(Wsrep_view::member(member_id,
554 member_name,
555 member_incoming));
556 }
557 catch (...) {
558 WSREP_ERROR("Caught exception while scanning members table");
559 return 1;
560 }
561 return 0;
562 }
563
564 /*
565 Init table for index scan and retrieve first record
566
567 @return 0 in case of success, error code in case of error.
568 */
init_for_index_scan(TABLE * table,const uchar * key,key_part_map map)569 static int init_for_index_scan(TABLE* table, const uchar* key,
570 key_part_map map) {
571 int error;
572 if ((error= table->file->ha_index_init(table->s->primary_key, true))) {
573 WSREP_ERROR("Failed to init table for index scan: %d", error);
574 return error;
575 }
576
577 error= table->file->ha_index_read_map(table->record[0],
578 key, map, HA_READ_KEY_EXACT);
579 switch(error) {
580 case 0:
581 case HA_ERR_END_OF_FILE:
582 case HA_ERR_KEY_NOT_FOUND:
583 case HA_ERR_ABORTED_BY_USER:
584 break;
585 case -1:
586 WSREP_DEBUG("init_for_index_scan interrupted");
587 break;
588 default:
589 WSREP_ERROR("init_for_index_scan failed to read first record, error %d", error);
590 }
591 return error;
592 }
593
594 /*
595 End index scan.
596
597 @return 0 in case of success, 1 in case of error.
598 */
end_index_scan(TABLE * table)599 static int end_index_scan(TABLE* table) {
600 int error;
601 if (table->file->inited) {
602 if ((error= table->file->ha_index_end())) {
603 WSREP_ERROR("Failed to end scan: %d", error);
604 return 1;
605 }
606 }
607 return 0;
608 }
609
make_key(TABLE * table,uchar ** key,key_part_map * map,int parts)610 static void make_key(TABLE* table, uchar** key, key_part_map* map, int parts) {
611 uint prefix_length= 0;
612 KEY_PART_INFO* key_part= table->key_info->key_part;
613
614 for (int i=0; i < parts; i++)
615 prefix_length += key_part[i].store_length;
616
617 *map= make_prev_keypart_map(parts);
618
619 if (!(*key= (uchar *) my_malloc(prefix_length + 1, MYF(MY_WME))))
620 {
621 WSREP_ERROR("Failed to allocate memory for key prefix_length %u", prefix_length);
622 assert(0);
623 }
624
625 key_copy(*key, table->record[0], table->key_info, prefix_length);
626 }
627
628 } /* namespace Wsrep_schema_impl */
629
630
Wsrep_schema()631 Wsrep_schema::Wsrep_schema()
632 {
633 }
634
~Wsrep_schema()635 Wsrep_schema::~Wsrep_schema()
636 { }
637
wsrep_init_thd_for_schema(THD * thd)638 static void wsrep_init_thd_for_schema(THD *thd)
639 {
640 thd->security_ctx->skip_grants();
641 thd->system_thread= SYSTEM_THREAD_GENERIC;
642
643 thd->real_id=pthread_self(); // Keep purify happy
644
645 thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
646
647 /* No Galera replication */
648 thd->variables.wsrep_on= 0;
649 /* No binlogging */
650 thd->variables.sql_log_bin= 0;
651 thd->variables.option_bits&= ~OPTION_BIN_LOG;
652 /* No safe updates */
653 thd->variables.option_bits&= ~OPTION_SAFE_UPDATES;
654 /* No general log */
655 thd->variables.option_bits|= OPTION_LOG_OFF;
656 /* Read committed isolation to avoid gap locking */
657 thd->variables.tx_isolation= ISO_READ_COMMITTED;
658 wsrep_assign_from_threadvars(thd);
659 wsrep_store_threadvars(thd);
660 }
661
init()662 int Wsrep_schema::init()
663 {
664 DBUG_ENTER("Wsrep_schema::init()");
665 int ret;
666 THD* thd= new THD(next_thread_id());
667 if (!thd) {
668 WSREP_ERROR("Unable to get thd");
669 DBUG_RETURN(1);
670 }
671 thd->thread_stack= (char*)&thd;
672 wsrep_init_thd_for_schema(thd);
673
674 if (Wsrep_schema_impl::execute_SQL(thd, create_cluster_table_str.c_str(),
675 create_cluster_table_str.size()) ||
676 Wsrep_schema_impl::execute_SQL(thd, create_members_table_str.c_str(),
677 create_members_table_str.size()) ||
678 #ifdef WSREP_SCHEMA_MEMBERS_HISTORY
679 Wsrep_schema_impl::execute_SQL(thd,
680 create_members_history_table_str.c_str(),
681 create_members_history_table_str.size()) ||
682 Wsrep_schema_impl::execute_SQL(thd,
683 alter_members_history_table.c_str(),
684 alter_members_history_table.size()) ||
685 #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
686 Wsrep_schema_impl::execute_SQL(thd,
687 create_frag_table_str.c_str(),
688 create_frag_table_str.size()) ||
689 Wsrep_schema_impl::execute_SQL(thd,
690 alter_cluster_table.c_str(),
691 alter_cluster_table.size()) ||
692 Wsrep_schema_impl::execute_SQL(thd,
693 alter_members_table.c_str(),
694 alter_members_table.size()) ||
695 Wsrep_schema_impl::execute_SQL(thd,
696 alter_frag_table.c_str(),
697 alter_frag_table.size()))
698 {
699 ret= 1;
700 }
701 else
702 {
703 ret= 0;
704 }
705
706 delete thd;
707 DBUG_RETURN(ret);
708 }
709
store_view(THD * thd,const Wsrep_view & view)710 int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
711 {
712 DBUG_ENTER("Wsrep_schema::store_view()");
713 assert(view.status() == Wsrep_view::primary);
714 int ret= 1;
715 int error;
716 TABLE* cluster_table= 0;
717 TABLE* members_table= 0;
718 #ifdef WSREP_SCHEMA_MEMBERS_HISTORY
719 TABLE* members_history_table= 0;
720 #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
721
722 Wsrep_schema_impl::wsrep_off wsrep_off(thd);
723 Wsrep_schema_impl::binlog_off binlog_off(thd);
724 Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
725
726 /*
727 Clean up cluster table and members table.
728 */
729 if (Wsrep_schema_impl::execute_SQL(thd,
730 delete_from_cluster_table.c_str(),
731 delete_from_cluster_table.size()) ||
732 Wsrep_schema_impl::execute_SQL(thd,
733 delete_from_members_table.c_str(),
734 delete_from_members_table.size())) {
735 goto out;
736 }
737
738 /*
739 Store cluster view info
740 */
741 Wsrep_schema_impl::init_stmt(thd);
742 if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table))
743 {
744 goto out;
745 }
746
747 Wsrep_schema_impl::store(cluster_table, 0, view.state_id().id());
748 Wsrep_schema_impl::store(cluster_table, 1, view.view_seqno().get());
749 Wsrep_schema_impl::store(cluster_table, 2, view.state_id().seqno().get());
750 Wsrep_schema_impl::store(cluster_table, 3, view.protocol_version());
751 Wsrep_schema_impl::store(cluster_table, 4, view.capabilities());
752
753 if ((error= Wsrep_schema_impl::update_or_insert(cluster_table)))
754 {
755 WSREP_ERROR("failed to write to cluster table: %d", error);
756 goto out;
757 }
758
759 Wsrep_schema_impl::finish_stmt(thd);
760
761 /*
762 Store info about current members
763 */
764 Wsrep_schema_impl::init_stmt(thd);
765 if (Wsrep_schema_impl::open_for_write(thd, members_table_str.c_str(),
766 &members_table))
767 {
768 WSREP_ERROR("failed to open wsrep.members table");
769 goto out;
770 }
771
772 for (size_t i= 0; i < view.members().size(); ++i)
773 {
774 Wsrep_schema_impl::store(members_table, 0, view.members()[i].id());
775 Wsrep_schema_impl::store(members_table, 1, view.state_id().id());
776 Wsrep_schema_impl::store(members_table, 2, view.members()[i].name());
777 Wsrep_schema_impl::store(members_table, 3, view.members()[i].incoming());
778 if ((error= Wsrep_schema_impl::update_or_insert(members_table)))
779 {
780 WSREP_ERROR("failed to write wsrep.members table: %d", error);
781 goto out;
782 }
783 }
784 Wsrep_schema_impl::finish_stmt(thd);
785
786 #ifdef WSREP_SCHEMA_MEMBERS_HISTORY
787 /*
788 Store members history
789 */
790 Wsrep_schema_impl::init_stmt(thd);
791 if (Wsrep_schema_impl::open_for_write(thd, cluster_member_history.c_str(),
792 &members_history_table)) {
793 WSREP_ERROR("failed to open wsrep.members table");
794 goto out;
795 }
796
797 for (size_t i= 0; i < view.members().size(); ++i) {
798 Wsrep_schema_impl::store(members_history_table, 0, view.members()[i].id());
799 Wsrep_schema_impl::store(members_history_table, 1, view.state_id().id());
800 Wsrep_schema_impl::store(members_history_table, 2, view.view_seqno());
801 Wsrep_schema_impl::store(members_history_table, 3, view.state_id().seqno());
802 Wsrep_schema_impl::store(members_history_table, 4,
803 view.members()[i].name());
804 Wsrep_schema_impl::store(members_history_table, 5,
805 view.members()[i].incoming());
806 if ((error= Wsrep_schema_impl::update_or_insert(members_history_table))) {
807 WSREP_ERROR("failed to write wsrep_cluster_member_history table: %d", error);
808 goto out;
809 }
810 }
811 Wsrep_schema_impl::finish_stmt(thd);
812 #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
813 ret= 0;
814 out:
815
816 DBUG_RETURN(ret);
817 }
818
restore_view(THD * thd,const Wsrep_id & own_id) const819 Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const {
820 DBUG_ENTER("Wsrep_schema::restore_view()");
821
822 int ret= 1;
823 int error;
824
825 TABLE* cluster_table= 0;
826 bool end_cluster_scan= false;
827 TABLE* members_table= 0;
828 bool end_members_scan= false;
829
830 /* variables below need to be initialized in case cluster table is empty */
831 Wsrep_id cluster_uuid;
832 wsrep_seqno_t view_id= -1;
833 wsrep_seqno_t view_seqno= -1;
834 int my_idx= -1;
835 int proto_ver= 0;
836 wsrep_cap_t capabilities= 0;
837 std::vector<Wsrep_view::member> members;
838
839 // we don't want causal waits for reading non-replicated private data
840 int const wsrep_sync_wait_saved= thd->variables.wsrep_sync_wait;
841 thd->variables.wsrep_sync_wait= 0;
842
843 if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_ONLY)) {
844 WSREP_ERROR("wsrep_schema::restore_view(): Failed to start transaction");
845 goto out;
846 }
847
848 /*
849 Read cluster info from cluster table
850 */
851 Wsrep_schema_impl::init_stmt(thd);
852 if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(), &cluster_table) ||
853 Wsrep_schema_impl::init_for_scan(cluster_table)) {
854 goto out;
855 }
856
857 if (((error= Wsrep_schema_impl::next_record(cluster_table)) != 0 ||
858 Wsrep_schema_impl::scan(cluster_table, 0, cluster_uuid) ||
859 Wsrep_schema_impl::scan(cluster_table, 1, view_id) ||
860 Wsrep_schema_impl::scan(cluster_table, 2, view_seqno) ||
861 Wsrep_schema_impl::scan(cluster_table, 3, proto_ver) ||
862 Wsrep_schema_impl::scan(cluster_table, 4, capabilities)) &&
863 error != HA_ERR_END_OF_FILE) {
864 end_cluster_scan= true;
865 goto out;
866 }
867
868 if (Wsrep_schema_impl::end_scan(cluster_table)) {
869 goto out;
870 }
871 Wsrep_schema_impl::finish_stmt(thd);
872
873 /*
874 Read members from members table
875 */
876 Wsrep_schema_impl::init_stmt(thd);
877 if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(), &members_table) ||
878 Wsrep_schema_impl::init_for_scan(members_table)) {
879 goto out;
880 }
881 end_members_scan= true;
882
883 while (true) {
884 if ((error= Wsrep_schema_impl::next_record(members_table)) == 0) {
885 if (Wsrep_schema_impl::scan_member(members_table,
886 cluster_uuid,
887 members)) {
888 goto out;
889 }
890 }
891 else if (error == HA_ERR_END_OF_FILE) {
892 break;
893 }
894 else {
895 goto out;
896 }
897 }
898
899 end_members_scan= false;
900 if (Wsrep_schema_impl::end_scan(members_table)) {
901 goto out;
902 }
903 Wsrep_schema_impl::finish_stmt(thd);
904
905 if (own_id.is_undefined() == false) {
906 for (uint i= 0; i < members.size(); ++i) {
907 if (members[i].id() == own_id) {
908 my_idx= i;
909 break;
910 }
911 }
912 }
913
914 (void)trans_commit(thd);
915 ret= 0; /* Success*/
916 out:
917
918 if (end_cluster_scan) Wsrep_schema_impl::end_scan(cluster_table);
919 if (end_members_scan) Wsrep_schema_impl::end_scan(members_table);
920
921 if (0 != ret) {
922 trans_rollback_stmt(thd);
923 if (!trans_rollback(thd)) {
924 close_thread_tables(thd);
925 }
926 }
927 thd->release_transactional_locks();
928
929 thd->variables.wsrep_sync_wait= wsrep_sync_wait_saved;
930
931 if (0 == ret) {
932 Wsrep_view ret_view(
933 wsrep::gtid(cluster_uuid, Wsrep_seqno(view_seqno)),
934 Wsrep_seqno(view_id),
935 wsrep::view::primary,
936 capabilities,
937 my_idx,
938 proto_ver,
939 members
940 );
941
942 if (wsrep_debug) {
943 std::ostringstream os;
944 os << "Restored cluster view:\n" << ret_view;
945 WSREP_INFO("%s", os.str().c_str());
946 }
947 DBUG_RETURN(ret_view);
948 }
949 else
950 {
951 WSREP_ERROR("wsrep_schema::restore_view() failed.");
952 Wsrep_view ret_view;
953 DBUG_RETURN(ret_view);
954 }
955 }
956
append_fragment(THD * thd,const wsrep::id & server_id,wsrep::transaction_id transaction_id,wsrep::seqno seqno,int flags,const wsrep::const_buffer & data)957 int Wsrep_schema::append_fragment(THD* thd,
958 const wsrep::id& server_id,
959 wsrep::transaction_id transaction_id,
960 wsrep::seqno seqno,
961 int flags,
962 const wsrep::const_buffer& data)
963 {
964 DBUG_ENTER("Wsrep_schema::append_fragment");
965 std::ostringstream os;
966 os << server_id;
967 WSREP_DEBUG("Append fragment(%llu) %s, %llu",
968 thd->thread_id,
969 os.str().c_str(),
970 transaction_id.get());
971 /* use private query table list for the duration of fragment storing,
972 populated query table list from "parent DML" may cause problems .e.g
973 for virtual column handling
974 */
975 Query_tables_list query_tables_list_backup;
976 thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
977
978 Wsrep_schema_impl::binlog_off binlog_off(thd);
979 Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
980 Wsrep_schema_impl::init_stmt(thd);
981
982 TABLE* frag_table= 0;
983 if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
984 {
985 trans_rollback_stmt(thd);
986 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
987 DBUG_RETURN(1);
988 }
989
990 Wsrep_schema_impl::store(frag_table, 0, server_id);
991 Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
992 Wsrep_schema_impl::store(frag_table, 2, seqno.get());
993 Wsrep_schema_impl::store(frag_table, 3, flags);
994 Wsrep_schema_impl::store(frag_table, 4, data.data(), data.size());
995
996 int error;
997 if ((error= Wsrep_schema_impl::insert(frag_table))) {
998 WSREP_ERROR("Failed to write to frag table: %d", error);
999 trans_rollback_stmt(thd);
1000 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1001 DBUG_RETURN(1);
1002 }
1003 Wsrep_schema_impl::finish_stmt(thd);
1004 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1005 DBUG_RETURN(0);
1006 }
1007
update_fragment_meta(THD * thd,const wsrep::ws_meta & ws_meta)1008 int Wsrep_schema::update_fragment_meta(THD* thd,
1009 const wsrep::ws_meta& ws_meta)
1010 {
1011 DBUG_ENTER("Wsrep_schema::update_fragment_meta");
1012 std::ostringstream os;
1013 os << ws_meta.server_id();
1014 WSREP_DEBUG("update_frag_seqno(%llu) %s, %llu, seqno %lld",
1015 thd->thread_id,
1016 os.str().c_str(),
1017 ws_meta.transaction_id().get(),
1018 ws_meta.seqno().get());
1019 DBUG_ASSERT(ws_meta.seqno().is_undefined() == false);
1020
1021 /* use private query table list for the duration of fragment storing,
1022 populated query table list from "parent DML" may cause problems .e.g
1023 for virtual column handling
1024 */
1025 Query_tables_list query_tables_list_backup;
1026 thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
1027
1028 Wsrep_schema_impl::binlog_off binlog_off(thd);
1029 Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
1030 int error;
1031 uchar *key=NULL;
1032 key_part_map key_map= 0;
1033 TABLE* frag_table= 0;
1034
1035 Wsrep_schema_impl::init_stmt(thd);
1036 if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
1037 {
1038 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1039 DBUG_RETURN(1);
1040 }
1041
1042 /* Find record with the given uuid, trx id, and seqno -1 */
1043 Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
1044 Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
1045 Wsrep_schema_impl::store(frag_table, 2, -1);
1046 Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
1047
1048 if ((error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1049 key, key_map)))
1050 {
1051 if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
1052 {
1053 WSREP_WARN("Record not found in %s.%s: %d",
1054 frag_table->s->db.str,
1055 frag_table->s->table_name.str,
1056 error);
1057 }
1058 Wsrep_schema_impl::finish_stmt(thd);
1059 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1060 my_free(key);
1061 DBUG_RETURN(1);
1062 }
1063
1064 my_free(key);
1065 /* Copy the original record to frag_table->record[1] */
1066 store_record(frag_table, record[1]);
1067
1068 /* Store seqno in frag_table->record[0] and update the row */
1069 Wsrep_schema_impl::store(frag_table, 2, ws_meta.seqno().get());
1070 if ((error= frag_table->file->ha_update_row(frag_table->record[1],
1071 frag_table->record[0]))) {
1072 WSREP_ERROR("Error updating record in %s.%s: %d",
1073 frag_table->s->db.str,
1074 frag_table->s->table_name.str,
1075 error);
1076 Wsrep_schema_impl::finish_stmt(thd);
1077 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1078 DBUG_RETURN(1);
1079 }
1080
1081 int ret= Wsrep_schema_impl::end_index_scan(frag_table);
1082 Wsrep_schema_impl::finish_stmt(thd);
1083 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1084 DBUG_RETURN(ret);
1085 }
1086
remove_fragment(THD * thd,TABLE * frag_table,const wsrep::id & server_id,wsrep::transaction_id transaction_id,wsrep::seqno seqno)1087 static int remove_fragment(THD* thd,
1088 TABLE* frag_table,
1089 const wsrep::id& server_id,
1090 wsrep::transaction_id transaction_id,
1091 wsrep::seqno seqno)
1092 {
1093 WSREP_DEBUG("remove_fragment(%llu) trx %llu, seqno %lld",
1094 thd->thread_id,
1095 transaction_id.get(),
1096 seqno.get());
1097 int ret= 0;
1098 int error;
1099 uchar *key= NULL;
1100 key_part_map key_map= 0;
1101
1102 DBUG_ASSERT(server_id.is_undefined() == false);
1103 DBUG_ASSERT(transaction_id.is_undefined() == false);
1104 DBUG_ASSERT(seqno.is_undefined() == false);
1105
1106 /*
1107 Remove record with the given uuid, trx id, and seqno.
1108 Using a complete key here avoids gap locks.
1109 */
1110 Wsrep_schema_impl::store(frag_table, 0, server_id);
1111 Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
1112 Wsrep_schema_impl::store(frag_table, 2, seqno.get());
1113 Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
1114
1115 if ((error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1116 key,
1117 key_map)))
1118 {
1119 if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
1120 {
1121 WSREP_DEBUG("Record not found in %s.%s:trx %llu, seqno %lld, error %d",
1122 frag_table->s->db.str,
1123 frag_table->s->table_name.str,
1124 transaction_id.get(),
1125 seqno.get(),
1126 error);
1127 }
1128 ret= error;
1129 }
1130 else if (Wsrep_schema_impl::delete_row(frag_table))
1131 {
1132 ret= 1;
1133 }
1134
1135 if (key)
1136 my_free(key);
1137 Wsrep_schema_impl::end_index_scan(frag_table);
1138 return ret;
1139 }
1140
remove_fragments(THD * thd,const wsrep::id & server_id,wsrep::transaction_id transaction_id,const std::vector<wsrep::seqno> & fragments)1141 int Wsrep_schema::remove_fragments(THD* thd,
1142 const wsrep::id& server_id,
1143 wsrep::transaction_id transaction_id,
1144 const std::vector<wsrep::seqno>& fragments)
1145 {
1146 DBUG_ENTER("Wsrep_schema::remove_fragments");
1147 int ret= 0;
1148
1149 WSREP_DEBUG("Removing %zu fragments", fragments.size());
1150 Wsrep_schema_impl::wsrep_off wsrep_off(thd);
1151 Wsrep_schema_impl::binlog_off binlog_off(thd);
1152 Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
1153
1154 Query_tables_list query_tables_list_backup;
1155 Open_tables_backup open_tables_backup;
1156 thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
1157 thd->reset_n_backup_open_tables_state(&open_tables_backup);
1158
1159 TABLE* frag_table= 0;
1160 if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
1161 {
1162 ret= 1;
1163 }
1164 else
1165 {
1166 for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
1167 i != fragments.end(); ++i)
1168 {
1169 if (remove_fragment(thd,
1170 frag_table,
1171 server_id,
1172 transaction_id, *i))
1173 {
1174 ret= 1;
1175 break;
1176 }
1177 }
1178 }
1179 close_thread_tables(thd);
1180 thd->restore_backup_open_tables_state(&open_tables_backup);
1181 thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1182
1183 if (thd->wsrep_cs().mode() == wsrep::client_state::m_local &&
1184 !thd->in_multi_stmt_transaction_mode())
1185 {
1186 /*
1187 The ugly part: Locally executing autocommit statement is
1188 committing and it has removed a fragment from stable storage.
1189 Now calling finish_stmt() will call trans_commit_stmt(), which will
1190 actually commit the transaction, what we really don't want
1191 to do at this point.
1192
1193 Doing nothing at this point seems to work ok, this block is
1194 intentionally no-op and for documentation purposes only.
1195 */
1196 }
1197 else
1198 {
1199 Wsrep_schema_impl::finish_stmt(thd);
1200 }
1201
1202 DBUG_RETURN(ret);
1203 }
1204
replay_transaction(THD * orig_thd,Relay_log_info * rli,const wsrep::ws_meta & ws_meta,const std::vector<wsrep::seqno> & fragments)1205 int Wsrep_schema::replay_transaction(THD* orig_thd,
1206 Relay_log_info* rli,
1207 const wsrep::ws_meta& ws_meta,
1208 const std::vector<wsrep::seqno>& fragments)
1209 {
1210 DBUG_ENTER("Wsrep_schema::replay_transaction");
1211 DBUG_ASSERT(!fragments.empty());
1212
1213 THD thd(next_thread_id(), true);
1214 thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
1215 (char*) &thd);
1216 wsrep_assign_from_threadvars(&thd);
1217
1218 Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
1219 Wsrep_schema_impl::binlog_off binlog_off(&thd);
1220 Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&thd);
1221 Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);
1222
1223 int ret= 1;
1224 int error;
1225 TABLE* frag_table= 0;
1226 uchar *key=NULL;
1227 key_part_map key_map= 0;
1228
1229 for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
1230 i != fragments.end(); ++i)
1231 {
1232 Wsrep_schema_impl::init_stmt(&thd);
1233 if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table)))
1234 {
1235 WSREP_WARN("Could not open SR table for read: %d", error);
1236 Wsrep_schema_impl::finish_stmt(&thd);
1237 DBUG_RETURN(1);
1238 }
1239
1240 Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
1241 Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
1242 Wsrep_schema_impl::store(frag_table, 2, i->get());
1243 Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
1244
1245 int error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1246 key,
1247 key_map);
1248 if (error)
1249 {
1250 WSREP_WARN("Failed to init streaming log table for index scan: %d",
1251 error);
1252 Wsrep_schema_impl::end_index_scan(frag_table);
1253 ret= 1;
1254 break;
1255 }
1256
1257 int flags;
1258 Wsrep_schema_impl::scan(frag_table, 3, flags);
1259 WSREP_DEBUG("replay_fragment(%llu): seqno: %lld flags: %x",
1260 ws_meta.transaction_id().get(),
1261 i->get(),
1262 flags);
1263 String buf;
1264 frag_table->field[4]->val_str(&buf);
1265
1266 {
1267 Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);
1268
1269 ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
1270 if (ret)
1271 {
1272 WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
1273 break;
1274 }
1275 }
1276
1277 Wsrep_schema_impl::end_index_scan(frag_table);
1278 Wsrep_schema_impl::finish_stmt(&thd);
1279
1280 Wsrep_schema_impl::init_stmt(&thd);
1281
1282 if ((error= Wsrep_schema_impl::open_for_write(&thd,
1283 sr_table_str.c_str(),
1284 &frag_table)))
1285 {
1286 WSREP_WARN("Could not open SR table for write: %d", error);
1287 Wsrep_schema_impl::finish_stmt(&thd);
1288 DBUG_RETURN(1);
1289 }
1290
1291 error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1292 key,
1293 key_map);
1294 if (error)
1295 {
1296 WSREP_WARN("Failed to init streaming log table for index scan: %d",
1297 error);
1298 Wsrep_schema_impl::end_index_scan(frag_table);
1299 ret= 1;
1300 break;
1301 }
1302
1303 error= Wsrep_schema_impl::delete_row(frag_table);
1304
1305 if (error)
1306 {
1307 WSREP_WARN("Could not delete row from streaming log table: %d", error);
1308 Wsrep_schema_impl::end_index_scan(frag_table);
1309 ret= 1;
1310 break;
1311 }
1312 Wsrep_schema_impl::end_index_scan(frag_table);
1313 Wsrep_schema_impl::finish_stmt(&thd);
1314 my_free(key);
1315 key= NULL;
1316 }
1317
1318 if (key)
1319 my_free(key);
1320 DBUG_RETURN(ret);
1321 }
1322
recover_sr_transactions(THD * orig_thd)1323 int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
1324 {
1325 DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
1326 THD storage_thd(next_thread_id(), true);
1327 storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
1328 (char*) &storage_thd);
1329 wsrep_assign_from_threadvars(&storage_thd);
1330 TABLE* frag_table= 0;
1331 TABLE* cluster_table= 0;
1332 Wsrep_storage_service storage_service(&storage_thd);
1333 Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
1334 Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
1335 Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&storage_thd);
1336 Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
1337 &storage_thd);
1338 Wsrep_server_state& server_state(Wsrep_server_state::instance());
1339
1340 int ret= 1;
1341 int error;
1342 wsrep::id cluster_id;
1343
1344 Wsrep_schema_impl::init_stmt(&storage_thd);
1345 storage_thd.wsrep_skip_locking= FALSE;
1346 if (Wsrep_schema_impl::open_for_read(&storage_thd,
1347 cluster_table_str.c_str(),
1348 &cluster_table) ||
1349 Wsrep_schema_impl::init_for_scan(cluster_table))
1350 {
1351 Wsrep_schema_impl::finish_stmt(&storage_thd);
1352 DBUG_RETURN(1);
1353 }
1354
1355 if ((error= Wsrep_schema_impl::next_record(cluster_table)))
1356 {
1357 Wsrep_schema_impl::end_scan(cluster_table);
1358 Wsrep_schema_impl::finish_stmt(&storage_thd);
1359 trans_commit(&storage_thd);
1360 if (error == HA_ERR_END_OF_FILE)
1361 {
1362 WSREP_INFO("Cluster table is empty, not recovering transactions");
1363 DBUG_RETURN(0);
1364 }
1365 else
1366 {
1367 WSREP_ERROR("Failed to read cluster table: %d", error);
1368 DBUG_RETURN(1);
1369 }
1370 }
1371
1372 Wsrep_schema_impl::scan(cluster_table, 0, cluster_id);
1373 Wsrep_schema_impl::end_scan(cluster_table);
1374 Wsrep_schema_impl::finish_stmt(&storage_thd);
1375
1376 std::ostringstream os;
1377 os << cluster_id;
1378 WSREP_INFO("Recovered cluster id %s", os.str().c_str());
1379
1380 storage_thd.wsrep_skip_locking= TRUE;
1381 Wsrep_schema_impl::init_stmt(&storage_thd);
1382
1383 /*
1384 Open the table for reading and writing so that fragments without
1385 valid seqno can be deleted.
1386 */
1387 if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) ||
1388 Wsrep_schema_impl::init_for_scan(frag_table))
1389 {
1390 WSREP_ERROR("Failed to open SR table for write");
1391 goto out;
1392 }
1393
1394 while (true)
1395 {
1396 if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0)
1397 {
1398 wsrep::id server_id;
1399 Wsrep_schema_impl::scan(frag_table, 0, server_id);
1400 wsrep::client_id client_id;
1401 unsigned long long transaction_id_ull;
1402 Wsrep_schema_impl::scan(frag_table, 1, transaction_id_ull);
1403 wsrep::transaction_id transaction_id(transaction_id_ull);
1404 long long seqno_ll;
1405 Wsrep_schema_impl::scan(frag_table, 2, seqno_ll);
1406 wsrep::seqno seqno(seqno_ll);
1407
1408 /* This is possible if the server crashes between inserting the
1409 fragment into table and updating the fragment seqno after
1410 certification. */
1411 if (seqno.is_undefined())
1412 {
1413 Wsrep_schema_impl::delete_row(frag_table);
1414 continue;
1415 }
1416
1417 wsrep::gtid gtid(cluster_id, seqno);
1418 int flags;
1419 Wsrep_schema_impl::scan(frag_table, 3, flags);
1420 String data_str;
1421
1422 (void)frag_table->field[4]->val_str(&data_str);
1423 wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
1424 wsrep::ws_meta ws_meta(gtid,
1425 wsrep::stid(server_id,
1426 transaction_id,
1427 client_id),
1428 wsrep::seqno::undefined(),
1429 flags);
1430
1431 wsrep::high_priority_service* applier;
1432 if (!(applier= server_state.find_streaming_applier(server_id,
1433 transaction_id)))
1434 {
1435 DBUG_ASSERT(wsrep::starts_transaction(flags));
1436 applier = wsrep_create_streaming_applier(&storage_thd, "recovery");
1437 server_state.start_streaming_applier(server_id, transaction_id,
1438 applier);
1439 applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
1440 ws_meta);
1441 }
1442 applier->store_globals();
1443 wsrep::mutable_buffer unused;
1444 applier->apply_write_set(ws_meta, data, unused);
1445 applier->after_apply();
1446 storage_service.store_globals();
1447 }
1448 else if (error == HA_ERR_END_OF_FILE)
1449 {
1450 ret= 0;
1451 break;
1452 }
1453 else
1454 {
1455 WSREP_ERROR("SR table scan returned error %d", error);
1456 break;
1457 }
1458 }
1459 Wsrep_schema_impl::end_scan(frag_table);
1460 Wsrep_schema_impl::finish_stmt(&storage_thd);
1461 trans_commit(&storage_thd);
1462 storage_thd.set_mysys_var(0);
1463 out:
1464 DBUG_RETURN(ret);
1465 }
1466