1 /* Copyright (C) 2015-2021 Codership Oy <info@codership.com>
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License along
13    with this program; if not, write to the Free Software Foundation, Inc.,
14    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
15 
16 #include "mariadb.h"
17 
18 #include "table.h"
19 #include "key.h"
20 #include "sql_base.h"
21 #include "sql_parse.h"
22 #include "sql_update.h"
23 #include "transaction.h"
24 
25 #include "mysql/service_wsrep.h"
26 #include "wsrep_schema.h"
27 #include "wsrep_applier.h"
28 #include "wsrep_xid.h"
29 #include "wsrep_binlog.h"
30 #include "wsrep_high_priority_service.h"
31 #include "wsrep_storage_service.h"
32 #include "wsrep_thd.h"
33 
34 #include <string>
35 #include <sstream>
36 
/* Name of the schema (database) that holds the wsrep tables. */
#define WSREP_SCHEMA          "mysql"
/* Unqualified names of the wsrep tables inside WSREP_SCHEMA. */
#define WSREP_STREAMING_TABLE "wsrep_streaming_log"
#define WSREP_CLUSTER_TABLE   "wsrep_cluster"
#define WSREP_MEMBERS_TABLE   "wsrep_cluster_members"

/* Streaming log table name in "<schema>/<table>" form (external linkage). */
const char* wsrep_sr_table_name_full= WSREP_SCHEMA "/" WSREP_STREAMING_TABLE;

/* std::string copies of the schema and table names defined above. */
static const std::string wsrep_schema_str= WSREP_SCHEMA;
static const std::string sr_table_str= WSREP_STREAMING_TABLE;
static const std::string cluster_table_str= WSREP_CLUSTER_TABLE;
static const std::string members_table_str= WSREP_MEMBERS_TABLE;
48 
49 static const std::string create_cluster_table_str=
50   "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + cluster_table_str +
51   "("
52   "cluster_uuid CHAR(36) PRIMARY KEY,"
53   "view_id BIGINT NOT NULL,"
54   "view_seqno BIGINT NOT NULL,"
55   "protocol_version INT NOT NULL,"
56   "capabilities INT NOT NULL"
57   ") ENGINE=InnoDB STATS_PERSISTENT=0";
58 
59 static const std::string create_members_table_str=
60   "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + members_table_str +
61   "("
62   "node_uuid CHAR(36) PRIMARY KEY,"
63   "cluster_uuid CHAR(36) NOT NULL,"
64   "node_name CHAR(32) NOT NULL,"
65   "node_incoming_address VARCHAR(256) NOT NULL"
66   ") ENGINE=InnoDB STATS_PERSISTENT=0";
67 
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/*
  Name and DDL of the member history table. Only compiled when
  WSREP_SCHEMA_MEMBERS_HISTORY is defined.
*/
static const std::string cluster_member_history_table_str(
  "wsrep_cluster_member_history");
