1 /*
2 Copyright (c) 2012, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24 #include <my_global.h> /* For config defines */
25
26 #include "ha_ndbcluster_glue.h"
27 #include "ndb_conflict.h"
28 #include "ndb_binlog_extra_row_info.h"
29 #include "ndb_table_guard.h"
30
31 extern st_ndb_slave_state g_ndb_slave_state;
32
33 #ifdef HAVE_NDB_BINLOG
34 #include "ndb_mi.h"
35 extern ulong opt_ndb_slave_conflict_role;
36 extern ulong opt_ndb_extra_logging;
37
38 #define NDBTAB NdbDictionary::Table
39 #define NDBCOL NdbDictionary::Column
40
41
42 #define NDB_EXCEPTIONS_TABLE_SUFFIX "$EX"
43 #define NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER "$ex"
44
45 #define NDB_EXCEPTIONS_TABLE_COLUMN_PREFIX "NDB$"
46 #define NDB_EXCEPTIONS_TABLE_OP_TYPE "NDB$OP_TYPE"
47 #define NDB_EXCEPTIONS_TABLE_CONFLICT_CAUSE "NDB$CFT_CAUSE"
48 #define NDB_EXCEPTIONS_TABLE_ORIG_TRANSID "NDB$ORIG_TRANSID"
49 #define NDB_EXCEPTIONS_TABLE_COLUMN_OLD_SUFFIX "$OLD"
50 #define NDB_EXCEPTIONS_TABLE_COLUMN_NEW_SUFFIX "$NEW"
51
52
53 /*
54 Return true if a column has a specific prefix.
55 */
56 bool
has_prefix_ci(const char * col_name,const char * prefix,CHARSET_INFO * cs)57 ExceptionsTableWriter::has_prefix_ci(const char *col_name,
58 const char *prefix,
59 CHARSET_INFO *cs)
60 {
61 uint col_len= strlen(col_name);
62 uint prefix_len= strlen(prefix);
63 if (col_len < prefix_len)
64 return false;
65 char col_name_prefix[FN_HEADLEN];
66 strncpy(col_name_prefix, col_name, prefix_len);
67 col_name_prefix[prefix_len]= '\0';
68 return (my_strcasecmp(cs,
69 col_name_prefix,
70 prefix) == 0);
71 }
72
73 /*
74 Return true if a column has a specific suffix
75 and sets the column_real_name to the column name
76 without the suffix.
77 */
78 bool
has_suffix_ci(const char * col_name,const char * suffix,CHARSET_INFO * cs,char * col_name_real)79 ExceptionsTableWriter::has_suffix_ci(const char *col_name,
80 const char *suffix,
81 CHARSET_INFO *cs,
82 char *col_name_real)
83 {
84 uint col_len= strlen(col_name);
85 uint suffix_len= strlen(suffix);
86 const char *col_name_endp= col_name + col_len;
87 strcpy(col_name_real, col_name);
88 if (col_len > suffix_len &&
89 my_strcasecmp(cs,
90 col_name_endp - suffix_len,
91 suffix) == 0)
92 {
93 col_name_real[col_len - suffix_len]= '\0';
94 return true;
95 }
96 return false;
97 }
98
99 /*
100 Search for column_name in table and
101 return true if found. Also return what
102 position column was found in pos and possible
103 position in the primary key in key_pos.
104 */
105 bool
find_column_name_ci(CHARSET_INFO * cs,const char * col_name,const NdbDictionary::Table * table,int * pos,int * key_pos)106 ExceptionsTableWriter::find_column_name_ci(CHARSET_INFO *cs,
107 const char *col_name,
108 const NdbDictionary::Table* table,
109 int *pos,
110 int *key_pos)
111 {
112 int ncol= table->getNoOfColumns();
113 for(int m= 0; m < ncol; m++)
114 {
115 const NdbDictionary::Column* col= table->getColumn(m);
116 const char *tcol_name= col->getName();
117 if (col->getPrimaryKey())
118 (*key_pos)++;
119 if (my_strcasecmp(cs, col_name, tcol_name) == 0)
120 {
121 *pos= m;
122 return true;
123 }
124 }
125 return false;
126 }
127
128
129 bool
check_mandatory_columns(const NdbDictionary::Table * exceptionsTable)130 ExceptionsTableWriter::check_mandatory_columns(const NdbDictionary::Table* exceptionsTable)
131 {
132 DBUG_ENTER("ExceptionsTableWriter::check_mandatory_columns");
133 if (/* server id */
134 exceptionsTable->getColumn(0)->getType() == NDBCOL::Unsigned &&
135 exceptionsTable->getColumn(0)->getPrimaryKey() &&
136 /* master_server_id */
137 exceptionsTable->getColumn(1)->getType() == NDBCOL::Unsigned &&
138 exceptionsTable->getColumn(1)->getPrimaryKey() &&
139 /* master_epoch */
140 exceptionsTable->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
141 exceptionsTable->getColumn(2)->getPrimaryKey() &&
142 /* count */
143 exceptionsTable->getColumn(3)->getType() == NDBCOL::Unsigned &&
144 exceptionsTable->getColumn(3)->getPrimaryKey()
145 )
146 DBUG_RETURN(true);
147 else
148 DBUG_RETURN(false);
149 }
150
151 bool
check_pk_columns(const NdbDictionary::Table * mainTable,const NdbDictionary::Table * exceptionsTable,int & k)152 ExceptionsTableWriter::check_pk_columns(const NdbDictionary::Table* mainTable,
153 const NdbDictionary::Table* exceptionsTable,
154 int &k)
155 {
156 DBUG_ENTER("ExceptionsTableWriter::check_pk_columns");
157 const int fixed_cols= 4;
158 int ncol= mainTable->getNoOfColumns();
159 int nkey= mainTable->getNoOfPrimaryKeys();
160 /* Check columns that are part of the primary key */
161 for (int i= k= 0; i < ncol && k < nkey; i++)
162 {
163 const NdbDictionary::Column* col= mainTable->getColumn(i);
164 if (col->getPrimaryKey())
165 {
166 const NdbDictionary::Column* ex_col=
167 exceptionsTable->getColumn(fixed_cols + k);
168 if(!(ex_col != NULL &&
169 col->getType() == ex_col->getType() &&
170 col->getLength() == ex_col->getLength() &&
171 col->getNullable() == ex_col->getNullable()))
172 {
173 /*
174 Primary key type of the original table doesn't match
175 the primary key column of the execption table.
176 Assume that the table format has been extended and
177 check more below.
178 */
179 DBUG_PRINT("info", ("Primary key column columns don't match, assume extended table"));
180 m_extended= true;
181 break;
182 }
183 /*
184 Store mapping of Exception table key# to
185 orig table attrid
186 */
187 DBUG_PRINT("info", ("%u: Setting m_key_attrids[%i]= %i", __LINE__, k, i));
188 m_key_attrids[k]= i;
189 k++;
190 }
191 }
192 DBUG_RETURN(true);
193 }
194
195 bool
check_optional_columns(const NdbDictionary::Table * mainTable,const NdbDictionary::Table * exceptionsTable,char * msg_buf,uint msg_buf_len,const char ** msg,int & k,char * error_details,uint error_details_len)196 ExceptionsTableWriter::check_optional_columns(const NdbDictionary::Table* mainTable,
197 const NdbDictionary::Table* exceptionsTable,
198 char* msg_buf,
199 uint msg_buf_len,
200 const char** msg,
201 int &k,
202 char *error_details,
203 uint error_details_len)
204 {
205 DBUG_ENTER("ExceptionsTableWriter::check_optional_columns");
206 /*
207 Check optional columns.
208 Check if table has been extended by looking for
209 the NDB$ prefix. By looking at the columns in
210 reverse order we can determine if table has been
211 extended and then double check that the original
212 mandatory columns also have the NDB$ prefix.
213 If an incomplete primary key has been found or
214 additional non-primary key attributes from the
215 original table then table is also assumed to be
216 extended.
217 */
218 const char* ex_tab_name= exceptionsTable->getName();
219 const int fixed_cols= 4;
220 bool ok= true;
221 int xncol= exceptionsTable->getNoOfColumns();
222 int i;
223 for (i= xncol - 1; i >= 0; i--)
224 {
225 const NdbDictionary::Column* col= exceptionsTable->getColumn(i);
226 const char* col_name= col->getName();
227 /*
228 We really need the CHARSET_INFO from when the table was
229 created but NdbDictionary::Table doesn't save this. This
230 means we cannot handle tables and execption tables defined
231 with a charset different than the system charset.
232 */
233 CHARSET_INFO *cs= system_charset_info;
234 bool has_prefix= false;
235
236 if (has_prefix_ci(col_name, NDB_EXCEPTIONS_TABLE_COLUMN_PREFIX, cs))
237 {
238 has_prefix= true;
239 m_extended= true;
240 DBUG_PRINT("info",
241 ("Exceptions table %s is extended with column %s",
242 ex_tab_name, col_name));
243 }
244 /* Check that mandatory columns have NDB$ prefix */
245 if (i < 4)
246 {
247 if (m_extended && !has_prefix)
248 {
249 my_snprintf(msg_buf, msg_buf_len,
250 "Exceptions table %s is extended, but mandatory column %s doesn't have the \'%s\' prefix",
251 ex_tab_name,
252 col_name,
253 NDB_EXCEPTIONS_TABLE_COLUMN_PREFIX);
254 *msg= msg_buf;
255 DBUG_RETURN(false);
256 }
257 }
258 k= i - fixed_cols;
259 /* Check for extended columns */
260 if (my_strcasecmp(cs,
261 col_name,
262 NDB_EXCEPTIONS_TABLE_OP_TYPE) == 0)
263 {
264 /* Check if ENUM or INT UNSIGNED */
265 if (exceptionsTable->getColumn(i)->getType() != NDBCOL::Char &&
266 exceptionsTable->getColumn(i)->getType() != NDBCOL::Unsigned)
267 {
268 my_snprintf(error_details, error_details_len,
269 "Table %s has incorrect type %u for NDB$OP_TYPE",
270 exceptionsTable->getName(),
271 exceptionsTable->getColumn(i)->getType());
272 DBUG_PRINT("info", ("%s", error_details));
273 ok= false;
274 break;
275 }
276 m_extended= true;
277 m_op_type_pos= i;
278 continue;
279 }
280 if (my_strcasecmp(cs,
281 col_name,
282 NDB_EXCEPTIONS_TABLE_CONFLICT_CAUSE) == 0)
283 {
284 /* Check if ENUM or INT UNSIGNED */
285 if (exceptionsTable->getColumn(i)->getType() != NDBCOL::Char &&
286 exceptionsTable->getColumn(i)->getType() != NDBCOL::Unsigned)
287 {
288 my_snprintf(error_details, error_details_len,
289 "Table %s has incorrect type %u for NDB$CFT_CAUSE",
290 exceptionsTable->getName(),
291 exceptionsTable->getColumn(i)->getType());
292 DBUG_PRINT("info", ("%s", error_details));
293 ok= false;
294 break;
295 }
296 m_extended= true;
297 m_conflict_cause_pos= i;
298 continue;
299 }
300 if (my_strcasecmp(cs,
301 col_name,
302 NDB_EXCEPTIONS_TABLE_ORIG_TRANSID) == 0)
303 {
304 if (exceptionsTable->getColumn(i)->getType() != NDBCOL::Bigunsigned)
305 {
306 my_snprintf(error_details, error_details_len,
307 "Table %s has incorrect type %u for NDB$ORIG_TRANSID",
308 exceptionsTable->getName(),
309 exceptionsTable->getColumn(i)->getType());
310 DBUG_PRINT("info", ("%s", error_details));
311 ok= false;
312 break;
313 }
314 m_extended= true;
315 m_orig_transid_pos= i;
316 continue;
317 }
318 /*
319 Check for any optional columns from the original table in the extended
320 table. Compare column types of columns with names matching a column in
321 the original table. If a non-primary key column is found we assume that
322 the table is extended.
323 */
324 if (i >= fixed_cols)
325 {
326 int match= -1;
327 int match_k= -1;
328 COLUMN_VERSION column_version= DEFAULT;
329 char col_name_real[FN_HEADLEN];
330 /* Check for old or new column reference */
331 if (has_suffix_ci(col_name,
332 NDB_EXCEPTIONS_TABLE_COLUMN_OLD_SUFFIX,
333 cs,
334 col_name_real))
335 {
336 DBUG_PRINT("info", ("Found reference to old column %s", col_name));
337 column_version= OLD;
338 }
339 else if (has_suffix_ci(col_name,
340 NDB_EXCEPTIONS_TABLE_COLUMN_NEW_SUFFIX,
341 cs,
342 col_name_real))
343 {
344 DBUG_PRINT("info", ("Found reference to new column %s", col_name));
345 column_version= NEW;
346 }
347 DBUG_PRINT("info", ("Checking for original column %s", col_name_real));
348 /*
349 We really need the CHARSET_INFO from when the table was
350 created but NdbDictionary::Table doesn't save this. This
351 means we cannot handle tables end execption tables defined
352 with a charset different than the system charset.
353 */
354 CHARSET_INFO *mcs= system_charset_info;
355 if (! find_column_name_ci(mcs, col_name_real, mainTable, &match, &match_k))
356 {
357 if (! strcmp(col_name, col_name_real))
358 {
359 /*
360 Column did have $OLD or $NEW suffix, but it didn't
361 match. Check if that is the real name of the column.
362 */
363 match_k= -1;
364 if (find_column_name_ci(mcs, col_name, mainTable, &match, &match_k))
365 {
366 DBUG_PRINT("info", ("Column %s in main table %s has an unfortunate name",
367 col_name, mainTable->getName()));
368 }
369 }
370 }
371 /*
372 Check that old or new references are nullable
373 or have a default value.
374 */
375 if (column_version != DEFAULT &&
376 match_k != -1)
377 {
378 if ((! col->getNullable()) &&
379 col->getDefaultValue() == NULL)
380 {
381 my_snprintf(error_details, error_details_len,
382 "Old or new column reference %s in table %s is not nullable and doesn't have a default value",
383 col->getName(), exceptionsTable->getName());
384 DBUG_PRINT("info", ("%s", error_details));
385 ok= false;
386 break;
387 }
388 }
389
390 if (match == -1)
391 {
392 /*
393 Column do not have the same name, could be allowed
394 if column is nullable or has a default value,
395 continue checking, but give a warning to user
396 */
397 if ((! col->getNullable()) &&
398 col->getDefaultValue() == NULL)
399 {
400 my_snprintf(error_details, error_details_len,
401 "Extra column %s in table %s is not nullable and doesn't have a default value",
402 col->getName(), exceptionsTable->getName());
403 DBUG_PRINT("info", ("%s", error_details));
404 ok= false;
405 break;
406 }
407 my_snprintf(error_details, error_details_len,
408 "Column %s in extension table %s not found in %s",
409 col->getName(), exceptionsTable->getName(),
410 mainTable->getName());
411 DBUG_PRINT("info", ("%s", error_details));
412 my_snprintf(msg_buf, msg_buf_len,
413 "NDB Slave: exceptions table %s has suspicious "
414 "definition ((column %d): %s",
415 ex_tab_name, fixed_cols + k, error_details);
416 continue;
417 }
418 /* We have a matching name */
419 const NdbDictionary::Column* mcol= mainTable->getColumn(match);
420 if (col->getType() == mcol->getType())
421 {
422 DBUG_PRINT("info", ("Comparing column %s in exceptions table with column %s in main table", col->getName(), mcol->getName()));
423 /* We have matching type */
424 if (!mcol->getPrimaryKey())
425 {
426 /*
427 Matching non-key column found.
428 Check that column is nullable
429 or has a default value.
430 */
431 if (col->getNullable() ||
432 col->getDefaultValue() != NULL)
433 {
434 DBUG_PRINT("info", ("Mapping column %s %s(%i) to %s(%i)",
435 col->getName(),
436 mainTable->getName(), match,
437 exceptionsTable->getName(), i));
438 /* Save position */
439 m_data_pos[i]= match;
440 m_column_version[i]= column_version;
441 }
442 else
443 {
444 my_snprintf(error_details, error_details_len,
445 "Data column %s in table %s is not nullable and doesn't have a default value",
446 col->getName(), exceptionsTable->getName());
447 DBUG_PRINT("info", ("%s", error_details));
448 ok= false;
449 break;
450 }
451 }
452 else
453 {
454 /* Column is part of the primary key */
455 if (column_version != DEFAULT)
456 {
457 my_snprintf(error_details, error_details_len,
458 "Old or new values of primary key columns cannot be referenced since primary keys cannot be updated, column %s in table %s",
459 col->getName(), exceptionsTable->getName());
460 DBUG_PRINT("info", ("%s", error_details));
461 ok= false;
462 break;
463 }
464 if (col->getNullable() == mcol->getNullable())
465 {
466 /*
467 Columns are both nullable or not nullable.
468 Save position.
469 */
470 if (m_key_data_pos[match_k] != -1)
471 {
472 my_snprintf(error_details, error_details_len,
473 "Multiple references to the same key column %s in table %s",
474 col->getName(), exceptionsTable->getName());
475 DBUG_PRINT("info", ("%s", error_details));
476 ok= false;
477 break;
478 }
479 DBUG_PRINT("info", ("Setting m_key_data_pos[%i]= %i", match_k, i));
480 m_key_data_pos[match_k]= i;
481
482 if (i == fixed_cols + match_k)
483 {
484 /* Found key column in correct position */
485 if (!m_extended)
486 continue;
487 }
488 /*
489 Store mapping of Exception table key# to
490 orig table attrid
491 */
492 DBUG_PRINT("info", ("%u: Setting m_key_attrids[%i]= %i", __LINE__, match_k, match));
493 m_key_attrids[match_k]= match;
494 m_extended= true;
495 }
496 else if (column_version == DEFAULT)
497 {
498 /*
499 Columns have same name and same type
500 Column with this name is part of primary key,
501 but both columns are not declared not null
502 */
503 my_snprintf(error_details, error_details_len,
504 "Pk column %s not declared not null in both tables",
505 col->getName());
506 DBUG_PRINT("info", ("%s", error_details));
507 ok= false;
508 break;
509 }
510 }
511 }
512 else
513 {
514 /*
515 Columns have same name, but not the same type
516 */
517 my_snprintf(error_details, error_details_len,
518 "Column %s has matching name to column %s for table %s, but wrong type, %u versus %u",
519 col->getName(), mcol->getName(),
520 mainTable->getName(),
521 col->getType(), mcol->getType());
522 DBUG_PRINT("info", ("%s", error_details));
523 ok= false;
524 break;
525 }
526 }
527 }
528
529 DBUG_RETURN(ok);
530 }
531
532 int
init(const NdbDictionary::Table * mainTable,const NdbDictionary::Table * exceptionsTable,char * msg_buf,uint msg_buf_len,const char ** msg)533 ExceptionsTableWriter::init(const NdbDictionary::Table* mainTable,
534 const NdbDictionary::Table* exceptionsTable,
535 char* msg_buf,
536 uint msg_buf_len,
537 const char** msg)
538 {
539 DBUG_ENTER("ExceptionsTableWriter::init");
540 const char* ex_tab_name= exceptionsTable->getName();
541 const int fixed_cols= 4;
542 *msg= NULL;
543 *msg_buf= '\0';
544
545 DBUG_PRINT("info", ("Checking definition of exceptions table %s",
546 ex_tab_name));
547 /*
548 Check that the table have the corrct number of columns
549 and the mandatory columns.
550 */
551
552 bool ok=
553 exceptionsTable->getNoOfColumns() >= fixed_cols &&
554 exceptionsTable->getNoOfPrimaryKeys() == 4 &&
555 check_mandatory_columns(exceptionsTable);
556
557 if (ok)
558 {
559 char error_details[ FN_REFLEN ];
560 uint error_details_len= sizeof(error_details);
561 error_details[0]= '\0';
562 int ncol= mainTable->getNoOfColumns();
563 int nkey= mainTable->getNoOfPrimaryKeys();
564 int xncol= exceptionsTable->getNoOfColumns();
565 int i, k;
566 /* Initialize position arrays */
567 for(k=0; k < nkey; k++)
568 m_key_data_pos[k]= -1;
569 for(i=0; i < xncol; i++)
570 m_data_pos[i]= -1;
571 /* Initialize nullability information */
572 for(i=0; i < ncol; i++)
573 {
574 const NdbDictionary::Column* col= mainTable->getColumn(i);
575 m_col_nullable[i]= col->getNullable();
576 }
577
578 /*
579 Check that the primary key columns in the main table
580 are referenced correctly.
581 Then check if the table is extended with optional
582 columns.
583 */
584 ok=
585 check_pk_columns(mainTable, exceptionsTable, k) &&
586 check_optional_columns(mainTable,
587 exceptionsTable,
588 msg_buf,
589 msg_buf_len,
590 msg,
591 k,
592 error_details,
593 error_details_len);
594 if (ok)
595 {
596 m_ex_tab= exceptionsTable;
597 m_pk_cols= nkey;
598 m_cols= ncol;
599 m_xcols= xncol;
600 if (m_extended && strlen(msg_buf) > 0)
601 *msg= msg_buf;
602 DBUG_RETURN(0);
603 }
604 else
605 my_snprintf(msg_buf, msg_buf_len,
606 "NDB Slave: exceptions table %s has wrong "
607 "definition (column %d): %s",
608 ex_tab_name, fixed_cols + k, error_details);
609 }
610 else
611 my_snprintf(msg_buf, msg_buf_len,
612 "NDB Slave: exceptions table %s has wrong "
613 "definition (initial %d columns)",
614 ex_tab_name, fixed_cols);
615
616 *msg= msg_buf;
617 DBUG_RETURN(-1);
618 }
619
620 void
mem_free(Ndb * ndb)621 ExceptionsTableWriter::mem_free(Ndb* ndb)
622 {
623 if (m_ex_tab)
624 {
625 NdbDictionary::Dictionary* dict = ndb->getDictionary();
626 dict->removeTableGlobal(*m_ex_tab, 0);
627 m_ex_tab= 0;
628 }
629 }
630
631 int
writeRow(NdbTransaction * trans,const NdbRecord * keyRecord,const NdbRecord * dataRecord,uint32 server_id,uint32 master_server_id,uint64 master_epoch,const uchar * oldRowPtr,const uchar * newRowPtr,enum_conflicting_op_type op_type,enum_conflict_cause conflict_cause,uint64 orig_transid,const MY_BITMAP * write_set,NdbError & err)632 ExceptionsTableWriter::writeRow(NdbTransaction* trans,
633 const NdbRecord* keyRecord,
634 const NdbRecord* dataRecord,
635 uint32 server_id,
636 uint32 master_server_id,
637 uint64 master_epoch,
638 const uchar* oldRowPtr,
639 const uchar* newRowPtr,
640 enum_conflicting_op_type op_type,
641 enum_conflict_cause conflict_cause,
642 uint64 orig_transid,
643 const MY_BITMAP *write_set,
644 NdbError& err)
645 {
646 DBUG_ENTER("ExceptionsTableWriter::writeRow");
647 DBUG_PRINT("info", ("op_type(pos):%u(%u), conflict_cause(pos):%u(%u), orig_transid:%llu(%u)",
648 op_type, m_op_type_pos,
649 conflict_cause, m_conflict_cause_pos,
650 orig_transid, m_orig_transid_pos));
651 assert(write_set != NULL);
652 assert(err.code == 0);
653 const uchar* rowPtr= (op_type == DELETE_ROW)? oldRowPtr : newRowPtr;
654
655 do
656 {
657 /* Have exceptions table, add row to it */
658 const NDBTAB *ex_tab= m_ex_tab;
659
660 /* get insert op */
661 NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
662 if (ex_op == NULL)
663 {
664 err= trans->getNdbError();
665 break;
666 }
667 if (ex_op->insertTuple() == -1)
668 {
669 err= ex_op->getNdbError();
670 break;
671 }
672 {
673 uint32 count= (uint32)++m_count;
674 /* Set mandatory columns */
675 if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
676 ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
677 ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
678 ex_op->setValue((Uint32)3, (const char *)&(count)))
679 {
680 err= ex_op->getNdbError();
681 break;
682 }
683 /* Set optional columns */
684 if (m_extended)
685 {
686 if (m_op_type_pos)
687 {
688 if (m_ex_tab->getColumn(m_op_type_pos)->getType()
689 == NDBCOL::Char)
690 {
691 /* Defined as ENUM */
692 char op_type_val= (char)op_type;
693 if (ex_op->setValue((Uint32)m_op_type_pos,
694 (const char *)&(op_type_val)))
695 {
696 err= ex_op->getNdbError();
697 break;
698 }
699 }
700 else
701 {
702 uint32 op_type_val= op_type;
703 if (ex_op->setValue((Uint32)m_op_type_pos,
704 (const char *)&(op_type_val)))
705 {
706 err= ex_op->getNdbError();
707 break;
708 }
709 }
710 }
711 if (m_conflict_cause_pos)
712 {
713 if (m_ex_tab->getColumn(m_conflict_cause_pos)->getType()
714 == NDBCOL::Char)
715 {
716 /* Defined as ENUM */
717 char conflict_cause_val= (char)conflict_cause;
718 if (ex_op->setValue((Uint32)m_conflict_cause_pos,
719 (const char *)&(conflict_cause_val)))
720 {
721 err= ex_op->getNdbError();
722 break;
723 }
724 }
725 else
726 {
727 uint32 conflict_cause_val= conflict_cause;
728 if (ex_op->setValue((Uint32)m_conflict_cause_pos,
729 (const char *)&(conflict_cause_val)))
730 {
731 err= ex_op->getNdbError();
732 break;
733 }
734 }
735 }
736 if (m_orig_transid_pos != 0)
737 {
738 const NdbDictionary::Column* col= m_ex_tab->getColumn(m_orig_transid_pos);
739 if (orig_transid == Ndb_binlog_extra_row_info::InvalidTransactionId
740 &&
741 col->getNullable())
742 {
743 if (ex_op->setValue((Uint32) m_orig_transid_pos, (char*)NULL))
744 {
745 err= ex_op->getNdbError();
746 break;
747 }
748 }
749 else
750 {
751 DBUG_PRINT("info", ("Setting orig_transid (%u) for table %s", m_orig_transid_pos, ex_tab->getName()));
752 uint64 orig_transid_val= orig_transid;
753 if (ex_op->setValue((Uint32)m_orig_transid_pos,
754 (const char *)&(orig_transid_val)))
755 {
756 err= ex_op->getNdbError();
757 break;
758 }
759 }
760 }
761 }
762 }
763 /* copy primary keys */
764 {
765 int nkey= m_pk_cols;
766 int k;
767 for (k= 0; k < nkey; k++)
768 {
769 assert(rowPtr != NULL);
770 if (m_key_data_pos[k] != -1)
771 {
772 const uchar* data=
773 (const uchar*) NdbDictionary::getValuePtr(keyRecord,
774 (const char*) rowPtr,
775 m_key_attrids[k]);
776 if (ex_op->setValue((Uint32) m_key_data_pos[k], (const char*)data) == -1)
777 {
778 err= ex_op->getNdbError();
779 break;
780 }
781 }
782 }
783 }
784 /* Copy additional data */
785 if (m_extended)
786 {
787 int xncol= m_xcols;
788 int i;
789 for (i= 0; i < xncol; i++)
790 {
791 const NdbDictionary::Column* col= m_ex_tab->getColumn(i);
792 const uchar* default_value= (const uchar*) col->getDefaultValue();
793 DBUG_PRINT("info", ("Checking column %s(%i)%s", col->getName(), i,
794 (default_value)?", has default value":""));
795 assert(rowPtr != NULL);
796 if (m_data_pos[i] != -1)
797 {
798 const uchar* row_vPtr= NULL;
799 switch (m_column_version[i]) {
800 case DEFAULT:
801 row_vPtr= rowPtr;
802 break;
803 case OLD:
804 if (op_type != WRITE_ROW)
805 row_vPtr= oldRowPtr;
806 break;
807 case NEW:
808 if (op_type != DELETE_ROW)
809 row_vPtr= newRowPtr;
810 }
811 if (row_vPtr == NULL ||
812 (m_col_nullable[m_data_pos[i]] &&
813 NdbDictionary::isNull(dataRecord,
814 (const char*) row_vPtr,
815 m_data_pos[i])))
816 {
817 DBUG_PRINT("info", ("Column %s is set to NULL because it is NULL", col->getName()));
818 if (ex_op->setValue((Uint32) i, (char*)NULL))
819 {
820 err= ex_op->getNdbError();
821 break;
822 }
823 }
824 else if (write_set != NULL && bitmap_is_set(write_set, m_data_pos[i]))
825 {
826 DBUG_PRINT("info", ("Column %s is set", col->getName()));
827 const uchar* data=
828 (const uchar*) NdbDictionary::getValuePtr(dataRecord,
829 (const char*) row_vPtr,
830 m_data_pos[i]);
831 if (ex_op->setValue((Uint32) i, (const char*)data) == -1)
832 {
833 err= ex_op->getNdbError();
834 break;
835 }
836 }
837 else if (default_value != NULL)
838 {
839 DBUG_PRINT("info", ("Column %s is not set to NULL because it has a default value", col->getName()));
840 /*
841 * Column has a default value
842 * Since no value was set in write_set
843 * we let the default value be set from
844 * Ndb instead.
845 */
846 }
847 else
848 {
849 DBUG_PRINT("info", ("Column %s is set to NULL because it not in write_set", col->getName()));
850 if (ex_op->setValue((Uint32) i, (char*)NULL))
851 {
852 err= ex_op->getNdbError();
853 break;
854 }
855 }
856 }
857 }
858 }
859 } while (0);
860
861 if (err.code != 0)
862 {
863 if (err.classification == NdbError::SchemaError)
864 {
865 /*
866 * Something up with Exceptions table schema, forget it.
867 * No further exceptions will be recorded.
868 * Caller will log this and slave will stop.
869 */
870 NdbDictionary::Dictionary* dict= trans->getNdb()->getDictionary();
871 dict->removeTableGlobal(*m_ex_tab, false);
872 m_ex_tab= NULL;
873 DBUG_RETURN(0);
874 }
875 DBUG_RETURN(-1);
876 }
877 DBUG_RETURN(0);
878 }
879
880 /* HAVE_NDB_BINLOG */
881 #endif
882
883 /**
884 st_ndb_slave_state constructor
885
886 Initialise Ndb Slave state object
887 */
st_ndb_slave_state()888 st_ndb_slave_state::st_ndb_slave_state()
889 : current_delete_delete_count(0),
890 current_reflect_op_prepare_count(0),
891 current_reflect_op_discard_count(0),
892 current_refresh_op_count(0),
893 current_master_server_epoch(0),
894 current_master_server_epoch_committed(false),
895 current_max_rep_epoch(0),
896 conflict_flags(0),
897 retry_trans_count(0),
898 current_trans_row_conflict_count(0),
899 current_trans_row_reject_count(0),
900 current_trans_in_conflict_count(0),
901 last_conflicted_epoch(0),
902 last_stable_epoch(0),
903 total_delete_delete_count(0),
904 total_reflect_op_prepare_count(0),
905 total_reflect_op_discard_count(0),
906 total_refresh_op_count(0),
907 max_rep_epoch(0),
908 sql_run_id(~Uint32(0)),
909 trans_row_conflict_count(0),
910 trans_row_reject_count(0),
911 trans_detect_iter_count(0),
912 trans_in_conflict_count(0),
913 trans_conflict_commit_count(0),
914 trans_conflict_apply_state(SAS_NORMAL),
915 trans_dependency_tracker(NULL)
916 {
917 memset(current_violation_count, 0, sizeof(current_violation_count));
918 memset(total_violation_count, 0, sizeof(total_violation_count));
919
920 /* Init conflict handling state memroot */
921 const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
922 init_alloc_root(PSI_INSTRUMENT_ME,
923 &conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
924 }
925
~st_ndb_slave_state()926 st_ndb_slave_state::~st_ndb_slave_state()
927 {
928 free_root(&conflict_mem_root, 0);
929 }
930
931 /**
932 resetPerAttemptCounters
933
934 Reset the per-epoch-transaction-application-attempt counters
935 */
936 void
resetPerAttemptCounters()937 st_ndb_slave_state::resetPerAttemptCounters()
938 {
939 memset(current_violation_count, 0, sizeof(current_violation_count));
940 current_delete_delete_count = 0;
941 current_reflect_op_prepare_count = 0;
942 current_reflect_op_discard_count = 0;
943 current_refresh_op_count = 0;
944 current_trans_row_conflict_count = 0;
945 current_trans_row_reject_count = 0;
946 current_trans_in_conflict_count = 0;
947
948 conflict_flags = 0;
949 current_max_rep_epoch = 0;
950 }
951
952 /**
953 atTransactionAbort()
954
955 Called by Slave SQL thread during transaction abort.
956 */
957 void
atTransactionAbort()958 st_ndb_slave_state::atTransactionAbort()
959 {
960 #ifdef HAVE_NDB_BINLOG
961 /* Reset any gathered transaction dependency information */
962 atEndTransConflictHandling();
963 trans_conflict_apply_state = SAS_NORMAL;
964 #endif
965
966 /* Reset current-transaction counters + state */
967 resetPerAttemptCounters();
968 }
969
970
971
972 /**
973 atTransactionCommit()
974
975 Called by Slave SQL thread after transaction commit
976 */
977 void
atTransactionCommit(Uint64 epoch)978 st_ndb_slave_state::atTransactionCommit(Uint64 epoch)
979 {
980 assert( ((trans_dependency_tracker == NULL) &&
981 (trans_conflict_apply_state == SAS_NORMAL)) ||
982 ((trans_dependency_tracker != NULL) &&
983 (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
984 assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
985
986 /* Merge committed transaction counters into total state
987 * Then reset current transaction counters
988 */
989 Uint32 total_conflicts = 0;
990 for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
991 {
992 total_conflicts+= current_violation_count[i];
993 total_violation_count[i]+= current_violation_count[i];
994 }
995 total_delete_delete_count+= current_delete_delete_count;
996 total_reflect_op_prepare_count+= current_reflect_op_prepare_count;
997 total_reflect_op_discard_count+= current_reflect_op_discard_count;
998 total_refresh_op_count+= current_refresh_op_count;
999 trans_row_conflict_count+= current_trans_row_conflict_count;
1000 trans_row_reject_count+= current_trans_row_reject_count;
1001 trans_in_conflict_count+= current_trans_in_conflict_count;
1002
1003 if (current_trans_in_conflict_count)
1004 trans_conflict_commit_count++;
1005
1006 if (current_max_rep_epoch > max_rep_epoch)
1007 {
1008 DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
1009 max_rep_epoch,
1010 current_max_rep_epoch));
1011 max_rep_epoch = current_max_rep_epoch;
1012 }
1013
1014 {
1015 bool hadConflict = false;
1016 if (total_conflicts > 0)
1017 {
1018 /**
1019 * Conflict detected locally
1020 */
1021 DBUG_PRINT("info", ("Last conflicted epoch increases from %llu to %llu",
1022 last_conflicted_epoch,
1023 epoch));
1024 hadConflict = true;
1025 }
1026 else
1027 {
1028 /**
1029 * Update last_conflicted_epoch if we applied reflected or refresh ops
1030 * (Implies Secondary role in asymmetric algorithms)
1031 */
1032 assert(current_reflect_op_prepare_count >= current_reflect_op_discard_count);
1033 Uint32 current_reflect_op_apply_count = current_reflect_op_prepare_count -
1034 current_reflect_op_discard_count;
1035 if (current_reflect_op_apply_count > 0 ||
1036 current_refresh_op_count > 0)
1037 {
1038 DBUG_PRINT("info", ("Reflected (%u) or Refresh (%u) operations applied this "
1039 "epoch, increasing last conflicted epoch from %llu to %llu.",
1040 current_reflect_op_apply_count,
1041 current_refresh_op_count,
1042 last_conflicted_epoch,
1043 epoch));
1044 hadConflict = true;
1045 }
1046 }
1047
1048 /* Update status vars */
1049 if (hadConflict)
1050 {
1051 last_conflicted_epoch = epoch;
1052 }
1053 else
1054 {
1055 if (max_rep_epoch >= last_conflicted_epoch)
1056 {
1057 /**
1058 * This epoch which has looped the circle was stable -
1059 * no new conflicts have been found / corrected since
1060 * it was logged
1061 */
1062 last_stable_epoch = max_rep_epoch;
1063
1064 /**
1065 * Note that max_rep_epoch >= last_conflicted_epoch
1066 * implies that there are no currently known-about
1067 * conflicts.
1068 * On the primary this is a definitive fact as it
1069 * finds out about all conflicts immediately.
1070 * On the secondary it does not mean that there
1071 * are not committed conflicts, just that they
1072 * have not started being corrected yet.
1073 */
1074 }
1075 }
1076 }
1077
1078 resetPerAttemptCounters();
1079
1080 /* Clear per-epoch-transaction retry_trans_count */
1081 retry_trans_count = 0;
1082
1083 current_master_server_epoch_committed = true;
1084
1085 DBUG_EXECUTE_IF("ndb_slave_fail_marking_epoch_committed",
1086 {
1087 fprintf(stderr,
1088 "Slave clearing epoch committed flag "
1089 "for epoch %llu/%llu (%llu)\n",
1090 current_master_server_epoch >> 32,
1091 current_master_server_epoch & 0xffffffff,
1092 current_master_server_epoch);
1093 current_master_server_epoch_committed = false;
1094 });
1095 }
1096
1097 /**
1098 verifyNextEpoch
1099
1100 Check that a new incoming epoch from the relay log
1101 is expected given the current slave state, previous
1102 epoch etc.
1103 This is checking Generic replication errors, with
1104 a user warning thrown in too.
1105 */
1106 bool
verifyNextEpoch(Uint64 next_epoch,Uint32 master_server_id) const1107 st_ndb_slave_state::verifyNextEpoch(Uint64 next_epoch,
1108 Uint32 master_server_id) const
1109 {
1110 DBUG_ENTER("verifyNextEpoch");
1111 #ifdef HAVE_NDB_BINLOG
1112 /**
1113 WRITE_ROW to ndb_apply_status injected by MySQLD
1114 immediately upstream of us.
1115
1116 Now we do some validation of the incoming epoch transaction's
1117 epoch - to make sure that we are getting a sensible sequence
1118 of epochs.
1119 */
1120 bool first_epoch_since_slave_start = (ndb_mi_get_slave_run_id() != sql_run_id);
1121
1122 DBUG_PRINT("info", ("ndb_apply_status write from upstream master."
1123 "ServerId %u, Epoch %llu/%llu (%llu) "
1124 "Current master server epoch %llu/%llu (%llu)"
1125 "Current master server epoch committed? %u",
1126 master_server_id,
1127 next_epoch >> 32,
1128 next_epoch & 0xffffffff,
1129 next_epoch,
1130 current_master_server_epoch >> 32,
1131 current_master_server_epoch & 0xffffffff,
1132 current_master_server_epoch,
1133 current_master_server_epoch_committed));
1134 DBUG_PRINT("info", ("mi_slave_run_id=%u, ndb_slave_state_run_id=%u",
1135 ndb_mi_get_slave_run_id(), sql_run_id));
1136 DBUG_PRINT("info", ("First epoch since slave start : %u",
1137 first_epoch_since_slave_start));
1138
1139 /* Analysis of nextEpoch generally depends on whether it's the first or not */
1140 if (first_epoch_since_slave_start)
1141 {
1142 /**
1143 First epoch since slave start - might've had a CHANGE MASTER command,
1144 since we were last running, so we are not too strict about epoch
1145 changes, but we will warn.
1146 */
1147 if (next_epoch < current_master_server_epoch)
1148 {
1149 sql_print_warning("NDB Slave : At SQL thread start "
1150 "applying epoch %llu/%llu "
1151 "(%llu) from Master ServerId %u which is lower than previously "
1152 "applied epoch %llu/%llu (%llu). "
1153 "Group Master Log : %s Group Master Log Pos : %llu. "
1154 "Check slave positioning.",
1155 next_epoch >> 32,
1156 next_epoch & 0xffffffff,
1157 next_epoch,
1158 master_server_id,
1159 current_master_server_epoch >> 32,
1160 current_master_server_epoch & 0xffffffff,
1161 current_master_server_epoch,
1162 ndb_mi_get_group_master_log_name(),
1163 ndb_mi_get_group_master_log_pos());
1164 /* Slave not stopped */
1165 }
1166 else if (next_epoch == current_master_server_epoch)
1167 {
1168 /**
1169 Could warn that started on already applied epoch,
1170 but this is often harmless.
1171 */
1172 }
1173 else
1174 {
1175 /* next_epoch > current_master_server_epoch - fine. */
1176 }
1177 }
1178 else
1179 {
1180 /**
1181 ! first_epoch_since_slave_start
1182
1183 Slave has already applied some epoch in this run, so we expect
1184 either :
1185 a) previous epoch committed ok and next epoch is higher
1186 or
1187 b) previous epoch not committed and next epoch is the same
1188 (Retry case)
1189 */
1190 if (next_epoch < current_master_server_epoch)
1191 {
1192 /* Should never happen */
1193 sql_print_error("NDB Slave : SQL thread stopped as "
1194 "applying epoch %llu/%llu "
1195 "(%llu) from Master ServerId %u which is lower than previously "
1196 "applied epoch %llu/%llu (%llu). "
1197 "Group Master Log : %s Group Master Log Pos : %llu",
1198 next_epoch >> 32,
1199 next_epoch & 0xffffffff,
1200 next_epoch,
1201 master_server_id,
1202 current_master_server_epoch >> 32,
1203 current_master_server_epoch & 0xffffffff,
1204 current_master_server_epoch,
1205 ndb_mi_get_group_master_log_name(),
1206 ndb_mi_get_group_master_log_pos());
1207 /* Stop the slave */
1208 DBUG_RETURN(false);
1209 }
1210 else if (next_epoch == current_master_server_epoch)
1211 {
1212 /**
1213 This is ok if we are retrying - e.g. the
1214 last epoch was not committed
1215 */
1216 if (current_master_server_epoch_committed)
1217 {
1218 /* This epoch is committed already, why are we replaying it? */
1219 sql_print_error("NDB Slave : SQL thread stopped as attempted "
1220 "to reapply already committed epoch %llu/%llu (%llu) "
1221 "from server id %u. "
1222 "Group Master Log : %s Group Master Log Pos : %llu.",
1223 current_master_server_epoch >> 32,
1224 current_master_server_epoch & 0xffffffff,
1225 current_master_server_epoch,
1226 master_server_id,
1227 ndb_mi_get_group_master_log_name(),
1228 ndb_mi_get_group_master_log_pos());
1229 /* Stop the slave */
1230 DBUG_RETURN(false);
1231 }
1232 else
1233 {
1234 /* Probably a retry, no problem. */
1235 }
1236 }
1237 else
1238 {
1239 /**
1240 next_epoch > current_master_server_epoch
1241
1242 This is the normal case, *unless* the previous epoch
1243 did not commit - in which case it may be a bug in
1244 transaction retry.
1245 */
1246 if (!current_master_server_epoch_committed)
1247 {
1248 /**
1249 We've moved onto a new epoch without committing
1250 the last - probably a bug in transaction retry
1251 */
1252 sql_print_error("NDB Slave : SQL thread stopped as attempting to "
1253 "apply new epoch %llu/%llu (%llu) while lower "
1254 "received epoch %llu/%llu (%llu) has not been "
1255 "committed. Master server id : %u. "
1256 "Group Master Log : %s Group Master Log Pos : %llu.",
1257 next_epoch >> 32,
1258 next_epoch & 0xffffffff,
1259 next_epoch,
1260 current_master_server_epoch >> 32,
1261 current_master_server_epoch & 0xffffffff,
1262 current_master_server_epoch,
1263 master_server_id,
1264 ndb_mi_get_group_master_log_name(),
1265 ndb_mi_get_group_master_log_pos());
1266 /* Stop the slave */
1267 DBUG_RETURN(false);
1268 }
1269 else
1270 {
1271 /* Normal case of next epoch after committing last */
1272 }
1273 }
1274 }
1275 #endif
1276
1277 /* Epoch looks ok */
1278 DBUG_RETURN(true);
1279 }
1280
1281 /**
1282 atApplyStatusWrite
1283
1284 Called by Slave SQL thread when applying an event to the
1285 ndb_apply_status table
1286 */
1287 int
atApplyStatusWrite(Uint32 master_server_id,Uint32 row_server_id,Uint64 row_epoch,bool is_row_server_id_local)1288 st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
1289 Uint32 row_server_id,
1290 Uint64 row_epoch,
1291 bool is_row_server_id_local)
1292 {
1293 DBUG_ENTER("atApplyStatusWrite");
1294 if (row_server_id == master_server_id)
1295 {
1296 /* This is an apply status write from the immediate master */
1297
1298 if (!verifyNextEpoch(row_epoch,
1299 master_server_id))
1300 {
1301 /* Problem with the next epoch, stop the slave SQL thread */
1302 DBUG_RETURN(HA_ERR_ROWS_EVENT_APPLY);
1303 }
1304
1305 /* Epoch ok, record that we're working on it now... */
1306
1307 current_master_server_epoch = row_epoch;
1308 current_master_server_epoch_committed = false;
1309 assert(! is_row_server_id_local);
1310 }
1311 else if (is_row_server_id_local)
1312 {
1313 DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
1314 " which is %s.",
1315 row_server_id, row_epoch,
1316 (row_epoch > current_max_rep_epoch)?
1317 " new highest." : " older than previously applied"));
1318 if (row_epoch > current_max_rep_epoch)
1319 {
1320 /*
1321 Store new highest epoch in thdvar. If we commit successfully
1322 then this can become the new global max
1323 */
1324 current_max_rep_epoch = row_epoch;
1325 }
1326 }
1327 DBUG_RETURN(0);
1328 }
1329
1330 /**
1331 atResetSlave()
1332
1333 Called when RESET SLAVE command issued - in context of command client.
1334 */
1335 void
atResetSlave()1336 st_ndb_slave_state::atResetSlave()
1337 {
1338 /* Reset the Maximum replicated epoch vars
1339 * on slave reset
1340 * No need to touch the sql_run_id as that
1341 * will increment if the slave is started
1342 * again.
1343 */
1344 resetPerAttemptCounters();
1345
1346 retry_trans_count = 0;
1347 max_rep_epoch = 0;
1348 last_conflicted_epoch = 0;
1349 last_stable_epoch = 0;
1350
1351 /* Reset current master server epoch
1352 * This avoids warnings when replaying a lower
1353 * epoch number after a RESET SLAVE - in this
1354 * case we assume the user knows best.
1355 */
1356 current_master_server_epoch = 0;
1357 current_master_server_epoch_committed = false;
1358 }
1359
1360
1361 /**
1362 atStartSlave()
1363
1364 Called by Slave SQL thread when first applying a row to Ndb after
1365 a START SLAVE command.
1366 */
1367 void
atStartSlave()1368 st_ndb_slave_state::atStartSlave()
1369 {
1370 #ifdef HAVE_NDB_BINLOG
1371 if (trans_conflict_apply_state != SAS_NORMAL)
1372 {
1373 /*
1374 Remove conflict handling state on a SQL thread
1375 restart
1376 */
1377 atEndTransConflictHandling();
1378 trans_conflict_apply_state = SAS_NORMAL;
1379 }
1380 #endif
1381 }
1382
1383 bool
checkSlaveConflictRoleChange(enum_slave_conflict_role old_role,enum_slave_conflict_role new_role,const char ** failure_cause)1384 st_ndb_slave_state::checkSlaveConflictRoleChange(enum_slave_conflict_role old_role,
1385 enum_slave_conflict_role new_role,
1386 const char** failure_cause)
1387 {
1388 if (old_role == new_role)
1389 return true;
1390
1391 /**
1392 * Initial role is SCR_NONE
1393 * Allowed transitions :
1394 * SCR_NONE -> SCR_PASS
1395 * SCR_NONE -> SCR_PRIMARY
1396 * SCR_NONE -> SCR_SECONDARY
1397 * SCR_PRIMARY -> SCR_NONE
1398 * SCR_PRIMARY -> SCR_SECONDARY
1399 * SCR_SECONDARY -> SCR_NONE
1400 * SCR_SECONDARY -> SCR_PRIMARY
1401 * SCR_PASS -> SCR_NONE
1402 *
1403 * Disallowed transitions
1404 * SCR_PASS -> SCR_PRIMARY
1405 * SCR_PASS -> SCR_SECONDARY
1406 * SCR_PRIMARY -> SCR_PASS
1407 * SCR_SECONDARY -> SCR_PASS
1408 */
1409 bool bad_transition = false;
1410 *failure_cause = "Internal error";
1411
1412 switch (old_role)
1413 {
1414 case SCR_NONE:
1415 break;
1416 case SCR_PRIMARY:
1417 case SCR_SECONDARY:
1418 bad_transition = (new_role == SCR_PASS);
1419 break;
1420 case SCR_PASS:
1421 bad_transition = ((new_role == SCR_PRIMARY) ||
1422 (new_role == SCR_SECONDARY));
1423 break;
1424 default:
1425 assert(false);
1426 return false;
1427 }
1428
1429 if (bad_transition)
1430 {
1431 *failure_cause = "Invalid role change.";
1432 return false;
1433 }
1434
1435 #ifdef HAVE_NDB_BINLOG
1436 /* Check that Slave SQL thread is not running */
1437 if (ndb_mi_get_slave_sql_running())
1438 {
1439 *failure_cause = "Cannot change role while Slave SQL "
1440 "thread is running. Use STOP SLAVE first.";
1441 return false;
1442 }
1443 #endif
1444
1445 return true;
1446 }
1447
1448
1449 #ifdef HAVE_NDB_BINLOG
1450
1451 /**
1452 atEndTransConflictHandling
1453
1454 Called when transactional conflict handling has completed.
1455 */
1456 void
atEndTransConflictHandling()1457 st_ndb_slave_state::atEndTransConflictHandling()
1458 {
1459 DBUG_ENTER("atEndTransConflictHandling");
1460 /* Release any conflict handling state */
1461 if (trans_dependency_tracker)
1462 {
1463 current_trans_in_conflict_count =
1464 trans_dependency_tracker->get_conflict_count();
1465 trans_dependency_tracker = NULL;
1466 free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
1467 }
1468 DBUG_VOID_RETURN;
1469 }
1470
1471 /**
1472 atBeginTransConflictHandling()
1473
1474 Called by Slave SQL thread when it determines that Transactional
1475 Conflict handling is required
1476 */
1477 void
atBeginTransConflictHandling()1478 st_ndb_slave_state::atBeginTransConflictHandling()
1479 {
1480 DBUG_ENTER("atBeginTransConflictHandling");
1481 /*
1482 Allocate and initialise Transactional Conflict
1483 Resolution Handling Structures
1484 */
1485 assert(trans_dependency_tracker == NULL);
1486 trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
1487 DBUG_VOID_RETURN;
1488 }
1489
1490 /**
1491 atPrepareConflictDetection
1492
1493 Called by Slave SQL thread prior to defining an operation on
1494 a table with conflict detection defined.
1495 */
1496 int
atPrepareConflictDetection(const NdbDictionary::Table * table,const NdbRecord * key_rec,const uchar * row_data,Uint64 transaction_id,bool & handle_conflict_now)1497 st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
1498 const NdbRecord* key_rec,
1499 const uchar* row_data,
1500 Uint64 transaction_id,
1501 bool& handle_conflict_now)
1502 {
1503 DBUG_ENTER("atPrepareConflictDetection");
1504 /*
1505 Slave is preparing to apply an operation with conflict detection.
1506 If we're performing Transactional Conflict Resolution, take
1507 extra steps
1508 */
1509 switch( trans_conflict_apply_state )
1510 {
1511 case SAS_NORMAL:
1512 DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
1513 /* No special handling */
1514 break;
1515 case SAS_TRACK_TRANS_DEPENDENCIES:
1516 {
1517 DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
1518 /*
1519 Track this operation and its transaction id, to determine
1520 inter-transaction dependencies by {table, primary key}
1521 */
1522 assert( trans_dependency_tracker );
1523
1524 int res = trans_dependency_tracker
1525 ->track_operation(table,
1526 key_rec,
1527 row_data,
1528 transaction_id);
1529 if (res != 0)
1530 {
1531 sql_print_error("%s", trans_dependency_tracker->get_error_text());
1532 DBUG_RETURN(res);
1533 }
1534 /* Proceed as normal */
1535 break;
1536 }
1537 case SAS_APPLY_TRANS_DEPENDENCIES:
1538 {
1539 DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
1540 /*
1541 Check if this operation's transaction id is marked in-conflict.
1542 If it is, we tell the caller to perform conflict resolution now instead
1543 of attempting to apply the operation.
1544 */
1545 assert( trans_dependency_tracker );
1546
1547 if (trans_dependency_tracker->in_conflict(transaction_id))
1548 {
1549 DBUG_PRINT("info", ("Event for transaction %llu is conflicting. Handling.",
1550 transaction_id));
1551 current_trans_row_reject_count++;
1552 handle_conflict_now = true;
1553 DBUG_RETURN(0);
1554 }
1555
1556 /*
1557 This transaction is not marked in-conflict, so continue with normal
1558 processing.
1559 Note that normal processing may subsequently detect a conflict which
1560 didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
1561 In this case, we will rollback and repeat the TRACK_DEPENDENCIES
1562 stage.
1563 */
1564 DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
1565 transaction_id));
1566 break;
1567 }
1568 }
1569 DBUG_RETURN(0);
1570 }
1571
1572 /**
1573 atTransConflictDetected
1574
1575 Called by the Slave SQL thread when a conflict is detected on
1576 an executed operation.
1577 */
1578 int
atTransConflictDetected(Uint64 transaction_id)1579 st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
1580 {
1581 DBUG_ENTER("atTransConflictDetected");
1582
1583 /*
1584 The Slave has detected a conflict on an operation applied
1585 to a table with Transactional Conflict Resolution defined.
1586 Handle according to current state.
1587 */
1588 conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
1589 current_trans_row_conflict_count++;
1590
1591 switch (trans_conflict_apply_state)
1592 {
1593 case SAS_NORMAL:
1594 {
1595 DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
1596 "Requires multi-pass resolution. Will transition to "
1597 "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
1598 /*
1599 Conflict on table with transactional conflict resolution
1600 defined.
1601 This is the trigger that we will do transactional conflict
1602 resolution.
1603 Record that we need to do multiple passes to correctly
1604 perform resolution.
1605 TODO : Early exit from applying epoch?
1606 */
1607 break;
1608 }
1609 case SAS_TRACK_TRANS_DEPENDENCIES:
1610 {
1611 DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
1612 "had conflict",
1613 transaction_id));
1614 /*
1615 Conflict on table with transactional conflict resolution
1616 defined.
1617 We will mark the operation's transaction_id as in-conflict,
1618 so that any other operations on the transaction are also
1619 considered in-conflict, and any dependent transactions are also
1620 considered in-conflict.
1621 */
1622 assert(trans_dependency_tracker != NULL);
1623 int res = trans_dependency_tracker
1624 ->mark_conflict(transaction_id);
1625
1626 if (res != 0)
1627 {
1628 sql_print_error("%s", trans_dependency_tracker->get_error_text());
1629 DBUG_RETURN(res);
1630 }
1631 break;
1632 }
1633 case SAS_APPLY_TRANS_DEPENDENCIES:
1634 {
1635 /*
1636 This must be a new conflict, not noticed on the previous
1637 pass.
1638 */
1639 DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected. "
1640 "Must be further conflict. Will return to "
1641 "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
1642 // TODO : Early exit from applying epoch
1643 break;
1644 }
1645 default:
1646 break;
1647 }
1648
1649 DBUG_RETURN(0);
1650 }
1651
1652 /**
1653 atConflictPreCommit
1654
1655 Called by the Slave SQL thread prior to committing a Slave transaction.
1656 This method can request that the Slave transaction is retried.
1657
1658
1659 State transitions :
1660
                       START SLAVE /
                       RESET SLAVE /
                         STARTUP
                            |
                            |
                            v
                    ****************
                    *  SAS_NORMAL  *
                    ****************
                       ^       |
    No transactional   |       | Conflict on transactional table
       conflicts       |       | (Rollback)
       (Commit)        |       |
                       |       v
           **********************************
           *  SAS_TRACK_TRANS_DEPENDENCIES  *
           **********************************
              ^          I              ^
    More      I          I Dependencies |
    conflicts I          I determined   | No new conflicts
    found     I          I (Rollback)   | (Commit)
   (Rollback) I          I              |
              I          v              |
           **********************************
           *  SAS_APPLY_TRANS_DEPENDENCIES  *
           **********************************
1687
1688
1689 Operation
1690 The initial state is SAS_NORMAL.
1691
  On detecting a conflict on a transactional conflict detecting table,
1693 SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
1694 rolled back and reapplied.
1695
1696 In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
1697 conflicts are tracked as the epoch transaction is applied.
1698
1699 Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
1700 the epoch transaction is rolled back and reapplied.
1701
1702 In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
1703 marked as in-conflict are not applied.
1704
1705 If this results in no new conflicts, the epoch transaction is committed,
1706 and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
  the next replicated epoch transaction.
  If it results in new conflicts, the epoch transaction is rolled back, and
1709 the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
1710 the new set of dependencies.
1711
1712 If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
1713 the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
1714 state.
1715
1716
1717 Properties
1718 1) Normally, there is no transaction dependency tracking overhead paid by
1719 the slave.
1720
1721 2) On first detecting a transactional conflict, the epoch transaction must be
1722 applied at least three times, with two rollbacks.
1723
1724 3) Transactional conflicts detected in subsequent epochs require the epoch
1725 transaction to be applied two times, with one rollback.
1726
1727 4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
1728 DEPENDENCIES occurs when further transactional conflicts are discovered
1729 in SAS_APPLY_TRANS_DEPENDENCIES state. This implies that the conflicts
1730 discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete,
1731 so we revisit that state to get a more complete picture.
1732
1733 5) The number of iterations of this loop is fixed to a hard coded limit, after
1734 which the Slave will stop with an error. This should be an unlikely
1735 occurrence, as it requires not just n conflicts, but at least 1 new conflict
1736 appearing between the transactions in the epoch transaction and the
1737 database between the two states, n times in a row.
1738
1739 6) Where conflicts are occasional, as expected, the post-commit transition to
1740 SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
1741 transaction having its transaction dependencies needlessly tracked.
1742
1743 */
1744 int
atConflictPreCommit(bool & retry_slave_trans)1745 st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
1746 {
1747 DBUG_ENTER("atConflictPreCommit");
1748
1749 /*
1750 Prior to committing a Slave transaction, we check whether
1751 Transactional conflicts have been detected which require
1752 us to retry the slave transaction
1753 */
1754 retry_slave_trans = false;
1755 switch(trans_conflict_apply_state)
1756 {
1757 case SAS_NORMAL:
1758 {
1759 DBUG_PRINT("info", ("SAS_NORMAL"));
1760 /*
1761 Normal case. Only if we defined conflict detection on a table
1762 with transactional conflict detection, and saw conflicts (on any table)
1763 do we go to another state
1764 */
1765 if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
1766 {
1767 DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
1768 "SAS_TRACK_TRANS_DEPENDENCIES."));
1769 assert(conflict_flags & SCS_OPS_DEFINED);
1770 /* Transactional conflict resolution required, switch state */
1771 atBeginTransConflictHandling();
1772 resetPerAttemptCounters();
1773 trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
1774 retry_slave_trans = true;
1775 }
1776 break;
1777 }
1778 case SAS_TRACK_TRANS_DEPENDENCIES:
1779 {
1780 DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
1781
1782 if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
1783 {
1784 /*
1785 Conflict on table with transactional detection
1786 this pass, we have collected the details and
1787 dependencies, now transition to
1788 SAS_APPLY_TRANS_DEPENDENCIES and
1789 reapply the epoch transaction without the
1790 conflicting transactions.
1791 */
1792 assert(conflict_flags & SCS_OPS_DEFINED);
1793 DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
1794 "SAS_APPLY_TRANS_DEPENDENCIES"));
1795
1796 trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
1797 trans_detect_iter_count++;
1798 retry_slave_trans = true;
1799 break;
1800 }
1801 else
1802 {
1803 /*
1804 No transactional conflicts detected this pass, lets
1805 return to SAS_NORMAL state after commit for more efficient
1806 application of epoch transactions
1807 */
1808 DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
1809 "SAS_NORMAL"));
1810 atEndTransConflictHandling();
1811 trans_conflict_apply_state = SAS_NORMAL;
1812 break;
1813 }
1814 }
1815 case SAS_APPLY_TRANS_DEPENDENCIES:
1816 {
1817 DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
1818 assert(conflict_flags & SCS_OPS_DEFINED);
1819 /*
1820 We've applied the Slave epoch transaction subject to the
1821 conflict detection. If any further transactional
1822 conflicts have been observed, then we must repeat the
1823 process.
1824 */
1825 atEndTransConflictHandling();
1826 atBeginTransConflictHandling();
1827 trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
1828
1829 if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
1830 {
1831 DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
1832 "TRACK_TRANS_DEPENDENCIES pass"));
1833 /*
1834 Further conflict observed when applying, need
1835 to re-determine dependencies
1836 */
1837 resetPerAttemptCounters();
1838 retry_slave_trans = true;
1839 break;
1840 }
1841
1842
1843 DBUG_PRINT("info", ("No further conflicts detected, committing and "
1844 "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
1845 /*
1846 With dependencies taken into account, no further
1847 conflicts detected, can now proceed to commit
1848 */
1849 break;
1850 }
1851 }
1852
1853 /*
1854 Clear conflict flags, to ensure that we detect any new conflicts
1855 */
1856 conflict_flags = 0;
1857
1858 if (retry_slave_trans)
1859 {
1860 DBUG_PRINT("info", ("Requesting transaction restart"));
1861 DBUG_RETURN(1);
1862 }
1863
1864 DBUG_PRINT("info", ("Allowing commit to proceed"));
1865 DBUG_RETURN(0);
1866 }
1867
1868
1869
1870 /**
1871 * Conflict function interpreted programs
1872 */
1873
1874
1875 /**
1876 CFT_NDB_OLD
1877
1878 To perform conflict detection, an interpreted program is used to read
1879 the timestamp stored locally and compare to what was on the master.
1880 If timestamp is not equal, an error for this operation (9998) will be raised,
1881 and new row will not be applied. The error codes for the operations will
1882 be checked on return. For this to work is is vital that the operation
1883 is run with ignore error option.
1884
1885 As an independent feature, phase 2 also saves the
1886 conflicts into the table's exceptions table.
1887 */
1888 static int
row_conflict_fn_old(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)1889 row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
1890 enum_conflicting_op_type op_type,
1891 const NdbRecord* data_record,
1892 const uchar* old_data,
1893 const uchar* new_data,
1894 const MY_BITMAP* bi_cols,
1895 const MY_BITMAP* ai_cols,
1896 NdbInterpretedCode* code)
1897 {
1898 DBUG_ENTER("row_conflict_fn_old");
1899 uint32 resolve_column= cfn_share->m_resolve_column;
1900 uint32 resolve_size= cfn_share->m_resolve_size;
1901 const uchar* field_ptr = (const uchar*)
1902 NdbDictionary::getValuePtr(data_record,
1903 (const char*) old_data,
1904 cfn_share->m_resolve_column);
1905
1906 assert((resolve_size == 4) || (resolve_size == 8));
1907
1908 if (unlikely(!bitmap_is_set(bi_cols, resolve_column)))
1909 {
1910 sql_print_information("NDB Slave: missing data for %s "
1911 "timestamp column %u.",
1912 cfn_share->m_conflict_fn->name,
1913 resolve_column);
1914 DBUG_RETURN(1);
1915 }
1916
1917 const uint label_0= 0;
1918 const Uint32 RegOldValue= 1, RegCurrentValue= 2;
1919 int r;
1920
1921 DBUG_PRINT("info",
1922 ("Adding interpreted filter, existing value must eq event old value"));
1923 /*
1924 * read old value from record
1925 */
1926 union {
1927 uint32 old_value_32;
1928 uint64 old_value_64;
1929 };
1930 {
1931 if (resolve_size == 4)
1932 {
1933 memcpy(&old_value_32, field_ptr, resolve_size);
1934 DBUG_PRINT("info", (" old_value_32: %u", old_value_32));
1935 }
1936 else
1937 {
1938 memcpy(&old_value_64, field_ptr, resolve_size);
1939 DBUG_PRINT("info", (" old_value_64: %llu",
1940 (unsigned long long) old_value_64));
1941 }
1942 }
1943
1944 /*
1945 * Load registers RegOldValue and RegCurrentValue
1946 */
1947 if (resolve_size == 4)
1948 r= code->load_const_u32(RegOldValue, old_value_32);
1949 else
1950 r= code->load_const_u64(RegOldValue, old_value_64);
1951 assert(r == 0);
1952 r= code->read_attr(RegCurrentValue, resolve_column);
1953 assert(r == 0);
1954 /*
1955 * if RegOldValue == RegCurrentValue goto label_0
1956 * else raise error for this row
1957 */
1958 r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
1959 assert(r == 0);
1960 r= code->interpret_exit_nok(error_conflict_fn_violation);
1961 assert(r == 0);
1962 r= code->def_label(label_0);
1963 assert(r == 0);
1964 r= code->interpret_exit_ok();
1965 assert(r == 0);
1966 r= code->finalise();
1967 assert(r == 0);
1968 DBUG_RETURN(r);
1969 }
1970
1971 static int
row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)1972 row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
1973 enum_conflicting_op_type op_type,
1974 const NdbRecord* data_record,
1975 const uchar* old_data,
1976 const uchar* new_data,
1977 const MY_BITMAP* bi_cols,
1978 const MY_BITMAP* ai_cols,
1979 NdbInterpretedCode* code)
1980 {
1981 DBUG_ENTER("row_conflict_fn_max_update_only");
1982 uint32 resolve_column= cfn_share->m_resolve_column;
1983 uint32 resolve_size= cfn_share->m_resolve_size;
1984 const uchar* field_ptr = (const uchar*)
1985 NdbDictionary::getValuePtr(data_record,
1986 (const char*) new_data,
1987 cfn_share->m_resolve_column);
1988
1989 assert((resolve_size == 4) || (resolve_size == 8));
1990
1991 if (unlikely(!bitmap_is_set(ai_cols, resolve_column)))
1992 {
1993 sql_print_information("NDB Slave: missing data for %s "
1994 "timestamp column %u.",
1995 cfn_share->m_conflict_fn->name,
1996 resolve_column);
1997 DBUG_RETURN(1);
1998 }
1999
2000 const uint label_0= 0;
2001 const Uint32 RegNewValue= 1, RegCurrentValue= 2;
2002 int r;
2003
2004 DBUG_PRINT("info",
2005 ("Adding interpreted filter, existing value must be lt event new"));
2006 /*
2007 * read new value from record
2008 */
2009 union {
2010 uint32 new_value_32;
2011 uint64 new_value_64;
2012 };
2013 {
2014 if (resolve_size == 4)
2015 {
2016 memcpy(&new_value_32, field_ptr, resolve_size);
2017 DBUG_PRINT("info", (" new_value_32: %u", new_value_32));
2018 }
2019 else
2020 {
2021 memcpy(&new_value_64, field_ptr, resolve_size);
2022 DBUG_PRINT("info", (" new_value_64: %llu",
2023 (unsigned long long) new_value_64));
2024 }
2025 }
2026 /*
2027 * Load registers RegNewValue and RegCurrentValue
2028 */
2029 if (resolve_size == 4)
2030 r= code->load_const_u32(RegNewValue, new_value_32);
2031 else
2032 r= code->load_const_u64(RegNewValue, new_value_64);
2033 assert(r == 0);
2034 r= code->read_attr(RegCurrentValue, resolve_column);
2035 assert(r == 0);
2036 /*
2037 * if RegNewValue > RegCurrentValue goto label_0
2038 * else raise error for this row
2039 */
2040 r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
2041 assert(r == 0);
2042 r= code->interpret_exit_nok(error_conflict_fn_violation);
2043 assert(r == 0);
2044 r= code->def_label(label_0);
2045 assert(r == 0);
2046 r= code->interpret_exit_ok();
2047 assert(r == 0);
2048 r= code->finalise();
2049 assert(r == 0);
2050 DBUG_RETURN(r);
2051 }
2052
2053 /**
2054 CFT_NDB_MAX
2055
2056 To perform conflict resolution, an interpreted program is used to read
2057 the timestamp stored locally and compare to what is going to be applied.
2058 If timestamp is lower, an error for this operation (9999) will be raised,
2059 and new row will not be applied. The error codes for the operations will
2060 be checked on return. For this to work is is vital that the operation
2061 is run with ignore error option.
2062
2063 Note that for delete, this algorithm reverts to the OLD algorithm.
2064 */
2065 static int
row_conflict_fn_max(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2066 row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
2067 enum_conflicting_op_type op_type,
2068 const NdbRecord* data_record,
2069 const uchar* old_data,
2070 const uchar* new_data,
2071 const MY_BITMAP* bi_cols,
2072 const MY_BITMAP* ai_cols,
2073 NdbInterpretedCode* code)
2074 {
2075 switch(op_type)
2076 {
2077 case WRITE_ROW:
2078 abort();
2079 return 1;
2080 case UPDATE_ROW:
2081 return row_conflict_fn_max_update_only(cfn_share,
2082 op_type,
2083 data_record,
2084 old_data,
2085 new_data,
2086 bi_cols,
2087 ai_cols,
2088 code);
2089 case DELETE_ROW:
2090 /* Can't use max of new image, as there's no new image
2091 * for DELETE
2092 * Use OLD instead
2093 */
2094 return row_conflict_fn_old(cfn_share,
2095 op_type,
2096 data_record,
2097 old_data,
2098 new_data,
2099 bi_cols,
2100 ai_cols,
2101 code);
2102 default:
2103 abort();
2104 return 1;
2105 }
2106 }
2107
2108
2109 /**
2110 CFT_NDB_MAX_DEL_WIN
2111
2112 To perform conflict resolution, an interpreted program is used to read
2113 the timestamp stored locally and compare to what is going to be applied.
2114 If timestamp is lower, an error for this operation (9999) will be raised,
2115 and new row will not be applied. The error codes for the operations will
  be checked on return. For this to work it is vital that the operation
2117 is run with ignore error option.
2118
  In this variant, replicated DELETEs always succeed - no filter is added
2120 to them.
2121 */
2122
2123 static int
row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2124 row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
2125 enum_conflicting_op_type op_type,
2126 const NdbRecord* data_record,
2127 const uchar* old_data,
2128 const uchar* new_data,
2129 const MY_BITMAP* bi_cols,
2130 const MY_BITMAP* ai_cols,
2131 NdbInterpretedCode* code)
2132 {
2133 switch(op_type)
2134 {
2135 case WRITE_ROW:
2136 abort();
2137 return 1;
2138 case UPDATE_ROW:
2139 return row_conflict_fn_max_update_only(cfn_share,
2140 op_type,
2141 data_record,
2142 old_data,
2143 new_data,
2144 bi_cols,
2145 ai_cols,
2146 code);
2147 case DELETE_ROW:
2148 /* This variant always lets a received DELETE_ROW
2149 * succeed.
2150 */
2151 return 0;
2152 default:
2153 abort();
2154 return 1;
2155 }
2156 }
2157
2158
2159 /**
2160 CFT_NDB_EPOCH
2161
2162 */
2163
2164 static int
row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2165 row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
2166 enum_conflicting_op_type op_type,
2167 const NdbRecord* data_record,
2168 const uchar* old_data,
2169 const uchar* new_data,
2170 const MY_BITMAP* bi_cols,
2171 const MY_BITMAP* ai_cols,
2172 NdbInterpretedCode* code)
2173 {
2174 DBUG_ENTER("row_conflict_fn_epoch");
2175 switch(op_type)
2176 {
2177 case WRITE_ROW:
2178 abort();
2179 DBUG_RETURN(1);
2180 case UPDATE_ROW:
2181 case DELETE_ROW:
2182 case READ_ROW: /* Read tracking */
2183 {
2184 const uint label_0= 0;
2185 const Uint32
2186 RegAuthor= 1, RegZero= 2,
2187 RegMaxRepEpoch= 1, RegRowEpoch= 2;
2188 int r;
2189
2190 r= code->load_const_u32(RegZero, 0);
2191 assert(r == 0);
2192 r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
2193 assert(r == 0);
2194 /* If last author was not local, assume no conflict */
2195 r= code->branch_ne(RegZero, RegAuthor, label_0);
2196 assert(r == 0);
2197
2198 /*
2199 * Load registers RegMaxRepEpoch and RegRowEpoch
2200 */
2201 r= code->load_const_u64(RegMaxRepEpoch, g_ndb_slave_state.max_rep_epoch);
2202 assert(r == 0);
2203 r= code->read_attr(RegRowEpoch, NdbDictionary::Column::ROW_GCI64);
2204 assert(r == 0);
2205
2206 /*
2207 * if RegRowEpoch <= RegMaxRepEpoch goto label_0
2208 * else raise error for this row
2209 */
2210 r= code->branch_le(RegRowEpoch, RegMaxRepEpoch, label_0);
2211 assert(r == 0);
2212 r= code->interpret_exit_nok(error_conflict_fn_violation);
2213 assert(r == 0);
2214 r= code->def_label(label_0);
2215 assert(r == 0);
2216 r= code->interpret_exit_ok();
2217 assert(r == 0);
2218 r= code->finalise();
2219 assert(r == 0);
2220 DBUG_RETURN(r);
2221 }
2222 default:
2223 abort();
2224 DBUG_RETURN(1);
2225 }
2226 }
2227
2228 /**
2229 * CFT_NDB_EPOCH2
2230 */
2231
2232 static int
row_conflict_fn_epoch2_primary(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2233 row_conflict_fn_epoch2_primary(NDB_CONFLICT_FN_SHARE* cfn_share,
2234 enum_conflicting_op_type op_type,
2235 const NdbRecord* data_record,
2236 const uchar* old_data,
2237 const uchar* new_data,
2238 const MY_BITMAP* bi_cols,
2239 const MY_BITMAP* ai_cols,
2240 NdbInterpretedCode* code)
2241 {
2242 DBUG_ENTER("row_conflict_fn_epoch2_primary");
2243
2244 /* We use the normal NDB$EPOCH detection function */
2245 DBUG_RETURN(row_conflict_fn_epoch(cfn_share,
2246 op_type,
2247 data_record,
2248 old_data,
2249 new_data,
2250 bi_cols,
2251 ai_cols,
2252 code));
2253 }
2254
2255 static int
row_conflict_fn_epoch2_secondary(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2256 row_conflict_fn_epoch2_secondary(NDB_CONFLICT_FN_SHARE* cfn_share,
2257 enum_conflicting_op_type op_type,
2258 const NdbRecord* data_record,
2259 const uchar* old_data,
2260 const uchar* new_data,
2261 const MY_BITMAP* bi_cols,
2262 const MY_BITMAP* ai_cols,
2263 NdbInterpretedCode* code)
2264 {
2265 DBUG_ENTER("row_conflict_fn_epoch2_secondary");
2266
2267 /* Only called for reflected update and delete operations
2268 * on the secondary.
2269 * These are returning operations which should only be
2270 * applied if the row in the database was last written
2271 * remotely (by the Primary)
2272 */
2273
2274 switch(op_type)
2275 {
2276 case WRITE_ROW:
2277 abort();
2278 DBUG_RETURN(1);
2279 case UPDATE_ROW:
2280 case DELETE_ROW:
2281 {
2282 const uint label_0= 0;
2283 const Uint32
2284 RegAuthor= 1, RegZero= 2;
2285 int r;
2286
2287 r= code->load_const_u32(RegZero, 0);
2288 assert(r == 0);
2289 r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
2290 assert(r == 0);
2291 r= code->branch_eq(RegZero, RegAuthor, label_0);
2292 assert(r == 0);
2293 /* Last author was not local, no conflict, apply */
2294 r= code->interpret_exit_ok();
2295 assert(r == 0);
2296 r= code->def_label(label_0);
2297 assert(r == 0);
2298 /* Last author was secondary-local, conflict, do not apply */
2299 r= code->interpret_exit_nok(error_conflict_fn_violation);
2300 assert(r == 0);
2301
2302
2303 r= code->finalise();
2304 assert(r == 0);
2305 DBUG_RETURN(r);
2306 }
2307 default:
2308 abort();
2309 DBUG_RETURN(1);
2310 }
2311 }
2312
2313 static int
row_conflict_fn_epoch2(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2314 row_conflict_fn_epoch2(NDB_CONFLICT_FN_SHARE* cfn_share,
2315 enum_conflicting_op_type op_type,
2316 const NdbRecord* data_record,
2317 const uchar* old_data,
2318 const uchar* new_data,
2319 const MY_BITMAP* bi_cols,
2320 const MY_BITMAP* ai_cols,
2321 NdbInterpretedCode* code)
2322 {
2323 DBUG_ENTER("row_conflict_fn_epoch2");
2324
2325 /**
2326 * NdbEpoch2 behaviour depends on the Slave conflict role variable
2327 *
2328 */
2329 switch(opt_ndb_slave_conflict_role)
2330 {
2331 case SCR_NONE:
2332 /* This is a problem */
2333 DBUG_RETURN(1);
2334 case SCR_PRIMARY:
2335 DBUG_RETURN(row_conflict_fn_epoch2_primary(cfn_share,
2336 op_type,
2337 data_record,
2338 old_data,
2339 new_data,
2340 bi_cols,
2341 ai_cols,
2342 code));
2343 case SCR_SECONDARY:
2344 DBUG_RETURN(row_conflict_fn_epoch2_secondary(cfn_share,
2345 op_type,
2346 data_record,
2347 old_data,
2348 new_data,
2349 bi_cols,
2350 ai_cols,
2351 code));
2352 case SCR_PASS:
2353 /* Do nothing */
2354 DBUG_RETURN(0);
2355
2356 default:
2357 break;
2358 }
2359
2360 abort();
2361
2362 DBUG_RETURN(1);
2363 }
2364
2365 /**
2366 * Conflict function setup infrastructure
2367 */
2368
2369 static const st_conflict_fn_arg_def resolve_col_args[]=
2370 {
2371 /* Arg type Optional */
2372 { CFAT_COLUMN_NAME, false },
2373 { CFAT_END, false }
2374 };
2375
2376 static const st_conflict_fn_arg_def epoch_fn_args[]=
2377 {
2378 /* Arg type Optional */
2379 { CFAT_EXTRA_GCI_BITS, true },
2380 { CFAT_END, false }
2381 };
2382
2383 static const st_conflict_fn_def conflict_fns[]=
2384 {
2385 { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
2386 &resolve_col_args[0], row_conflict_fn_max_del_win, 0 },
2387 { "NDB$MAX", CFT_NDB_MAX,
2388 &resolve_col_args[0], row_conflict_fn_max, 0 },
2389 { "NDB$OLD", CFT_NDB_OLD,
2390 &resolve_col_args[0], row_conflict_fn_old, 0 },
2391 { "NDB$EPOCH2_TRANS", CFT_NDB_EPOCH2_TRANS,
2392 &epoch_fn_args[0], row_conflict_fn_epoch2,
2393 CF_REFLECT_SEC_OPS | CF_USE_ROLE_VAR | CF_TRANSACTIONAL},
2394 { "NDB$EPOCH2", CFT_NDB_EPOCH2,
2395 &epoch_fn_args[0], row_conflict_fn_epoch2,
2396 CF_REFLECT_SEC_OPS | CF_USE_ROLE_VAR
2397 },
2398 { "NDB$EPOCH_TRANS", CFT_NDB_EPOCH_TRANS,
2399 &epoch_fn_args[0], row_conflict_fn_epoch, CF_TRANSACTIONAL},
2400 { "NDB$EPOCH", CFT_NDB_EPOCH,
2401 &epoch_fn_args[0], row_conflict_fn_epoch, 0 }
2402 };
2403
2404 static unsigned n_conflict_fns=
2405 sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
2406
2407
2408 int
parse_conflict_fn_spec(const char * conflict_fn_spec,const st_conflict_fn_def ** conflict_fn,st_conflict_fn_arg * args,Uint32 * max_args,char * msg,uint msg_len)2409 parse_conflict_fn_spec(const char* conflict_fn_spec,
2410 const st_conflict_fn_def** conflict_fn,
2411 st_conflict_fn_arg* args,
2412 Uint32* max_args,
2413 char *msg, uint msg_len)
2414 {
2415 DBUG_ENTER("parse_conflict_fn_spec");
2416
2417 Uint32 no_args = 0;
2418 const char *ptr= conflict_fn_spec;
2419 const char *error_str= "unknown conflict resolution function";
2420 /* remove whitespace */
2421 while (*ptr == ' ' && *ptr != '\0') ptr++;
2422
2423 DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
2424
2425 for (unsigned i= 0; i < n_conflict_fns; i++)
2426 {
2427 const st_conflict_fn_def &fn= conflict_fns[i];
2428
2429 uint len= (uint)strlen(fn.name);
2430 if (strncmp(ptr, fn.name, len))
2431 continue;
2432
2433 DBUG_PRINT("info", ("found function %s", fn.name));
2434
2435 /* skip function name */
2436 ptr+= len;
2437
2438 /* remove whitespace */
2439 while (*ptr == ' ' && *ptr != '\0') ptr++;
2440
2441 /* next '(' */
2442 if (*ptr != '(')
2443 {
2444 error_str= "missing '('";
2445 DBUG_PRINT("info", ("parse error %s", error_str));
2446 break;
2447 }
2448 ptr++;
2449
2450 /* find all arguments */
2451 for (;;)
2452 {
2453 if (no_args >= *max_args)
2454 {
2455 error_str= "too many arguments";
2456 DBUG_PRINT("info", ("parse error %s", error_str));
2457 break;
2458 }
2459
2460 /* expected type */
2461 enum enum_conflict_fn_arg_type type=
2462 conflict_fns[i].arg_defs[no_args].arg_type;
2463
2464 /* remove whitespace */
2465 while (*ptr == ' ' && *ptr != '\0') ptr++;
2466
2467 if (type == CFAT_END)
2468 {
2469 args[no_args].type= type;
2470 error_str= NULL;
2471 break;
2472 }
2473
2474 /* arg */
2475 /* Todo : Should support comma as an arg separator? */
2476 const char *start_arg= ptr;
2477 while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
2478 const char *end_arg= ptr;
2479
2480 bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
2481 /* any arg given? */
2482 if (start_arg == end_arg)
2483 {
2484 if (!optional_arg)
2485 {
2486 error_str= "missing function argument";
2487 DBUG_PRINT("info", ("parse error %s", error_str));
2488 break;
2489 }
2490 else
2491 {
2492 /* Arg was optional, and not present
2493 * Must be at end of args, finish parsing
2494 */
2495 args[no_args].type= CFAT_END;
2496 error_str= NULL;
2497 break;
2498 }
2499 }
2500
2501 uint len= (uint)(end_arg - start_arg);
2502 args[no_args].type= type;
2503
2504 DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
2505
2506 bool arg_processing_error = false;
2507 switch (type)
2508 {
2509 case CFAT_COLUMN_NAME:
2510 {
2511 /* Copy column name out into argument's buffer */
2512 char* dest= &args[no_args].resolveColNameBuff[0];
2513
2514 memcpy(dest, start_arg, (len < (uint) NAME_CHAR_LEN ?
2515 len :
2516 NAME_CHAR_LEN));
2517 dest[len]= '\0';
2518 break;
2519 }
2520 case CFAT_EXTRA_GCI_BITS:
2521 {
2522 /* Map string to number and check it's in range etc */
2523 char* end_of_arg = (char*) end_arg;
2524 Uint32 bits = strtoul(start_arg, &end_of_arg, 0);
2525 DBUG_PRINT("info", ("Using %u as the number of extra bits", bits));
2526
2527 if (bits > 31)
2528 {
2529 arg_processing_error= true;
2530 error_str= "Too many extra Gci bits";
2531 DBUG_PRINT("info", ("%s", error_str));
2532 break;
2533 }
2534 /* Num bits seems ok */
2535 args[no_args].extraGciBits = bits;
2536 break;
2537 }
2538 case CFAT_END:
2539 abort();
2540 }
2541
2542 if (arg_processing_error)
2543 break;
2544 no_args++;
2545 }
2546
2547 if (error_str)
2548 break;
2549
2550 /* remove whitespace */
2551 while (*ptr == ' ' && *ptr != '\0') ptr++;
2552
2553 /* next ')' */
2554 if (*ptr != ')')
2555 {
2556 error_str= "missing ')'";
2557 break;
2558 }
2559 ptr++;
2560
2561 /* remove whitespace */
2562 while (*ptr == ' ' && *ptr != '\0') ptr++;
2563
2564 /* garbage in the end? */
2565 if (*ptr != '\0')
2566 {
2567 error_str= "garbage in the end";
2568 break;
2569 }
2570
2571 /* Update ptrs to conflict fn + # of args */
2572 *conflict_fn = &conflict_fns[i];
2573 *max_args = no_args;
2574
2575 DBUG_RETURN(0);
2576 }
2577 /* parse error */
2578 my_snprintf(msg, msg_len, "%s, %s at '%s'",
2579 conflict_fn_spec, error_str, ptr);
2580 DBUG_PRINT("info", ("%s", msg));
2581 DBUG_RETURN(-1);
2582 }
2583
2584 static uint
slave_check_resolve_col_type(const NDBTAB * ndbtab,uint field_index)2585 slave_check_resolve_col_type(const NDBTAB *ndbtab,
2586 uint field_index)
2587 {
2588 DBUG_ENTER("slave_check_resolve_col_type");
2589 const NDBCOL *c= ndbtab->getColumn(field_index);
2590 uint sz= 0;
2591 switch (c->getType())
2592 {
2593 case NDBCOL::Unsigned:
2594 sz= sizeof(Uint32);
2595 DBUG_PRINT("info", ("resolve column Uint32 %u",
2596 field_index));
2597 break;
2598 case NDBCOL::Bigunsigned:
2599 sz= sizeof(Uint64);
2600 DBUG_PRINT("info", ("resolve column Uint64 %u",
2601 field_index));
2602 break;
2603 default:
2604 DBUG_PRINT("info", ("resolve column %u has wrong type",
2605 field_index));
2606 break;
2607 }
2608 DBUG_RETURN(sz);
2609 }
2610
2611 static int
slave_set_resolve_fn(Ndb * ndb,NDB_CONFLICT_FN_SHARE ** ppcfn_share,const char * dbName,const char * tabName,const NDBTAB * ndbtab,uint field_index,uint resolve_col_sz,const st_conflict_fn_def * conflict_fn,uint8 flags)2612 slave_set_resolve_fn(Ndb* ndb,
2613 NDB_CONFLICT_FN_SHARE** ppcfn_share,
2614 const char* dbName,
2615 const char* tabName,
2616 const NDBTAB *ndbtab, uint field_index,
2617 uint resolve_col_sz,
2618 const st_conflict_fn_def* conflict_fn,
2619 uint8 flags)
2620 {
2621 DBUG_ENTER("slave_set_resolve_fn");
2622
2623 NdbDictionary::Dictionary *dict= ndb->getDictionary();
2624 NDB_CONFLICT_FN_SHARE *cfn_share= *ppcfn_share;
2625 const char *ex_suffix= (char *)NDB_EXCEPTIONS_TABLE_SUFFIX;
2626 if (cfn_share == NULL)
2627 {
2628 *ppcfn_share= cfn_share=
2629 (NDB_CONFLICT_FN_SHARE*) my_malloc(PSI_INSTRUMENT_ME,
2630 sizeof(NDB_CONFLICT_FN_SHARE),
2631 MYF(MY_WME | ME_FATALERROR));
2632 slave_reset_conflict_fn(cfn_share);
2633 }
2634 cfn_share->m_conflict_fn= conflict_fn;
2635
2636 /* Calculate resolve col stuff (if relevant) */
2637 cfn_share->m_resolve_size= resolve_col_sz;
2638 cfn_share->m_resolve_column= field_index;
2639 cfn_share->m_flags = flags;
2640
2641 /* Init Exceptions Table Writer */
2642 new (&cfn_share->m_ex_tab_writer) ExceptionsTableWriter();
2643 /* Check for '$EX' or '$ex' suffix in table name */
2644 for (int tries= 2;
2645 tries-- > 0;
2646 ex_suffix=
2647 (tries == 1)
2648 ? (const char *)NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER
2649 : NullS)
2650 {
2651 /* get exceptions table */
2652 char ex_tab_name[FN_REFLEN];
2653 strxnmov(ex_tab_name, sizeof(ex_tab_name), tabName,
2654 ex_suffix, NullS);
2655 ndb->setDatabaseName(dbName);
2656 Ndb_table_guard ndbtab_g(dict, ex_tab_name);
2657 const NDBTAB *ex_tab= ndbtab_g.get_table();
2658 if (ex_tab)
2659 {
2660 char msgBuf[ FN_REFLEN ];
2661 const char* msg = NULL;
2662 if (cfn_share->m_ex_tab_writer.init(ndbtab,
2663 ex_tab,
2664 msgBuf,
2665 sizeof(msgBuf),
2666 &msg) == 0)
2667 {
2668 /* Ok */
2669 /* Hold our table reference outside the table_guard scope */
2670 ndbtab_g.release();
2671
2672 /* Table looked suspicious, warn user */
2673 if (msg)
2674 sql_print_warning("%s", msg);
2675
2676 if (opt_ndb_extra_logging)
2677 {
2678 sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
2679 dbName,
2680 tabName,
2681 dbName,
2682 ex_tab_name);
2683 }
2684 }
2685 else
2686 {
2687 sql_print_warning("%s", msg);
2688 }
2689 break;
2690 } /* if (ex_tab) */
2691 }
2692 DBUG_RETURN(0);
2693 }
2694
2695
2696 bool
is_exceptions_table(const char * table_name)2697 is_exceptions_table(const char *table_name)
2698 {
2699 size_t len = strlen(table_name);
2700 size_t suffixlen = strlen(NDB_EXCEPTIONS_TABLE_SUFFIX);
2701 if(len > suffixlen &&
2702 (strcmp(table_name + len - suffixlen,
2703 lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
2704 NDB_EXCEPTIONS_TABLE_SUFFIX) == 0))
2705 {
2706 return true;
2707 }
2708 return false;
2709 }
2710
2711 int
setup_conflict_fn(Ndb * ndb,NDB_CONFLICT_FN_SHARE ** ppcfn_share,const char * dbName,const char * tabName,bool tableUsesBlobs,bool tableBinlogUseUpdate,const NDBTAB * ndbtab,char * msg,uint msg_len,const st_conflict_fn_def * conflict_fn,const st_conflict_fn_arg * args,const Uint32 num_args)2712 setup_conflict_fn(Ndb* ndb,
2713 NDB_CONFLICT_FN_SHARE** ppcfn_share,
2714 const char* dbName,
2715 const char* tabName,
2716 bool tableUsesBlobs,
2717 bool tableBinlogUseUpdate,
2718 const NDBTAB *ndbtab,
2719 char *msg, uint msg_len,
2720 const st_conflict_fn_def* conflict_fn,
2721 const st_conflict_fn_arg* args,
2722 const Uint32 num_args)
2723 {
2724 DBUG_ENTER("setup_conflict_fn");
2725
2726 if(is_exceptions_table(tabName))
2727 {
2728 my_snprintf(msg, msg_len,
2729 "Ndb Slave: Table %s.%s is exceptions table: not using conflict function %s",
2730 dbName,
2731 tabName,
2732 conflict_fn->name);
2733 DBUG_PRINT("info", ("%s", msg));
2734 DBUG_RETURN(0);
2735 }
2736
2737 /* setup the function */
2738 switch (conflict_fn->type)
2739 {
2740 case CFT_NDB_MAX:
2741 case CFT_NDB_OLD:
2742 case CFT_NDB_MAX_DEL_WIN:
2743 {
2744 if (num_args != 1)
2745 {
2746 my_snprintf(msg, msg_len,
2747 "Incorrect arguments to conflict function");
2748 DBUG_PRINT("info", ("%s", msg));
2749 DBUG_RETURN(-1);
2750 }
2751
2752 /* Now try to find the column in the table */
2753 int colNum = -1;
2754 const char* resolveColName = args[0].resolveColNameBuff;
2755 int resolveColNameLen = (int)strlen(resolveColName);
2756
2757 for (int j=0; j< ndbtab->getNoOfColumns(); j++)
2758 {
2759 const char* colName = ndbtab->getColumn(j)->getName();
2760
2761 if (strncmp(colName,
2762 resolveColName,
2763 resolveColNameLen) == 0 &&
2764 colName[resolveColNameLen] == '\0')
2765 {
2766 colNum = j;
2767 break;
2768 }
2769 }
2770 if (colNum == -1)
2771 {
2772 my_snprintf(msg, msg_len,
2773 "Could not find resolve column %s.",
2774 resolveColName);
2775 DBUG_PRINT("info", ("%s", msg));
2776 DBUG_RETURN(-1);
2777 }
2778
2779 const uint resolve_col_sz= slave_check_resolve_col_type(ndbtab, colNum);
2780 if (resolve_col_sz == 0)
2781 {
2782 /* wrong data type */
2783 slave_reset_conflict_fn(*ppcfn_share);
2784 my_snprintf(msg, msg_len,
2785 "Column '%s' has wrong datatype",
2786 resolveColName);
2787 DBUG_PRINT("info", ("%s", msg));
2788 DBUG_RETURN(-1);
2789 }
2790
2791 if (slave_set_resolve_fn(ndb,
2792 ppcfn_share,
2793 dbName,
2794 tabName,
2795 ndbtab,
2796 colNum, resolve_col_sz,
2797 conflict_fn, CFF_NONE))
2798 {
2799 my_snprintf(msg, msg_len,
2800 "Unable to setup conflict resolution using column '%s'",
2801 resolveColName);
2802 DBUG_PRINT("info", ("%s", msg));
2803 DBUG_RETURN(-1);
2804 }
2805
2806 /* Success, update message */
2807 my_snprintf(msg, msg_len,
2808 "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
2809 dbName,
2810 tabName,
2811 conflict_fn->name,
2812 resolveColName);
2813 break;
2814 }
2815 case CFT_NDB_EPOCH2:
2816 case CFT_NDB_EPOCH2_TRANS:
2817 {
2818 /* Check how updates will be logged... */
2819 bool log_update_as_write = (!tableBinlogUseUpdate);
2820
2821 if (log_update_as_write)
2822 {
2823 my_snprintf(msg, msg_len,
2824 "Table %s.%s configured to log updates as writes. "
2825 "Not suitable for %s.",
2826 dbName,
2827 tabName,
2828 conflict_fn->name);
2829 DBUG_PRINT("info", ("%s", msg));
2830 DBUG_RETURN(-1);
2831 }
2832
2833 /* Fall through for the rest of the EPOCH* processing... */
2834 }
2835 case CFT_NDB_EPOCH:
2836 case CFT_NDB_EPOCH_TRANS:
2837 {
2838 if (num_args > 1)
2839 {
2840 my_snprintf(msg, msg_len,
2841 "Too many arguments to conflict function");
2842 DBUG_PRINT("info", ("%s", msg));
2843 DBUG_RETURN(-1);
2844 }
2845
2846 /* Check that table doesn't have Blobs as we don't support that */
2847 if (tableUsesBlobs)
2848 {
2849 my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for %s.",
2850 conflict_fn->name);
2851 DBUG_PRINT("info", ("%s", msg));
2852 DBUG_RETURN(-1);
2853 }
2854
2855 /* Check that table has required extra meta-columns */
2856 /* Todo : Could warn if extra gcibits is insufficient to
2857 * represent SavePeriod/EpochPeriod
2858 */
2859 if (ndbtab->getExtraRowGciBits() == 0)
2860 sql_print_information("NDB Slave: Table %s.%s : %s, low epoch resolution",
2861 dbName,
2862 tabName,
2863 conflict_fn->name);
2864
2865 if (ndbtab->getExtraRowAuthorBits() == 0)
2866 {
2867 my_snprintf(msg, msg_len, "No extra row author bits in table.");
2868 DBUG_PRINT("info", ("%s", msg));
2869 DBUG_RETURN(-1);
2870 }
2871
2872 if (slave_set_resolve_fn(ndb,
2873 ppcfn_share,
2874 dbName,
2875 tabName,
2876 ndbtab,
2877 0, // field_no
2878 0, // resolve_col_sz
2879 conflict_fn, CFF_REFRESH_ROWS))
2880 {
2881 my_snprintf(msg, msg_len,
2882 "unable to setup conflict resolution");
2883 DBUG_PRINT("info", ("%s", msg));
2884 DBUG_RETURN(-1);
2885 }
2886 /* Success, update message */
2887 my_snprintf(msg, msg_len,
2888 "NDB Slave: Table %s.%s using conflict_fn %s.",
2889 dbName,
2890 tabName,
2891 conflict_fn->name);
2892
2893 break;
2894 }
2895 case CFT_NUMBER_OF_CFTS:
2896 case CFT_NDB_UNDEF:
2897 abort();
2898 }
2899 DBUG_RETURN(0);
2900 }
2901
2902
2903 void
teardown_conflict_fn(Ndb * ndb,NDB_CONFLICT_FN_SHARE * cfn_share)2904 teardown_conflict_fn(Ndb* ndb, NDB_CONFLICT_FN_SHARE* cfn_share)
2905 {
2906 if (cfn_share &&
2907 cfn_share->m_ex_tab_writer.hasTable() &&
2908 ndb)
2909 {
2910 cfn_share->m_ex_tab_writer.mem_free(ndb);
2911 }
2912
2913 // Release the NDB_CONFLICT_FN_SHARE which was allocated
2914 // in setup_conflict_fn()
2915 my_free(cfn_share);
2916 }
2917
2918
slave_reset_conflict_fn(NDB_CONFLICT_FN_SHARE * cfn_share)2919 void slave_reset_conflict_fn(NDB_CONFLICT_FN_SHARE *cfn_share)
2920 {
2921 if (cfn_share)
2922 {
2923 memset(cfn_share, 0, sizeof(*cfn_share));
2924 }
2925 }
2926
2927 #endif
2928
2929
2930 /**
2931 * Variables related to conflict handling
2932 * All prefixed 'ndb_conflict'
2933 */
2934
2935 SHOW_VAR ndb_status_conflict_variables[]= {
2936 {"fn_max", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2937 {"fn_old", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2938 {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2939 {"fn_epoch", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2940 {"fn_epoch_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH_TRANS], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2941 {"fn_epoch2", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH2], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2942 {"fn_epoch2_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH2_TRANS], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2943 {"trans_row_conflict_count", (char*) &g_ndb_slave_state.trans_row_conflict_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2944 {"trans_row_reject_count", (char*) &g_ndb_slave_state.trans_row_reject_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2945 {"trans_reject_count", (char*) &g_ndb_slave_state.trans_in_conflict_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2946 {"trans_detect_iter_count", (char*) &g_ndb_slave_state.trans_detect_iter_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2947 {"trans_conflict_commit_count",
2948 (char*) &g_ndb_slave_state.trans_conflict_commit_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2949 {"epoch_delete_delete_count", (char*) &g_ndb_slave_state.total_delete_delete_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2950 {"reflected_op_prepare_count", (char*) &g_ndb_slave_state.total_reflect_op_prepare_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2951 {"reflected_op_discard_count", (char*) &g_ndb_slave_state.total_reflect_op_discard_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2952 {"refresh_op_count", (char*) &g_ndb_slave_state.total_refresh_op_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2953 {"last_conflict_epoch", (char*) &g_ndb_slave_state.last_conflicted_epoch, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2954 {"last_stable_epoch", (char*) &g_ndb_slave_state.last_stable_epoch, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2955 {NullS, NullS, SHOW_LONG, SHOW_SCOPE_GLOBAL}
2956 };
2957
2958 int
show_ndb_conflict_status_vars(THD * thd,struct st_mysql_show_var * var,char * buff)2959 show_ndb_conflict_status_vars(THD *thd, struct st_mysql_show_var *var, char *buff)
2960 {
2961 /* Just a function to allow moving array into this file */
2962 var->type = SHOW_ARRAY;
2963 var->value = (char*) &ndb_status_conflict_variables;
2964 return 0;
2965 }
2966
2967