static const std::string create_members_history_table_str=
  std::string("CREATE TABLE IF NOT EXISTS " WSREP_SCHEMA ".") +
  cluster_member_history_table_str +
  "("
  "node_uuid CHAR(36) PRIMARY KEY,"
  "cluster_uuid CHAR(36) NOT NULL,"
  "last_view_id BIGINT NOT NULL,"
  "last_view_seqno BIGINT NOT NULL,"
  "node_name CHAR(32) NOT NULL,"
  "node_incoming_address VARCHAR(256) NOT NULL"
  ") ENGINE=InnoDB STATS_PERSISTENT=0";
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
81 
82 static const std::string create_frag_table_str=
83   "CREATE TABLE IF NOT EXISTS " + wsrep_schema_str + "." + sr_table_str +
84   "("
85   "node_uuid CHAR(36), "
86   "trx_id BIGINT, "
87   "seqno BIGINT, "
88   "flags INT NOT NULL, "
89   "frag LONGBLOB NOT NULL, "
90   "PRIMARY KEY (node_uuid, trx_id, seqno)"
91   ") ENGINE=InnoDB STATS_PERSISTENT=0";
92 
93 static const std::string delete_from_cluster_table=
94   "DELETE FROM " + wsrep_schema_str + "." + cluster_table_str;
95 
96 static const std::string delete_from_members_table=
97   "DELETE FROM " + wsrep_schema_str + "." + members_table_str;
98 
99 /* For rolling upgrade we need to use ALTER. We do not want
100 persistent statistics to be collected from these tables. */
101 static const std::string alter_cluster_table=
102   "ALTER TABLE " + wsrep_schema_str + "." + cluster_table_str +
103   " STATS_PERSISTENT=0";
104 
105 static const std::string alter_members_table=
106   "ALTER TABLE " + wsrep_schema_str + "." + members_table_str +
107   " STATS_PERSISTENT=0";
108 
#ifdef WSREP_SCHEMA_MEMBERS_HISTORY
/*
  ALTER statement setting STATS_PERSISTENT=0 on the member history table.
  Only compiled when WSREP_SCHEMA_MEMBERS_HISTORY is defined.

  The table name is taken from cluster_member_history_table_str, the
  identifier this file defines for the history table under the same
  conditional.
*/
static const std::string alter_members_history_table=
  "ALTER TABLE " + wsrep_schema_str + "." + cluster_member_history_table_str +
  " STATS_PERSISTENT=0";
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
114 
115 static const std::string alter_frag_table=
116   "ALTER TABLE " + wsrep_schema_str + "." + sr_table_str +
117   " STATS_PERSISTENT=0";
118 
119 namespace Wsrep_schema_impl
120 {
121 
122 class binlog_off
123 {
124 public:
binlog_off(THD * thd)125   binlog_off(THD* thd)
126     : m_thd(thd)
127     , m_option_bits(thd->variables.option_bits)
128     , m_sql_log_bin(thd->variables.sql_log_bin)
129   {
130     thd->variables.option_bits&= ~OPTION_BIN_LOG;
131     thd->variables.sql_log_bin= 0;
132   }
~binlog_off()133   ~binlog_off()
134   {
135     m_thd->variables.option_bits= m_option_bits;
136     m_thd->variables.sql_log_bin= m_sql_log_bin;
137   }
138 private:
139   THD* m_thd;
140   ulonglong m_option_bits;
141   my_bool m_sql_log_bin;
142 };
143 
144 class wsrep_off
145 {
146 public:
wsrep_off(THD * thd)147   wsrep_off(THD* thd)
148     : m_thd(thd)
149     , m_wsrep_on(thd->variables.wsrep_on)
150   {
151     thd->variables.wsrep_on= 0;
152   }
~wsrep_off()153   ~wsrep_off()
154   {
155     m_thd->variables.wsrep_on= m_wsrep_on;
156   }
157 private:
158   THD* m_thd;
159   my_bool m_wsrep_on;
160 };
161 
162 class thd_context_switch
163 {
164 public:
thd_context_switch(THD * orig_thd,THD * cur_thd)165   thd_context_switch(THD *orig_thd, THD *cur_thd)
166     : m_orig_thd(orig_thd)
167     , m_cur_thd(cur_thd)
168   {
169     wsrep_reset_threadvars(m_orig_thd);
170     wsrep_store_threadvars(m_cur_thd);
171   }
~thd_context_switch()172   ~thd_context_switch()
173   {
174     wsrep_reset_threadvars(m_cur_thd);
175     wsrep_store_threadvars(m_orig_thd);
176   }
177 private:
178   THD *m_orig_thd;
179   THD *m_cur_thd;
180 };
181 
182 class sql_safe_updates
183 {
184 public:
sql_safe_updates(THD * thd)185   sql_safe_updates(THD* thd)
186     : m_thd(thd)
187     , m_option_bits(thd->variables.option_bits)
188   {
189     thd->variables.option_bits&= ~OPTION_SAFE_UPDATES;
190   }
~sql_safe_updates()191   ~sql_safe_updates()
192   {
193     m_thd->variables.option_bits= m_option_bits;
194   }
195 private:
196   THD* m_thd;
197   ulonglong m_option_bits;
198 };
199 
execute_SQL(THD * thd,const char * sql,uint length)200 static int execute_SQL(THD* thd, const char* sql, uint length) {
201   DBUG_ENTER("Wsrep_schema::execute_SQL()");
202   int err= 0;
203 
204   PSI_statement_locker *parent_locker= thd->m_statement_psi;
205   Parser_state parser_state;
206 
207   WSREP_DEBUG("SQL: %d %s thd: %lld", length, sql, (long long)thd->thread_id);
208 
209   if (parser_state.init(thd, (char*)sql, length) == 0) {
210     thd->reset_for_next_command();
211     lex_start(thd);
212 
213     thd->m_statement_psi= NULL;
214 
215     thd->set_query((char*)sql, length);
216     thd->set_query_id(next_query_id());
217 
218     mysql_parse(thd, (char*)sql, length, & parser_state, FALSE, FALSE);
219 
220     if (thd->is_error()) {
221       WSREP_WARN("Wsrep_schema::execute_sql() failed, %d %s\nSQL: %s",
222                  thd->get_stmt_da()->sql_errno(),
223                  thd->get_stmt_da()->message(),
224                  sql);
225       err= 1;
226     }
227     thd->m_statement_psi= parent_locker;
228     thd->end_statement();
229     thd->reset_query();
230     close_thread_tables(thd);
231     delete_explain_query(thd->lex);
232   }
233   else {
234     WSREP_WARN("SR init failure");
235   }
236   thd->cleanup_after_query();
237   DBUG_RETURN(err);
238 }
239 
240 /*
241   Initialize thd for next "statement"
242  */
init_stmt(THD * thd)243 static void init_stmt(THD* thd) {
244   thd->reset_for_next_command();
245 }
246 
finish_stmt(THD * thd)247 static void finish_stmt(THD* thd) {
248   trans_commit_stmt(thd);
249   close_thread_tables(thd);
250 }
251 
open_table(THD * thd,const LEX_CSTRING * schema_name,const LEX_CSTRING * table_name,enum thr_lock_type const lock_type,TABLE ** table)252 static int open_table(THD* thd,
253                const LEX_CSTRING *schema_name,
254                const LEX_CSTRING *table_name,
255                enum thr_lock_type const lock_type,
256                TABLE** table) {
257   assert(table);
258   *table= NULL;
259 
260   DBUG_ENTER("Wsrep_schema::open_table()");
261 
262   TABLE_LIST tables;
263   uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK |
264                MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY |
265                MYSQL_OPEN_IGNORE_FLUSH |
266                MYSQL_LOCK_IGNORE_TIMEOUT);
267 
268   tables.init_one_table(schema_name,
269                         table_name,
270                         NULL, lock_type);
271   thd->lex->query_tables_own_last= 0;
272 
273   if (!open_n_lock_single_table(thd, &tables, tables.lock_type, flags)) {
274     close_thread_tables(thd);
275     DBUG_RETURN(1);
276   }
277 
278   *table= tables.table;
279   (*table)->use_all_columns();
280 
281   DBUG_RETURN(0);
282 }
283 
284 
open_for_write(THD * thd,const char * table_name,TABLE ** table)285 static int open_for_write(THD* thd, const char* table_name, TABLE** table) {
286   LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
287   LEX_CSTRING table_str= { table_name, strlen(table_name) };
288   if (Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_WRITE,
289                                     table)) {
290     // No need to log an error if the query was bf aborted,
291     // thd client will get ER_LOCK_DEADLOCK in the end.
292     const bool interrupted= thd->killed ||
293       (thd->is_error() &&
294        (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED));
295     if (!interrupted) {
296       WSREP_ERROR("Failed to open table %s.%s for writing",
297                   schema_str.str, table_name);
298     }
299     return 1;
300   }
301   empty_record(*table);
302   (*table)->use_all_columns();
303   restore_record(*table, s->default_values);
304   return 0;
305 }
306 
store(TABLE * table,uint field,const Wsrep_id & id)307 static void store(TABLE* table, uint field, const Wsrep_id& id) {
308   assert(field < table->s->fields);
309   std::ostringstream os;
310   os << id;
311   table->field[field]->store(os.str().c_str(),
312                              os.str().size(),
313                              &my_charset_bin);
314 }
315 
316 
317 template <typename INTTYPE>
store(TABLE * table,uint field,const INTTYPE val)318 static void store(TABLE* table, uint field, const INTTYPE val) {
319   assert(field < table->s->fields);
320   table->field[field]->store(val);
321 }
322 
323 template <typename CHARTYPE>
store(TABLE * table,uint field,const CHARTYPE * str,size_t str_len)324 static void store(TABLE* table, uint field, const CHARTYPE* str, size_t str_len) {
325   assert(field < table->s->fields);
326   table->field[field]->store((const char*)str,
327                              str_len,
328                              &my_charset_bin);
329 }
330 
store(TABLE * table,uint field,const std::string & str)331 static void store(TABLE* table, uint field, const std::string& str)
332 {
333   store(table, field, str.c_str(), str.size());
334 }
335 
update_or_insert(TABLE * table)336 static int update_or_insert(TABLE* table) {
337   DBUG_ENTER("Wsrep_schema::update_or_insert()");
338   int ret= 0;
339   char* key;
340   int error;
341 
342   /*
343     Verify that the table has primary key defined.
344   */
345   if (table->s->primary_key >= MAX_KEY ||
346       !table->s->keys_in_use.is_set(table->s->primary_key)) {
347     WSREP_ERROR("No primary key for %s.%s",
348                 table->s->db.str, table->s->table_name.str);
349     DBUG_RETURN(1);
350   }
351 
352   /*
353     Find the record and update or insert a new one if not found.
354   */
355   if (!(key= (char*) my_safe_alloca(table->s->max_unique_length))) {
356     WSREP_ERROR("Error allocating %ud bytes for key",
357                 table->s->max_unique_length);
358     DBUG_RETURN(1);
359   }
360 
361   key_copy((uchar*) key, table->record[0],
362            table->key_info + table->s->primary_key, 0);
363 
364   if ((error= table->file->ha_index_read_idx_map(table->record[1],
365                                                  table->s->primary_key,
366                                                  (uchar*) key,
367                                                  HA_WHOLE_KEY,
368                                                  HA_READ_KEY_EXACT))) {
369     /*
370       Row not found, insert a new one.
371     */
372     if ((error= table->file->ha_write_row(table->record[0]))) {
373       WSREP_ERROR("Error writing into %s.%s: %d",
374                   table->s->db.str,
375                   table->s->table_name.str,
376                   error);
377       ret= 1;
378     }
379   }
380   else if (!records_are_comparable(table) || compare_record(table)) {
381     /*
382       Record has changed
383     */
384     if ((error= table->file->ha_update_row(table->record[1],
385                                            table->record[0])) &&
386         error != HA_ERR_RECORD_IS_THE_SAME) {
387       WSREP_ERROR("Error updating record in %s.%s: %d",
388                   table->s->db.str,
389                   table->s->table_name.str,
390                   error);
391       ret= 1;
392     }
393   }
394 
395   my_safe_afree(key, table->s->max_unique_length);
396 
397   DBUG_RETURN(ret);
398 }
399 
insert(TABLE * table)400 static int insert(TABLE* table) {
401   DBUG_ENTER("Wsrep_schema::insert()");
402   int ret= 0;
403   int error;
404 
405   /*
406     Verify that the table has primary key defined.
407   */
408   if (table->s->primary_key >= MAX_KEY ||
409       !table->s->keys_in_use.is_set(table->s->primary_key)) {
410     WSREP_ERROR("No primary key for %s.%s",
411                 table->s->db.str, table->s->table_name.str);
412     DBUG_RETURN(1);
413   }
414 
415   if ((error= table->file->ha_write_row(table->record[0]))) {
416     WSREP_ERROR("Error writing into %s.%s: %d",
417                 table->s->db.str,
418                 table->s->table_name.str,
419                 error);
420     ret= 1;
421   }
422 
423   DBUG_RETURN(ret);
424 }
425 
delete_row(TABLE * table)426 static int delete_row(TABLE* table) {
427   int error;
428   int retry= 3;
429 
430   do {
431     error= table->file->ha_delete_row(table->record[0]);
432     retry--;
433   } while (error && retry);
434 
435   if (error) {
436     WSREP_ERROR("Error deleting row from %s.%s: %d",
437                 table->s->db.str,
438                 table->s->table_name.str,
439                 error);
440     return 1;
441   }
442   return 0;
443 }
444 
open_for_read(THD * thd,const char * table_name,TABLE ** table)445 static int open_for_read(THD* thd, const char* table_name, TABLE** table) {
446 
447   LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() };
448   LEX_CSTRING table_str= { table_name, strlen(table_name) };
449   if (Wsrep_schema_impl::open_table(thd, &schema_str, &table_str, TL_READ,
450                                     table)) {
451     WSREP_ERROR("Failed to open table %s.%s for reading",
452                 schema_str.str, table_name);
453     return 1;
454   }
455   empty_record(*table);
456   (*table)->use_all_columns();
457   restore_record(*table, s->default_values);
458   return 0;
459 }
460 
461 /*
462   Init table for sequential scan.
463 
464   @return 0 in case of success, 1 in case of error.
465  */
init_for_scan(TABLE * table)466 static int init_for_scan(TABLE* table) {
467   int error;
468   if ((error= table->file->ha_rnd_init(TRUE))) {
469     WSREP_ERROR("Failed to init table for scan: %d", error);
470     return 1;
471   }
472   return 0;
473 }
474 /*
475   Scan next record. For return codes see handler::ha_rnd_next()
476 
477   @return 0 in case of success, error code in case of error
478  */
next_record(TABLE * table)479 static int next_record(TABLE* table) {
480   int error;
481   if ((error= table->file->ha_rnd_next(table->record[0])) &&
482       error != HA_ERR_END_OF_FILE) {
483     WSREP_ERROR("Failed to read next record: %d", error);
484   }
485   return error;
486 }
487 
488 /*
489   End scan.
490 
491   @return 0 in case of success, 1 in case of error.
492  */
end_scan(TABLE * table)493 static int end_scan(TABLE* table) {
494   int error;
495   if ((error= table->file->ha_rnd_end())) {
496     WSREP_ERROR("Failed to end scan: %d", error);
497     return 1;
498   }
499   return 0;
500 }
501 
scan(TABLE * table,uint field,wsrep::id & id)502 static int scan(TABLE* table, uint field, wsrep::id& id)
503 {
504   assert(field < table->s->fields);
505   String uuid_str;
506   (void)table->field[field]->val_str(&uuid_str);
507   id= wsrep::id(std::string(uuid_str.c_ptr(), uuid_str.length()));
508   return 0;
509 }
510 
511 template <typename INTTYPE>
scan(TABLE * table,uint field,INTTYPE & val)512 static int scan(TABLE* table, uint field, INTTYPE& val)
513 {
514   assert(field < table->s->fields);
515   val= table->field[field]->val_int();
516   return 0;
517 }
518 
scan(TABLE * table,uint field,char * strbuf,uint strbuf_len)519 static int scan(TABLE* table, uint field, char* strbuf, uint strbuf_len)
520 {
521   String str;
522   (void)table->field[field]->val_str(&str);
523   LEX_CSTRING tmp= str.lex_cstring();
524   uint len = tmp.length;
525   strncpy(strbuf, tmp.str, std::min(len, strbuf_len));
526   strbuf[strbuf_len - 1]= '\0';
527   return 0;
528 }
529 
530 /*
531   Scan member
532   TODO: filter members by cluster UUID
533  */
scan_member(TABLE * table,const Wsrep_id & cluster_uuid,std::vector<Wsrep_view::member> & members)534 static int scan_member(TABLE* table,
535                        const Wsrep_id& cluster_uuid,
536                        std::vector<Wsrep_view::member>& members)
537 {
538   Wsrep_id member_id;
539   char member_name[128]= { 0, };
540   char member_incoming[128]= { 0, };
541 
542   if (scan(table, 0, member_id) ||
543       scan(table, 2, member_name, sizeof(member_name)) ||
544       scan(table, 3, member_incoming, sizeof(member_incoming))) {
545     return 1;
546   }
547 
548   if (members.empty() == false) {
549     assert(members.rbegin()->id() < member_id);
550   }
551 
552   try {
553     members.push_back(Wsrep_view::member(member_id,
554                                          member_name,
555                                          member_incoming));
556   }
557   catch (...) {
558     WSREP_ERROR("Caught exception while scanning members table");
559     return 1;
560   }
561   return 0;
562 }
563 
564 /*
565   Init table for index scan and retrieve first record
566 
567   @return 0 in case of success, error code in case of error.
568  */
init_for_index_scan(TABLE * table,const uchar * key,key_part_map map)569 static int init_for_index_scan(TABLE* table, const uchar* key,
570                                key_part_map map) {
571   int error;
572   if ((error= table->file->ha_index_init(table->s->primary_key, true))) {
573     WSREP_ERROR("Failed to init table for index scan: %d", error);
574     return error;
575   }
576 
577   error= table->file->ha_index_read_map(table->record[0],
578                                         key, map, HA_READ_KEY_EXACT);
579   switch(error) {
580   case 0:
581   case HA_ERR_END_OF_FILE:
582   case HA_ERR_KEY_NOT_FOUND:
583   case HA_ERR_ABORTED_BY_USER:
584     break;
585   case -1:
586     WSREP_DEBUG("init_for_index_scan interrupted");
587     break;
588   default:
589     WSREP_ERROR("init_for_index_scan failed to read first record, error %d", error);
590   }
591   return error;
592 }
593 
594 /*
595   End index scan.
596 
597   @return 0 in case of success, 1 in case of error.
598  */
end_index_scan(TABLE * table)599 static int end_index_scan(TABLE* table) {
600   int error;
601   if (table->file->inited) {
602     if ((error= table->file->ha_index_end())) {
603       WSREP_ERROR("Failed to end scan: %d", error);
604       return 1;
605     }
606   }
607   return 0;
608 }
609 
make_key(TABLE * table,uchar ** key,key_part_map * map,int parts)610 static void make_key(TABLE* table, uchar** key, key_part_map* map, int parts) {
611   uint prefix_length= 0;
612   KEY_PART_INFO* key_part= table->key_info->key_part;
613 
614   for (int i=0; i < parts; i++)
615     prefix_length += key_part[i].store_length;
616 
617   *map= make_prev_keypart_map(parts);
618 
619   if (!(*key= (uchar *) my_malloc(prefix_length + 1, MYF(MY_WME))))
620   {
621     WSREP_ERROR("Failed to allocate memory for key prefix_length %u", prefix_length);
622     assert(0);
623   }
624 
625   key_copy(*key, table->record[0], table->key_info, prefix_length);
626 }
627 
628 } /* namespace Wsrep_schema_impl */
629 
630 
Wsrep_schema()631 Wsrep_schema::Wsrep_schema()
632 {
633 }
634 
~Wsrep_schema()635 Wsrep_schema::~Wsrep_schema()
636 { }
637 
wsrep_init_thd_for_schema(THD * thd)638 static void wsrep_init_thd_for_schema(THD *thd)
639 {
640   thd->security_ctx->skip_grants();
641   thd->system_thread= SYSTEM_THREAD_GENERIC;
642 
643   thd->real_id=pthread_self(); // Keep purify happy
644 
645   thd->prior_thr_create_utime= thd->start_utime= thd->thr_create_utime;
646 
647   /* No Galera replication */
648   thd->variables.wsrep_on= 0;
649   /* No binlogging */
650   thd->variables.sql_log_bin= 0;
651   thd->variables.option_bits&= ~OPTION_BIN_LOG;
652   /* No safe updates */
653   thd->variables.option_bits&= ~OPTION_SAFE_UPDATES;
654   /* No general log */
655   thd->variables.option_bits|= OPTION_LOG_OFF;
656   /* Read committed isolation to avoid gap locking */
657   thd->variables.tx_isolation= ISO_READ_COMMITTED;
658   wsrep_assign_from_threadvars(thd);
659   wsrep_store_threadvars(thd);
660 }
661 
init()662 int Wsrep_schema::init()
663 {
664   DBUG_ENTER("Wsrep_schema::init()");
665   int ret;
666   THD* thd= new THD(next_thread_id());
667   if (!thd) {
668     WSREP_ERROR("Unable to get thd");
669     DBUG_RETURN(1);
670   }
671   thd->thread_stack= (char*)&thd;
672   wsrep_init_thd_for_schema(thd);
673 
674   if (Wsrep_schema_impl::execute_SQL(thd, create_cluster_table_str.c_str(),
675                                      create_cluster_table_str.size()) ||
676       Wsrep_schema_impl::execute_SQL(thd, create_members_table_str.c_str(),
677                                      create_members_table_str.size()) ||
678 #ifdef WSREP_SCHEMA_MEMBERS_HISTORY
679       Wsrep_schema_impl::execute_SQL(thd,
680                                      create_members_history_table_str.c_str(),
681                                      create_members_history_table_str.size()) ||
682       Wsrep_schema_impl::execute_SQL(thd,
683                                      alter_members_history_table.c_str(),
684                                      alter_members_history_table.size()) ||
685 #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
686       Wsrep_schema_impl::execute_SQL(thd,
687                                      create_frag_table_str.c_str(),
688                                      create_frag_table_str.size()) ||
689       Wsrep_schema_impl::execute_SQL(thd,
690                                      alter_cluster_table.c_str(),
691                                      alter_cluster_table.size()) ||
692       Wsrep_schema_impl::execute_SQL(thd,
693                                      alter_members_table.c_str(),
694                                      alter_members_table.size()) ||
695       Wsrep_schema_impl::execute_SQL(thd,
696                                      alter_frag_table.c_str(),
697 	                             alter_frag_table.size()))
698   {
699     ret= 1;
700   }
701   else
702   {
703     ret= 0;
704   }
705 
706   delete thd;
707   DBUG_RETURN(ret);
708 }
709 
store_view(THD * thd,const Wsrep_view & view)710 int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
711 {
712   DBUG_ENTER("Wsrep_schema::store_view()");
713   assert(view.status() == Wsrep_view::primary);
714   int ret= 1;
715   int error;
716   TABLE* cluster_table= 0;
717   TABLE* members_table= 0;
718 #ifdef WSREP_SCHEMA_MEMBERS_HISTORY
719   TABLE* members_history_table= 0;
720 #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
721 
722   Wsrep_schema_impl::wsrep_off wsrep_off(thd);
723   Wsrep_schema_impl::binlog_off binlog_off(thd);
724   Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
725 
726   /*
727     Clean up cluster table and members table.
728   */
729   if (Wsrep_schema_impl::execute_SQL(thd,
730                                      delete_from_cluster_table.c_str(),
731                                      delete_from_cluster_table.size()) ||
732       Wsrep_schema_impl::execute_SQL(thd,
733                                      delete_from_members_table.c_str(),
734                                      delete_from_members_table.size())) {
735     goto out;
736   }
737 
738   /*
739     Store cluster view info
740   */
741   Wsrep_schema_impl::init_stmt(thd);
742   if (Wsrep_schema_impl::open_for_write(thd, cluster_table_str.c_str(), &cluster_table))
743   {
744     goto out;
745   }
746 
747   Wsrep_schema_impl::store(cluster_table, 0, view.state_id().id());
748   Wsrep_schema_impl::store(cluster_table, 1, view.view_seqno().get());
749   Wsrep_schema_impl::store(cluster_table, 2, view.state_id().seqno().get());
750   Wsrep_schema_impl::store(cluster_table, 3, view.protocol_version());
751   Wsrep_schema_impl::store(cluster_table, 4, view.capabilities());
752 
753   if ((error= Wsrep_schema_impl::update_or_insert(cluster_table)))
754   {
755     WSREP_ERROR("failed to write to cluster table: %d", error);
756     goto out;
757   }
758 
759   Wsrep_schema_impl::finish_stmt(thd);
760 
761   /*
762     Store info about current members
763   */
764   Wsrep_schema_impl::init_stmt(thd);
765   if (Wsrep_schema_impl::open_for_write(thd, members_table_str.c_str(),
766                                         &members_table))
767   {
768     WSREP_ERROR("failed to open wsrep.members table");
769     goto out;
770   }
771 
772   for (size_t i= 0; i < view.members().size(); ++i)
773   {
774     Wsrep_schema_impl::store(members_table, 0, view.members()[i].id());
775     Wsrep_schema_impl::store(members_table, 1, view.state_id().id());
776     Wsrep_schema_impl::store(members_table, 2, view.members()[i].name());
777     Wsrep_schema_impl::store(members_table, 3, view.members()[i].incoming());
778     if ((error= Wsrep_schema_impl::update_or_insert(members_table)))
779     {
780       WSREP_ERROR("failed to write wsrep.members table: %d", error);
781       goto out;
782     }
783   }
784   Wsrep_schema_impl::finish_stmt(thd);
785 
786 #ifdef WSREP_SCHEMA_MEMBERS_HISTORY
787   /*
788     Store members history
789   */
790   Wsrep_schema_impl::init_stmt(thd);
791   if (Wsrep_schema_impl::open_for_write(thd, cluster_member_history.c_str(),
792                                         &members_history_table)) {
793     WSREP_ERROR("failed to open wsrep.members table");
794     goto out;
795   }
796 
797   for (size_t i= 0; i < view.members().size(); ++i) {
798     Wsrep_schema_impl::store(members_history_table, 0, view.members()[i].id());
799     Wsrep_schema_impl::store(members_history_table, 1, view.state_id().id());
800     Wsrep_schema_impl::store(members_history_table, 2, view.view_seqno());
801     Wsrep_schema_impl::store(members_history_table, 3, view.state_id().seqno());
802     Wsrep_schema_impl::store(members_history_table, 4,
803                              view.members()[i].name());
804     Wsrep_schema_impl::store(members_history_table, 5,
805                              view.members()[i].incoming());
806     if ((error= Wsrep_schema_impl::update_or_insert(members_history_table))) {
807       WSREP_ERROR("failed to write wsrep_cluster_member_history table: %d", error);
808       goto out;
809     }
810   }
811   Wsrep_schema_impl::finish_stmt(thd);
812 #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
813   ret= 0;
814  out:
815 
816   DBUG_RETURN(ret);
817 }
818 
/*
  Rebuild a primary view from the cluster and members tables.

  The cluster table is read first (a single record, if any), then every
  record of the members table is read. Both reads happen inside one
  read-only transaction. wsrep_sync_wait is set to 0 for the duration of
  the call and restored before returning.

  @param thd     session used to read the tables
  @param own_id  id of this node; when defined, the index of the member
                 with this id is passed to the view as the own index,
                 otherwise (or when no member matches) -1 is passed

  @return the restored view on success, a default constructed Wsrep_view
          on failure.
 */
Wsrep_view Wsrep_schema::restore_view(THD* thd, const Wsrep_id& own_id) const {
  DBUG_ENTER("Wsrep_schema::restore_view()");

  int ret= 1;
  int error;

  /* The end_*_scan flags tell the cleanup code at `out` whether a scan
     is still open and has to be ended there. */
  TABLE* cluster_table= 0;
  bool end_cluster_scan= false;
  TABLE* members_table= 0;
  bool end_members_scan= false;

  /* variables below need to be initialized in case cluster table is empty */
  Wsrep_id cluster_uuid;
  wsrep_seqno_t view_id= -1;
  wsrep_seqno_t view_seqno= -1;
  int my_idx= -1;
  int proto_ver= 0;
  wsrep_cap_t capabilities= 0;
  std::vector<Wsrep_view::member> members;

  // we don't want causal waits for reading non-replicated private data
  int const wsrep_sync_wait_saved= thd->variables.wsrep_sync_wait;
  thd->variables.wsrep_sync_wait= 0;

  if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_ONLY)) {
    WSREP_ERROR("wsrep_schema::restore_view(): Failed to start transaction");
    goto out;
  }

  /*
    Read cluster info from cluster table
   */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_read(thd, cluster_table_str.c_str(), &cluster_table) ||
      Wsrep_schema_impl::init_for_scan(cluster_table)) {
    goto out;
  }

  /* An empty cluster table (HA_ERR_END_OF_FILE on the first read) is not
     an error: the field scans are skipped and the defaults above stay. */
  if (((error= Wsrep_schema_impl::next_record(cluster_table)) != 0 ||
       Wsrep_schema_impl::scan(cluster_table, 0, cluster_uuid) ||
       Wsrep_schema_impl::scan(cluster_table, 1, view_id) ||
       Wsrep_schema_impl::scan(cluster_table, 2, view_seqno) ||
       Wsrep_schema_impl::scan(cluster_table, 3, proto_ver) ||
       Wsrep_schema_impl::scan(cluster_table, 4, capabilities)) &&
      error != HA_ERR_END_OF_FILE) {
    end_cluster_scan= true;
    goto out;
  }

  if (Wsrep_schema_impl::end_scan(cluster_table)) {
    goto out;
  }
  Wsrep_schema_impl::finish_stmt(thd);

  /*
    Read members from members table
  */
  Wsrep_schema_impl::init_stmt(thd);
  if (Wsrep_schema_impl::open_for_read(thd, members_table_str.c_str(), &members_table) ||
      Wsrep_schema_impl::init_for_scan(members_table)) {
    goto out;
  }
  end_members_scan= true;

  /* Read members until the end of the table; any other read error or a
     failure in scan_member() aborts the restore. */
  while (true) {
    if ((error= Wsrep_schema_impl::next_record(members_table)) == 0) {
      if (Wsrep_schema_impl::scan_member(members_table,
                                         cluster_uuid,
                                         members)) {
        goto out;
      }
    }
    else if (error == HA_ERR_END_OF_FILE) {
      break;
    }
    else {
      goto out;
    }
  }

  /* The scan is ended right here, so the cleanup code must not end it
     a second time. */
  end_members_scan= false;
  if (Wsrep_schema_impl::end_scan(members_table)) {
    goto out;
  }
  Wsrep_schema_impl::finish_stmt(thd);

  /* Find the position of this node among the members. */
  if (own_id.is_undefined() == false) {
    for (uint i= 0; i < members.size(); ++i) {
      if (members[i].id() == own_id) {
        my_idx= i;
        break;
      }
    }
  }

  (void)trans_commit(thd);
  ret= 0; /* Success*/
 out:

  /* End whichever scan was left open by an early exit. */
  if (end_cluster_scan) Wsrep_schema_impl::end_scan(cluster_table);
  if (end_members_scan) Wsrep_schema_impl::end_scan(members_table);

  /* On failure roll back the statement and the transaction; thread
     tables are closed only if the transaction rollback succeeds. */
  if (0 != ret) {
    trans_rollback_stmt(thd);
    if (!trans_rollback(thd)) {
      close_thread_tables(thd);
    }
  }
  thd->release_transactional_locks();

  thd->variables.wsrep_sync_wait= wsrep_sync_wait_saved;

  if (0 == ret) {
    Wsrep_view ret_view(
      wsrep::gtid(cluster_uuid, Wsrep_seqno(view_seqno)),
      Wsrep_seqno(view_id),
      wsrep::view::primary,
      capabilities,
      my_idx,
      proto_ver,
      members
    );

    /* The restored view is logged only when wsrep_debug is set. */
    if (wsrep_debug) {
      std::ostringstream os;
      os << "Restored cluster view:\n" << ret_view;
      WSREP_INFO("%s", os.str().c_str());
    }
    DBUG_RETURN(ret_view);
  }
  else
  {
    WSREP_ERROR("wsrep_schema::restore_view() failed.");
    Wsrep_view ret_view;
    DBUG_RETURN(ret_view);
  }
}
956 
append_fragment(THD * thd,const wsrep::id & server_id,wsrep::transaction_id transaction_id,wsrep::seqno seqno,int flags,const wsrep::const_buffer & data)957 int Wsrep_schema::append_fragment(THD* thd,
958                                   const wsrep::id& server_id,
959                                   wsrep::transaction_id transaction_id,
960                                   wsrep::seqno seqno,
961                                   int flags,
962                                   const wsrep::const_buffer& data)
963 {
964   DBUG_ENTER("Wsrep_schema::append_fragment");
965   std::ostringstream os;
966   os << server_id;
967   WSREP_DEBUG("Append fragment(%llu) %s, %llu",
968               thd->thread_id,
969               os.str().c_str(),
970               transaction_id.get());
971   /* use private query table list for the duration of fragment storing,
972      populated query table list from "parent DML" may cause problems .e.g
973      for virtual column handling
974  */
975   Query_tables_list query_tables_list_backup;
976   thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
977 
978   Wsrep_schema_impl::binlog_off binlog_off(thd);
979   Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
980   Wsrep_schema_impl::init_stmt(thd);
981 
982   TABLE* frag_table= 0;
983   if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
984   {
985     trans_rollback_stmt(thd);
986     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
987     DBUG_RETURN(1);
988   }
989 
990   Wsrep_schema_impl::store(frag_table, 0, server_id);
991   Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
992   Wsrep_schema_impl::store(frag_table, 2, seqno.get());
993   Wsrep_schema_impl::store(frag_table, 3, flags);
994   Wsrep_schema_impl::store(frag_table, 4, data.data(), data.size());
995 
996   int error;
997   if ((error= Wsrep_schema_impl::insert(frag_table))) {
998     WSREP_ERROR("Failed to write to frag table: %d", error);
999     trans_rollback_stmt(thd);
1000     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1001     DBUG_RETURN(1);
1002   }
1003   Wsrep_schema_impl::finish_stmt(thd);
1004   thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1005   DBUG_RETURN(0);
1006 }
1007 
update_fragment_meta(THD * thd,const wsrep::ws_meta & ws_meta)1008 int Wsrep_schema::update_fragment_meta(THD* thd,
1009                                        const wsrep::ws_meta& ws_meta)
1010 {
1011   DBUG_ENTER("Wsrep_schema::update_fragment_meta");
1012   std::ostringstream os;
1013   os << ws_meta.server_id();
1014   WSREP_DEBUG("update_frag_seqno(%llu) %s, %llu, seqno %lld",
1015               thd->thread_id,
1016               os.str().c_str(),
1017               ws_meta.transaction_id().get(),
1018               ws_meta.seqno().get());
1019   DBUG_ASSERT(ws_meta.seqno().is_undefined() == false);
1020 
1021   /* use private query table list for the duration of fragment storing,
1022      populated query table list from "parent DML" may cause problems .e.g
1023      for virtual column handling
1024  */
1025   Query_tables_list query_tables_list_backup;
1026   thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
1027 
1028   Wsrep_schema_impl::binlog_off binlog_off(thd);
1029   Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
1030   int error;
1031   uchar *key=NULL;
1032   key_part_map key_map= 0;
1033   TABLE* frag_table= 0;
1034 
1035   Wsrep_schema_impl::init_stmt(thd);
1036   if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
1037   {
1038     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1039     DBUG_RETURN(1);
1040   }
1041 
1042   /* Find record with the given uuid, trx id, and seqno -1 */
1043   Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
1044   Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
1045   Wsrep_schema_impl::store(frag_table, 2, -1);
1046   Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
1047 
1048   if ((error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1049                                                      key, key_map)))
1050   {
1051     if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
1052     {
1053       WSREP_WARN("Record not found in %s.%s: %d",
1054                  frag_table->s->db.str,
1055                  frag_table->s->table_name.str,
1056                  error);
1057     }
1058     Wsrep_schema_impl::finish_stmt(thd);
1059     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1060     my_free(key);
1061     DBUG_RETURN(1);
1062   }
1063 
1064   my_free(key);
1065   /* Copy the original record to frag_table->record[1] */
1066   store_record(frag_table, record[1]);
1067 
1068   /* Store seqno in frag_table->record[0] and update the row */
1069   Wsrep_schema_impl::store(frag_table, 2, ws_meta.seqno().get());
1070   if ((error= frag_table->file->ha_update_row(frag_table->record[1],
1071                                               frag_table->record[0]))) {
1072     WSREP_ERROR("Error updating record in %s.%s: %d",
1073                 frag_table->s->db.str,
1074                 frag_table->s->table_name.str,
1075                 error);
1076     Wsrep_schema_impl::finish_stmt(thd);
1077     thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1078     DBUG_RETURN(1);
1079   }
1080 
1081   int ret= Wsrep_schema_impl::end_index_scan(frag_table);
1082   Wsrep_schema_impl::finish_stmt(thd);
1083   thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1084   DBUG_RETURN(ret);
1085 }
1086 
remove_fragment(THD * thd,TABLE * frag_table,const wsrep::id & server_id,wsrep::transaction_id transaction_id,wsrep::seqno seqno)1087 static int remove_fragment(THD*                  thd,
1088                            TABLE*                frag_table,
1089                            const wsrep::id&      server_id,
1090                            wsrep::transaction_id transaction_id,
1091                            wsrep::seqno          seqno)
1092 {
1093   WSREP_DEBUG("remove_fragment(%llu) trx %llu, seqno %lld",
1094               thd->thread_id,
1095               transaction_id.get(),
1096               seqno.get());
1097   int ret= 0;
1098   int error;
1099   uchar *key= NULL;
1100   key_part_map key_map= 0;
1101 
1102   DBUG_ASSERT(server_id.is_undefined() == false);
1103   DBUG_ASSERT(transaction_id.is_undefined() == false);
1104   DBUG_ASSERT(seqno.is_undefined() == false);
1105 
1106   /*
1107     Remove record with the given uuid, trx id, and seqno.
1108     Using a complete key here avoids gap locks.
1109   */
1110   Wsrep_schema_impl::store(frag_table, 0, server_id);
1111   Wsrep_schema_impl::store(frag_table, 1, transaction_id.get());
1112   Wsrep_schema_impl::store(frag_table, 2, seqno.get());
1113   Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
1114 
1115   if ((error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1116                                                      key,
1117                                                      key_map)))
1118   {
1119     if (error == HA_ERR_END_OF_FILE || error == HA_ERR_KEY_NOT_FOUND)
1120     {
1121       WSREP_DEBUG("Record not found in %s.%s:trx %llu, seqno %lld, error %d",
1122                  frag_table->s->db.str,
1123                  frag_table->s->table_name.str,
1124                  transaction_id.get(),
1125                  seqno.get(),
1126                  error);
1127     }
1128     ret= error;
1129   }
1130   else if (Wsrep_schema_impl::delete_row(frag_table))
1131   {
1132     ret= 1;
1133   }
1134 
1135   if (key)
1136     my_free(key);
1137   Wsrep_schema_impl::end_index_scan(frag_table);
1138   return ret;
1139 }
1140 
remove_fragments(THD * thd,const wsrep::id & server_id,wsrep::transaction_id transaction_id,const std::vector<wsrep::seqno> & fragments)1141 int Wsrep_schema::remove_fragments(THD* thd,
1142                                    const wsrep::id& server_id,
1143                                    wsrep::transaction_id transaction_id,
1144                                    const std::vector<wsrep::seqno>& fragments)
1145 {
1146   DBUG_ENTER("Wsrep_schema::remove_fragments");
1147   int ret= 0;
1148 
1149   WSREP_DEBUG("Removing %zu fragments", fragments.size());
1150   Wsrep_schema_impl::wsrep_off  wsrep_off(thd);
1151   Wsrep_schema_impl::binlog_off binlog_off(thd);
1152   Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
1153 
1154   Query_tables_list query_tables_list_backup;
1155   Open_tables_backup open_tables_backup;
1156   thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup);
1157   thd->reset_n_backup_open_tables_state(&open_tables_backup);
1158 
1159   TABLE* frag_table= 0;
1160   if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table))
1161   {
1162     ret= 1;
1163   }
1164   else
1165   {
1166     for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
1167          i != fragments.end(); ++i)
1168     {
1169       if (remove_fragment(thd,
1170                           frag_table,
1171                           server_id,
1172                           transaction_id, *i))
1173       {
1174         ret= 1;
1175         break;
1176       }
1177     }
1178   }
1179   close_thread_tables(thd);
1180   thd->restore_backup_open_tables_state(&open_tables_backup);
1181   thd->lex->restore_backup_query_tables_list(&query_tables_list_backup);
1182 
1183   if (thd->wsrep_cs().mode() == wsrep::client_state::m_local &&
1184       !thd->in_multi_stmt_transaction_mode())
1185   {
1186     /*
1187       The ugly part: Locally executing autocommit statement is
1188       committing and it has removed a fragment from stable storage.
1189       Now calling finish_stmt() will call trans_commit_stmt(), which will
1190       actually commit the transaction, what we really don't want
1191       to do at this point.
1192 
1193       Doing nothing at this point seems to work ok, this block is
1194       intentionally no-op and for documentation purposes only.
1195     */
1196   }
1197   else
1198   {
1199     Wsrep_schema_impl::finish_stmt(thd);
1200   }
1201 
1202   DBUG_RETURN(ret);
1203 }
1204 
replay_transaction(THD * orig_thd,Relay_log_info * rli,const wsrep::ws_meta & ws_meta,const std::vector<wsrep::seqno> & fragments)1205 int Wsrep_schema::replay_transaction(THD* orig_thd,
1206                                      Relay_log_info* rli,
1207                                      const wsrep::ws_meta& ws_meta,
1208                                      const std::vector<wsrep::seqno>& fragments)
1209 {
1210   DBUG_ENTER("Wsrep_schema::replay_transaction");
1211   DBUG_ASSERT(!fragments.empty());
1212 
1213   THD thd(next_thread_id(), true);
1214   thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
1215                      (char*) &thd);
1216   wsrep_assign_from_threadvars(&thd);
1217 
1218   Wsrep_schema_impl::wsrep_off  wsrep_off(&thd);
1219   Wsrep_schema_impl::binlog_off binlog_off(&thd);
1220   Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&thd);
1221   Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd, &thd);
1222 
1223   int ret= 1;
1224   int error;
1225   TABLE* frag_table= 0;
1226   uchar *key=NULL;
1227   key_part_map key_map= 0;
1228 
1229   for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin();
1230        i != fragments.end(); ++i)
1231   {
1232     Wsrep_schema_impl::init_stmt(&thd);
1233     if ((error= Wsrep_schema_impl::open_for_read(&thd, sr_table_str.c_str(), &frag_table)))
1234     {
1235       WSREP_WARN("Could not open SR table for read: %d", error);
1236       Wsrep_schema_impl::finish_stmt(&thd);
1237       DBUG_RETURN(1);
1238     }
1239 
1240     Wsrep_schema_impl::store(frag_table, 0, ws_meta.server_id());
1241     Wsrep_schema_impl::store(frag_table, 1, ws_meta.transaction_id().get());
1242     Wsrep_schema_impl::store(frag_table, 2, i->get());
1243     Wsrep_schema_impl::make_key(frag_table, &key, &key_map, 3);
1244 
1245     int error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1246                                                       key,
1247                                                       key_map);
1248     if (error)
1249     {
1250       WSREP_WARN("Failed to init streaming log table for index scan: %d",
1251                  error);
1252       Wsrep_schema_impl::end_index_scan(frag_table);
1253       ret= 1;
1254       break;
1255     }
1256 
1257     int flags;
1258     Wsrep_schema_impl::scan(frag_table, 3, flags);
1259     WSREP_DEBUG("replay_fragment(%llu): seqno: %lld flags: %x",
1260                 ws_meta.transaction_id().get(),
1261                 i->get(),
1262                 flags);
1263     String buf;
1264     frag_table->field[4]->val_str(&buf);
1265 
1266     {
1267       Wsrep_schema_impl::thd_context_switch thd_context_switch(&thd, orig_thd);
1268 
1269       ret= wsrep_apply_events(orig_thd, rli, buf.c_ptr_quick(), buf.length());
1270       if (ret)
1271       {
1272         WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");
1273         break;
1274       }
1275     }
1276 
1277     Wsrep_schema_impl::end_index_scan(frag_table);
1278     Wsrep_schema_impl::finish_stmt(&thd);
1279 
1280     Wsrep_schema_impl::init_stmt(&thd);
1281 
1282     if ((error= Wsrep_schema_impl::open_for_write(&thd,
1283                                                   sr_table_str.c_str(),
1284                                                   &frag_table)))
1285     {
1286       WSREP_WARN("Could not open SR table for write: %d", error);
1287       Wsrep_schema_impl::finish_stmt(&thd);
1288       DBUG_RETURN(1);
1289     }
1290 
1291     error= Wsrep_schema_impl::init_for_index_scan(frag_table,
1292                                                   key,
1293                                                   key_map);
1294     if (error)
1295     {
1296       WSREP_WARN("Failed to init streaming log table for index scan: %d",
1297                  error);
1298       Wsrep_schema_impl::end_index_scan(frag_table);
1299       ret= 1;
1300       break;
1301     }
1302 
1303     error= Wsrep_schema_impl::delete_row(frag_table);
1304 
1305     if (error)
1306     {
1307       WSREP_WARN("Could not delete row from streaming log table: %d", error);
1308       Wsrep_schema_impl::end_index_scan(frag_table);
1309       ret= 1;
1310       break;
1311     }
1312     Wsrep_schema_impl::end_index_scan(frag_table);
1313     Wsrep_schema_impl::finish_stmt(&thd);
1314     my_free(key);
1315     key= NULL;
1316   }
1317 
1318   if (key)
1319     my_free(key);
1320   DBUG_RETURN(ret);
1321 }
1322 
/*
  Recover streaming replication transactions from the streaming log
  table.

  The cluster id is read from the first row of the cluster table. The
  streaming log table is then scanned: rows whose seqno is undefined are
  deleted, and every other row is passed to a streaming applier that is
  looked up by (server id, transaction id) and created when none exists.

  @param orig_thd  THD of the caller. A NULL value is tolerated when the
                   thread stack is picked; it is also handed to the
                   thd_context_switch guard.

  @return 0 if the cluster table is empty or the streaming log table was
          scanned to its end, 1 otherwise
*/
int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
{
  DBUG_ENTER("Wsrep_schema::recover_sr_transactions");
  /* Private THD through which all table access below is done. */
  THD storage_thd(next_thread_id(), true);
  storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
                             (char*) &storage_thd);
  wsrep_assign_from_threadvars(&storage_thd);
  TABLE* frag_table= 0;
  TABLE* cluster_table= 0;
  Wsrep_storage_service storage_service(&storage_thd);
  Wsrep_schema_impl::binlog_off binlog_off(&storage_thd);
  Wsrep_schema_impl::wsrep_off wsrep_off(&storage_thd);
  Wsrep_schema_impl::sql_safe_updates sql_safe_updates(&storage_thd);
  Wsrep_schema_impl::thd_context_switch thd_context_switch(orig_thd,
                                                           &storage_thd);
  Wsrep_server_state& server_state(Wsrep_server_state::instance());

  /* ret stays 1 unless the streaming log scan below reaches its end. */
  int ret= 1;
  int error;
  wsrep::id cluster_id;

  /*
    Step 1: read the cluster id. wsrep_skip_locking is cleared for this
    read and set again before the streaming log table is opened.
  */
  Wsrep_schema_impl::init_stmt(&storage_thd);
  storage_thd.wsrep_skip_locking= FALSE;
  if (Wsrep_schema_impl::open_for_read(&storage_thd,
                                       cluster_table_str.c_str(),
                                       &cluster_table) ||
      Wsrep_schema_impl::init_for_scan(cluster_table))
  {
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    DBUG_RETURN(1);
  }

  if ((error= Wsrep_schema_impl::next_record(cluster_table)))
  {
    /*
      No row could be read. The scan is closed and the transaction
      committed before telling an empty table apart from a read error.
    */
    Wsrep_schema_impl::end_scan(cluster_table);
    Wsrep_schema_impl::finish_stmt(&storage_thd);
    trans_commit(&storage_thd);
    if (error == HA_ERR_END_OF_FILE)
    {
      WSREP_INFO("Cluster table is empty, not recovering transactions");
      DBUG_RETURN(0);
    }
    else
    {
      WSREP_ERROR("Failed to read cluster table: %d", error);
      DBUG_RETURN(1);
    }
  }

  /* Column 0 of the cluster table row is read as the cluster id. */
  Wsrep_schema_impl::scan(cluster_table, 0, cluster_id);
  Wsrep_schema_impl::end_scan(cluster_table);
  Wsrep_schema_impl::finish_stmt(&storage_thd);

  std::ostringstream os;
  os << cluster_id;
  WSREP_INFO("Recovered cluster id %s", os.str().c_str());

  /* Step 2: scan the streaming log table. */
  storage_thd.wsrep_skip_locking= TRUE;
  Wsrep_schema_impl::init_stmt(&storage_thd);

  /*
    Open the table for reading and writing so that fragments without
    valid seqno can be deleted.
  */
  if (Wsrep_schema_impl::open_for_write(&storage_thd, sr_table_str.c_str(), &frag_table) ||
      Wsrep_schema_impl::init_for_scan(frag_table))
  {
    WSREP_ERROR("Failed to open SR table for write");
    /*
      NOTE(review): this path returns 1 without the finish_stmt(),
      trans_commit() and set_mysys_var(0) calls made at the end of the
      normal path - confirm that this is intended.
    */
    goto out;
  }

  while (true)
  {
    if ((error= Wsrep_schema_impl::next_record(frag_table)) == 0)
    {
      /* Columns 0, 1 and 2: server id, transaction id and seqno. */
      wsrep::id server_id;
      Wsrep_schema_impl::scan(frag_table, 0, server_id);
      /* Default constructed; it is never filled from the row. */
      wsrep::client_id client_id;
      unsigned long long transaction_id_ull;
      Wsrep_schema_impl::scan(frag_table, 1, transaction_id_ull);
      wsrep::transaction_id transaction_id(transaction_id_ull);
      long long seqno_ll;
      Wsrep_schema_impl::scan(frag_table, 2, seqno_ll);
      wsrep::seqno seqno(seqno_ll);

      /* This is possible if the server crashes between inserting the
         fragment into table and updating the fragment seqno after
         certification. */
      if (seqno.is_undefined())
      {
        /* The result of the delete is not checked. */
        Wsrep_schema_impl::delete_row(frag_table);
        continue;
      }

      /* Columns 3 and 4: fragment flags and fragment payload. */
      wsrep::gtid gtid(cluster_id, seqno);
      int flags;
      Wsrep_schema_impl::scan(frag_table, 3, flags);
      String data_str;

      (void)frag_table->field[4]->val_str(&data_str);
      wsrep::const_buffer data(data_str.c_ptr_quick(), data_str.length());
      wsrep::ws_meta ws_meta(gtid,
                             wsrep::stid(server_id,
                                         transaction_id,
                                         client_id),
                             wsrep::seqno::undefined(),
                             flags);

      /*
        Reuse the streaming applier registered for this transaction, or
        create and register one. A row without a registered applier is
        expected to carry the start-of-transaction flag.
      */
      wsrep::high_priority_service* applier;
      if (!(applier= server_state.find_streaming_applier(server_id,
                                                         transaction_id)))
      {
        DBUG_ASSERT(wsrep::starts_transaction(flags));
        applier = wsrep_create_streaming_applier(&storage_thd, "recovery");
        server_state.start_streaming_applier(server_id, transaction_id,
                                             applier);
        applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
                                   ws_meta);
      }
      /*
        Presumably store_globals() makes the applier's context current
        for the apply and the storage service's context current again
        afterwards - TODO confirm. The result of apply_write_set() is
        not checked.
      */
      applier->store_globals();
      wsrep::mutable_buffer unused;
      applier->apply_write_set(ws_meta, data, unused);
      applier->after_apply();
      storage_service.store_globals();
    }
    else if (error == HA_ERR_END_OF_FILE)
    {
      /* All rows were processed: this is the only success exit. */
      ret= 0;
      break;
    }
    else
    {
      WSREP_ERROR("SR table scan returned error %d", error);
      break;
    }
  }
  /* Reached both after a complete scan and after a scan error. */
  Wsrep_schema_impl::end_scan(frag_table);
  Wsrep_schema_impl::finish_stmt(&storage_thd);
  trans_commit(&storage_thd);
  storage_thd.set_mysys_var(0);
out:
  DBUG_RETURN(ret);
}
1466