1 /*
2    Copyright (c) 2012, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 #include <my_global.h> /* For config defines */
25 
26 #include "ha_ndbcluster_glue.h"
27 #include "ndb_conflict.h"
28 #include "ndb_binlog_extra_row_info.h"
29 #include "ndb_table_guard.h"
30 
31 extern st_ndb_slave_state g_ndb_slave_state;
32 
33 #ifdef HAVE_NDB_BINLOG
34 #include "ndb_mi.h"
35 extern ulong opt_ndb_slave_conflict_role;
36 extern ulong opt_ndb_extra_logging;
37 
38 #define NDBTAB NdbDictionary::Table
39 #define NDBCOL NdbDictionary::Column
40 
41 
42 #define NDB_EXCEPTIONS_TABLE_SUFFIX "$EX"
43 #define NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER "$ex"
44 
45 #define NDB_EXCEPTIONS_TABLE_COLUMN_PREFIX "NDB$"
46 #define NDB_EXCEPTIONS_TABLE_OP_TYPE "NDB$OP_TYPE"
47 #define NDB_EXCEPTIONS_TABLE_CONFLICT_CAUSE "NDB$CFT_CAUSE"
48 #define NDB_EXCEPTIONS_TABLE_ORIG_TRANSID "NDB$ORIG_TRANSID"
49 #define NDB_EXCEPTIONS_TABLE_COLUMN_OLD_SUFFIX "$OLD"
50 #define NDB_EXCEPTIONS_TABLE_COLUMN_NEW_SUFFIX "$NEW"
51 
52 
53 /*
54   Return true if a column has a specific prefix.
55 */
56 bool
has_prefix_ci(const char * col_name,const char * prefix,CHARSET_INFO * cs)57 ExceptionsTableWriter::has_prefix_ci(const char *col_name,
58                                      const char *prefix,
59                                      CHARSET_INFO *cs)
60 {
61   uint col_len= strlen(col_name);
62   uint prefix_len= strlen(prefix);
63   if (col_len < prefix_len)
64     return false;
65   char col_name_prefix[FN_HEADLEN];
66   strncpy(col_name_prefix, col_name, prefix_len);
67   col_name_prefix[prefix_len]= '\0';
68   return (my_strcasecmp(cs,
69                         col_name_prefix,
70                         prefix) == 0);
71 }
72 
73 /*
74   Return true if a column has a specific suffix
75   and sets the column_real_name to the column name
76   without the suffix.
77 */
78 bool
has_suffix_ci(const char * col_name,const char * suffix,CHARSET_INFO * cs,char * col_name_real)79 ExceptionsTableWriter::has_suffix_ci(const char *col_name,
80                                      const char *suffix,
81                                      CHARSET_INFO *cs,
82                                      char *col_name_real)
83 {
84   uint col_len= strlen(col_name);
85   uint suffix_len= strlen(suffix);
86   const char *col_name_endp= col_name + col_len;
87   strcpy(col_name_real, col_name);
88   if (col_len > suffix_len &&
89       my_strcasecmp(cs,
90                     col_name_endp - suffix_len,
91                     suffix) == 0)
92   {
93     col_name_real[col_len - suffix_len]= '\0';
94     return true;
95   }
96   return false;
97 }
98 
99 /*
100   Search for column_name in table and
101   return true if found. Also return what
102   position column was found in pos and possible
103   position in the primary key in key_pos.
104  */
105 bool
find_column_name_ci(CHARSET_INFO * cs,const char * col_name,const NdbDictionary::Table * table,int * pos,int * key_pos)106 ExceptionsTableWriter::find_column_name_ci(CHARSET_INFO *cs,
107                                            const char *col_name,
108                                            const NdbDictionary::Table* table,
109                                            int *pos,
110                                            int *key_pos)
111 {
112   int ncol= table->getNoOfColumns();
113   for(int m= 0; m < ncol; m++)
114   {
115     const NdbDictionary::Column* col= table->getColumn(m);
116     const char *tcol_name= col->getName();
117     if (col->getPrimaryKey())
118       (*key_pos)++;
119     if (my_strcasecmp(cs, col_name, tcol_name) == 0)
120     {
121       *pos= m;
122       return true;
123     }
124   }
125   return false;
126 }
127 
128 
129 bool
check_mandatory_columns(const NdbDictionary::Table * exceptionsTable)130 ExceptionsTableWriter::check_mandatory_columns(const NdbDictionary::Table* exceptionsTable)
131 {
132   DBUG_ENTER("ExceptionsTableWriter::check_mandatory_columns");
133   if (/* server id */
134       exceptionsTable->getColumn(0)->getType() == NDBCOL::Unsigned &&
135       exceptionsTable->getColumn(0)->getPrimaryKey() &&
136       /* master_server_id */
137       exceptionsTable->getColumn(1)->getType() == NDBCOL::Unsigned &&
138       exceptionsTable->getColumn(1)->getPrimaryKey() &&
139       /* master_epoch */
140       exceptionsTable->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
141       exceptionsTable->getColumn(2)->getPrimaryKey() &&
142       /* count */
143       exceptionsTable->getColumn(3)->getType() == NDBCOL::Unsigned &&
144       exceptionsTable->getColumn(3)->getPrimaryKey()
145       )
146     DBUG_RETURN(true);
147   else
148     DBUG_RETURN(false);
149 }
150 
151 bool
check_pk_columns(const NdbDictionary::Table * mainTable,const NdbDictionary::Table * exceptionsTable,int & k)152 ExceptionsTableWriter::check_pk_columns(const NdbDictionary::Table* mainTable,
153                                         const NdbDictionary::Table* exceptionsTable,
154                                         int &k)
155 {
156   DBUG_ENTER("ExceptionsTableWriter::check_pk_columns");
157   const int fixed_cols= 4;
158   int ncol= mainTable->getNoOfColumns();
159   int nkey= mainTable->getNoOfPrimaryKeys();
160   /* Check columns that are part of the primary key */
161   for (int i= k= 0; i < ncol && k < nkey; i++)
162   {
163       const NdbDictionary::Column* col= mainTable->getColumn(i);
164       if (col->getPrimaryKey())
165       {
166         const NdbDictionary::Column* ex_col=
167           exceptionsTable->getColumn(fixed_cols + k);
168         if(!(ex_col != NULL &&
169              col->getType() == ex_col->getType() &&
170              col->getLength() == ex_col->getLength() &&
171              col->getNullable() == ex_col->getNullable()))
172          {
173            /*
174               Primary key type of the original table doesn't match
175               the primary key column of the execption table.
176               Assume that the table format has been extended and
177               check more below.
178            */
179            DBUG_PRINT("info", ("Primary key column columns don't match, assume extended table"));
180            m_extended= true;
181            break;
182          }
183         /*
184           Store mapping of Exception table key# to
185           orig table attrid
186         */
187         DBUG_PRINT("info", ("%u: Setting m_key_attrids[%i]= %i", __LINE__, k, i));
188         m_key_attrids[k]= i;
189         k++;
190       }
191     }
192   DBUG_RETURN(true);
193 }
194 
195 bool
check_optional_columns(const NdbDictionary::Table * mainTable,const NdbDictionary::Table * exceptionsTable,char * msg_buf,uint msg_buf_len,const char ** msg,int & k,char * error_details,uint error_details_len)196 ExceptionsTableWriter::check_optional_columns(const NdbDictionary::Table* mainTable,
197                                               const NdbDictionary::Table* exceptionsTable,
198                                               char* msg_buf,
199                                               uint msg_buf_len,
200                                               const char** msg,
201                                               int &k,
202                                               char *error_details,
203                                               uint error_details_len)
204 {
205   DBUG_ENTER("ExceptionsTableWriter::check_optional_columns");
206   /*
207     Check optional columns.
208     Check if table has been extended by looking for
209     the NDB$ prefix. By looking at the columns in
210     reverse order we can determine if table has been
211     extended and then double check that the original
212     mandatory columns also have the NDB$ prefix.
213     If an incomplete primary key has been found or
214     additional non-primary key attributes from the
215     original table then table is also assumed to be
216     extended.
217   */
218   const char* ex_tab_name= exceptionsTable->getName();
219   const int fixed_cols= 4;
220   bool ok= true;
221   int xncol= exceptionsTable->getNoOfColumns();
222   int i;
223   for (i= xncol - 1; i >= 0; i--)
224   {
225     const NdbDictionary::Column* col= exceptionsTable->getColumn(i);
226     const char* col_name= col->getName();
227     /*
228       We really need the CHARSET_INFO from when the table was
229       created but NdbDictionary::Table doesn't save this. This
230       means we cannot handle tables and execption tables defined
231       with a charset different than the system charset.
232     */
233     CHARSET_INFO *cs= system_charset_info;
234     bool has_prefix= false;
235 
236     if (has_prefix_ci(col_name, NDB_EXCEPTIONS_TABLE_COLUMN_PREFIX, cs))
237     {
238       has_prefix= true;
239       m_extended= true;
240       DBUG_PRINT("info",
241                  ("Exceptions table %s is extended with column %s",
242                   ex_tab_name, col_name));
243     }
244     /* Check that mandatory columns have NDB$ prefix */
245     if (i < 4)
246     {
247       if (m_extended && !has_prefix)
248       {
249         my_snprintf(msg_buf, msg_buf_len,
250                     "Exceptions table %s is extended, but mandatory column %s  doesn't have the \'%s\' prefix",
251                     ex_tab_name,
252                     col_name,
253                     NDB_EXCEPTIONS_TABLE_COLUMN_PREFIX);
254         *msg= msg_buf;
255         DBUG_RETURN(false);
256       }
257     }
258     k= i - fixed_cols;
259     /* Check for extended columns */
260     if (my_strcasecmp(cs,
261                       col_name,
262                       NDB_EXCEPTIONS_TABLE_OP_TYPE) == 0)
263     {
264       /* Check if ENUM or INT UNSIGNED */
265       if (exceptionsTable->getColumn(i)->getType() != NDBCOL::Char &&
266           exceptionsTable->getColumn(i)->getType() != NDBCOL::Unsigned)
267       {
268         my_snprintf(error_details, error_details_len,
269                     "Table %s has incorrect type %u for NDB$OP_TYPE",
270                     exceptionsTable->getName(),
271                     exceptionsTable->getColumn(i)->getType());
272         DBUG_PRINT("info", ("%s", error_details));
273         ok= false;
274         break;
275       }
276       m_extended= true;
277       m_op_type_pos= i;
278       continue;
279     }
280     if (my_strcasecmp(cs,
281                       col_name,
282                       NDB_EXCEPTIONS_TABLE_CONFLICT_CAUSE) == 0)
283     {
284       /* Check if ENUM or INT UNSIGNED */
285       if (exceptionsTable->getColumn(i)->getType() != NDBCOL::Char &&
286           exceptionsTable->getColumn(i)->getType() != NDBCOL::Unsigned)
287       {
288         my_snprintf(error_details, error_details_len,
289                     "Table %s has incorrect type %u for NDB$CFT_CAUSE",
290                     exceptionsTable->getName(),
291                     exceptionsTable->getColumn(i)->getType());
292         DBUG_PRINT("info", ("%s", error_details));
293         ok= false;
294         break;
295       }
296       m_extended= true;
297       m_conflict_cause_pos= i;
298       continue;
299     }
300     if (my_strcasecmp(cs,
301                       col_name,
302                       NDB_EXCEPTIONS_TABLE_ORIG_TRANSID) == 0)
303     {
304       if (exceptionsTable->getColumn(i)->getType() != NDBCOL::Bigunsigned)
305       {
306         my_snprintf(error_details, error_details_len,
307                     "Table %s has incorrect type %u for NDB$ORIG_TRANSID",
308                     exceptionsTable->getName(),
309                     exceptionsTable->getColumn(i)->getType());
310         DBUG_PRINT("info", ("%s", error_details));
311         ok= false;
312         break;
313       }
314       m_extended= true;
315       m_orig_transid_pos= i;
316       continue;
317     }
318     /*
319       Check for any optional columns from the original table in the extended
320       table. Compare column types of columns with names matching a column in
321       the original table. If a non-primary key column is found we assume that
322       the table is extended.
323     */
324     if (i >= fixed_cols)
325     {
326       int match= -1;
327       int match_k= -1;
328       COLUMN_VERSION column_version= DEFAULT;
329       char col_name_real[FN_HEADLEN];
330       /* Check for old or new column reference */
331       if (has_suffix_ci(col_name,
332                         NDB_EXCEPTIONS_TABLE_COLUMN_OLD_SUFFIX,
333                         cs,
334                         col_name_real))
335       {
336         DBUG_PRINT("info", ("Found reference to old column %s", col_name));
337         column_version= OLD;
338       }
339       else if (has_suffix_ci(col_name,
340                              NDB_EXCEPTIONS_TABLE_COLUMN_NEW_SUFFIX,
341                              cs,
342                              col_name_real))
343       {
344         DBUG_PRINT("info", ("Found reference to new column %s", col_name));
345         column_version= NEW;
346       }
347       DBUG_PRINT("info", ("Checking for original column %s", col_name_real));
348       /*
349         We really need the CHARSET_INFO from when the table was
350         created but NdbDictionary::Table doesn't save this. This
351         means we cannot handle tables end execption tables defined
352         with a charset different than the system charset.
353       */
354       CHARSET_INFO *mcs= system_charset_info;
355       if (! find_column_name_ci(mcs, col_name_real, mainTable, &match, &match_k))
356       {
357         if (! strcmp(col_name, col_name_real))
358         {
359           /*
360             Column did have $OLD or $NEW suffix, but it didn't
361             match. Check if that is the real name of the column.
362           */
363           match_k= -1;
364           if (find_column_name_ci(mcs, col_name, mainTable, &match, &match_k))
365           {
366             DBUG_PRINT("info", ("Column %s in main table %s has an unfortunate name",
367                                 col_name, mainTable->getName()));
368           }
369         }
370       }
371       /*
372         Check that old or new references are nullable
373         or have a default value.
374       */
375       if (column_version != DEFAULT &&
376           match_k != -1)
377       {
378         if ((! col->getNullable()) &&
379             col->getDefaultValue() == NULL)
380         {
381           my_snprintf(error_details, error_details_len,
382                       "Old or new column reference %s in table %s is not nullable and doesn't have a default value",
383                       col->getName(), exceptionsTable->getName());
384           DBUG_PRINT("info", ("%s", error_details));
385           ok= false;
386           break;
387         }
388       }
389 
390       if (match == -1)
391       {
392         /*
393            Column do not have the same name, could be allowed
394            if column is nullable or has a default value,
395            continue checking, but give a warning to user
396         */
397         if ((! col->getNullable()) &&
398             col->getDefaultValue() == NULL)
399         {
400           my_snprintf(error_details, error_details_len,
401                       "Extra column %s in table %s is not nullable and doesn't have a default value",
402                       col->getName(), exceptionsTable->getName());
403           DBUG_PRINT("info", ("%s", error_details));
404           ok= false;
405           break;
406         }
407         my_snprintf(error_details, error_details_len,
408                     "Column %s in extension table %s not found in %s",
409                     col->getName(), exceptionsTable->getName(),
410                     mainTable->getName());
411         DBUG_PRINT("info", ("%s", error_details));
412         my_snprintf(msg_buf, msg_buf_len,
413                     "NDB Slave: exceptions table %s has suspicious "
414                     "definition ((column %d): %s",
415                     ex_tab_name, fixed_cols + k, error_details);
416         continue;
417       }
418       /* We have a matching name */
419       const NdbDictionary::Column* mcol= mainTable->getColumn(match);
420       if (col->getType() == mcol->getType())
421       {
422         DBUG_PRINT("info", ("Comparing column %s in exceptions table with column %s in main table", col->getName(), mcol->getName()));
423         /* We have matching type */
424         if (!mcol->getPrimaryKey())
425         {
426           /*
427             Matching non-key column found.
428             Check that column is nullable
429             or has a default value.
430           */
431           if (col->getNullable() ||
432               col->getDefaultValue() != NULL)
433           {
434             DBUG_PRINT("info", ("Mapping column %s %s(%i) to %s(%i)",
435                                 col->getName(),
436                                 mainTable->getName(), match,
437                                 exceptionsTable->getName(), i));
438             /* Save position */
439             m_data_pos[i]= match;
440             m_column_version[i]= column_version;
441           }
442           else
443           {
444             my_snprintf(error_details, error_details_len,
445                         "Data column %s in table %s is not nullable and doesn't have a default value",
446                         col->getName(), exceptionsTable->getName());
447             DBUG_PRINT("info", ("%s", error_details));
448             ok= false;
449             break;
450           }
451         }
452         else
453         {
454           /* Column is part of the primary key */
455           if (column_version != DEFAULT)
456           {
457             my_snprintf(error_details, error_details_len,
458                         "Old or new values of primary key columns cannot be referenced since primary keys cannot be updated, column %s in table %s",
459                         col->getName(), exceptionsTable->getName());
460             DBUG_PRINT("info", ("%s", error_details));
461             ok= false;
462             break;
463           }
464           if (col->getNullable() == mcol->getNullable())
465           {
466             /*
467               Columns are both nullable or not nullable.
468               Save position.
469             */
470             if (m_key_data_pos[match_k] != -1)
471             {
472               my_snprintf(error_details, error_details_len,
473                           "Multiple references to the same key column %s in table %s",
474                           col->getName(), exceptionsTable->getName());
475               DBUG_PRINT("info", ("%s", error_details));
476               ok= false;
477               break;
478             }
479             DBUG_PRINT("info", ("Setting m_key_data_pos[%i]= %i", match_k, i));
480             m_key_data_pos[match_k]= i;
481 
482             if (i == fixed_cols + match_k)
483             {
484               /* Found key column in correct position */
485               if (!m_extended)
486                 continue;
487             }
488             /*
489               Store mapping of Exception table key# to
490               orig table attrid
491             */
492             DBUG_PRINT("info", ("%u: Setting m_key_attrids[%i]= %i", __LINE__, match_k, match));
493             m_key_attrids[match_k]= match;
494             m_extended= true;
495           }
496           else if (column_version == DEFAULT)
497           {
498             /*
499                Columns have same name and same type
500                Column with this name is part of primary key,
501                but both columns are not declared not null
502             */
503             my_snprintf(error_details, error_details_len,
504                         "Pk column %s not declared not null in both tables",
505                         col->getName());
506             DBUG_PRINT("info", ("%s", error_details));
507             ok= false;
508             break;
509           }
510         }
511       }
512       else
513       {
514         /*
515            Columns have same name, but not the same type
516         */
517         my_snprintf(error_details, error_details_len,
518                     "Column %s has matching name to column %s for table %s, but wrong type, %u versus %u",
519                     col->getName(), mcol->getName(),
520                     mainTable->getName(),
521                     col->getType(), mcol->getType());
522         DBUG_PRINT("info", ("%s", error_details));
523         ok= false;
524         break;
525       }
526     }
527   }
528 
529   DBUG_RETURN(ok);
530 }
531 
532 int
init(const NdbDictionary::Table * mainTable,const NdbDictionary::Table * exceptionsTable,char * msg_buf,uint msg_buf_len,const char ** msg)533 ExceptionsTableWriter::init(const NdbDictionary::Table* mainTable,
534                             const NdbDictionary::Table* exceptionsTable,
535                             char* msg_buf,
536                             uint msg_buf_len,
537                             const char** msg)
538 {
539   DBUG_ENTER("ExceptionsTableWriter::init");
540   const char* ex_tab_name= exceptionsTable->getName();
541   const int fixed_cols= 4;
542   *msg= NULL;
543   *msg_buf= '\0';
544 
545   DBUG_PRINT("info", ("Checking definition of exceptions table %s",
546                       ex_tab_name));
547   /*
548     Check that the table have the corrct number of columns
549     and the mandatory columns.
550    */
551 
552   bool ok=
553     exceptionsTable->getNoOfColumns() >= fixed_cols &&
554     exceptionsTable->getNoOfPrimaryKeys() == 4 &&
555     check_mandatory_columns(exceptionsTable);
556 
557   if (ok)
558   {
559     char error_details[ FN_REFLEN ];
560     uint error_details_len= sizeof(error_details);
561     error_details[0]= '\0';
562     int ncol= mainTable->getNoOfColumns();
563     int nkey= mainTable->getNoOfPrimaryKeys();
564     int xncol= exceptionsTable->getNoOfColumns();
565     int i, k;
566     /* Initialize position arrays */
567     for(k=0; k < nkey; k++)
568       m_key_data_pos[k]= -1;
569     for(i=0; i < xncol; i++)
570       m_data_pos[i]= -1;
571     /* Initialize nullability information */
572     for(i=0; i < ncol; i++)
573     {
574       const NdbDictionary::Column* col= mainTable->getColumn(i);
575       m_col_nullable[i]= col->getNullable();
576     }
577 
578     /*
579       Check that the primary key columns in the main table
580       are referenced correctly.
581       Then check if the table is extended with optional
582       columns.
583      */
584     ok=
585       check_pk_columns(mainTable, exceptionsTable, k) &&
586       check_optional_columns(mainTable,
587                              exceptionsTable,
588                              msg_buf,
589                              msg_buf_len,
590                              msg,
591                              k,
592                              error_details,
593                              error_details_len);
594     if (ok)
595     {
596       m_ex_tab= exceptionsTable;
597       m_pk_cols= nkey;
598       m_cols= ncol;
599       m_xcols= xncol;
600       if (m_extended && strlen(msg_buf) > 0)
601         *msg= msg_buf;
602       DBUG_RETURN(0);
603     }
604     else
605       my_snprintf(msg_buf, msg_buf_len,
606                   "NDB Slave: exceptions table %s has wrong "
607                   "definition (column %d): %s",
608                   ex_tab_name, fixed_cols + k, error_details);
609   }
610   else
611     my_snprintf(msg_buf, msg_buf_len,
612                 "NDB Slave: exceptions table %s has wrong "
613                 "definition (initial %d columns)",
614                 ex_tab_name, fixed_cols);
615 
616   *msg= msg_buf;
617   DBUG_RETURN(-1);
618 }
619 
620 void
mem_free(Ndb * ndb)621 ExceptionsTableWriter::mem_free(Ndb* ndb)
622 {
623   if (m_ex_tab)
624   {
625     NdbDictionary::Dictionary* dict = ndb->getDictionary();
626     dict->removeTableGlobal(*m_ex_tab, 0);
627     m_ex_tab= 0;
628   }
629 }
630 
631 int
writeRow(NdbTransaction * trans,const NdbRecord * keyRecord,const NdbRecord * dataRecord,uint32 server_id,uint32 master_server_id,uint64 master_epoch,const uchar * oldRowPtr,const uchar * newRowPtr,enum_conflicting_op_type op_type,enum_conflict_cause conflict_cause,uint64 orig_transid,const MY_BITMAP * write_set,NdbError & err)632 ExceptionsTableWriter::writeRow(NdbTransaction* trans,
633                                 const NdbRecord* keyRecord,
634                                 const NdbRecord* dataRecord,
635                                 uint32 server_id,
636                                 uint32 master_server_id,
637                                 uint64 master_epoch,
638                                 const uchar* oldRowPtr,
639                                 const uchar* newRowPtr,
640                                 enum_conflicting_op_type op_type,
641                                 enum_conflict_cause conflict_cause,
642                                 uint64 orig_transid,
643                                 const MY_BITMAP *write_set,
644                                 NdbError& err)
645 {
646   DBUG_ENTER("ExceptionsTableWriter::writeRow");
647   DBUG_PRINT("info", ("op_type(pos):%u(%u), conflict_cause(pos):%u(%u), orig_transid:%llu(%u)",
648                       op_type, m_op_type_pos,
649                       conflict_cause, m_conflict_cause_pos,
650                       orig_transid, m_orig_transid_pos));
651   assert(write_set != NULL);
652   assert(err.code == 0);
653   const uchar* rowPtr= (op_type == DELETE_ROW)? oldRowPtr : newRowPtr;
654 
655   do
656   {
657     /* Have exceptions table, add row to it */
658     const NDBTAB *ex_tab= m_ex_tab;
659 
660     /* get insert op */
661     NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
662     if (ex_op == NULL)
663     {
664       err= trans->getNdbError();
665       break;
666     }
667     if (ex_op->insertTuple() == -1)
668     {
669       err= ex_op->getNdbError();
670       break;
671     }
672     {
673       uint32 count= (uint32)++m_count;
674       /* Set mandatory columns */
675       if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
676           ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
677           ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
678           ex_op->setValue((Uint32)3, (const char *)&(count)))
679       {
680         err= ex_op->getNdbError();
681         break;
682       }
683       /* Set optional columns */
684       if (m_extended)
685       {
686         if (m_op_type_pos)
687         {
688           if (m_ex_tab->getColumn(m_op_type_pos)->getType()
689               == NDBCOL::Char)
690           {
691             /* Defined as ENUM */
692             char op_type_val= (char)op_type;
693             if (ex_op->setValue((Uint32)m_op_type_pos,
694                                 (const char *)&(op_type_val)))
695             {
696               err= ex_op->getNdbError();
697               break;
698             }
699           }
700           else
701           {
702             uint32 op_type_val= op_type;
703             if (ex_op->setValue((Uint32)m_op_type_pos,
704                                 (const char *)&(op_type_val)))
705             {
706               err= ex_op->getNdbError();
707               break;
708             }
709           }
710         }
711         if (m_conflict_cause_pos)
712         {
713           if (m_ex_tab->getColumn(m_conflict_cause_pos)->getType()
714               == NDBCOL::Char)
715           {
716             /* Defined as ENUM */
717             char conflict_cause_val= (char)conflict_cause;
718             if (ex_op->setValue((Uint32)m_conflict_cause_pos,
719                                 (const char *)&(conflict_cause_val)))
720             {
721               err= ex_op->getNdbError();
722               break;
723             }
724           }
725           else
726           {
727             uint32 conflict_cause_val= conflict_cause;
728             if (ex_op->setValue((Uint32)m_conflict_cause_pos,
729                                 (const char *)&(conflict_cause_val)))
730             {
731               err= ex_op->getNdbError();
732               break;
733             }
734           }
735         }
736         if (m_orig_transid_pos != 0)
737         {
738           const NdbDictionary::Column* col= m_ex_tab->getColumn(m_orig_transid_pos);
739           if (orig_transid == Ndb_binlog_extra_row_info::InvalidTransactionId
740               &&
741               col->getNullable())
742           {
743             if (ex_op->setValue((Uint32) m_orig_transid_pos, (char*)NULL))
744             {
745               err= ex_op->getNdbError();
746               break;
747             }
748           }
749           else
750           {
751             DBUG_PRINT("info", ("Setting orig_transid (%u) for table %s", m_orig_transid_pos, ex_tab->getName()));
752             uint64 orig_transid_val= orig_transid;
753             if (ex_op->setValue((Uint32)m_orig_transid_pos,
754                                 (const char *)&(orig_transid_val)))
755             {
756               err= ex_op->getNdbError();
757               break;
758             }
759           }
760         }
761       }
762     }
763     /* copy primary keys */
764     {
765       int nkey= m_pk_cols;
766       int k;
767       for (k= 0; k < nkey; k++)
768       {
769         assert(rowPtr != NULL);
770         if (m_key_data_pos[k] != -1)
771         {
772           const uchar* data=
773             (const uchar*) NdbDictionary::getValuePtr(keyRecord,
774                                                       (const char*) rowPtr,
775                                                       m_key_attrids[k]);
776           if (ex_op->setValue((Uint32) m_key_data_pos[k], (const char*)data) == -1)
777           {
778             err= ex_op->getNdbError();
779             break;
780           }
781         }
782       }
783     }
784     /* Copy additional data */
785     if (m_extended)
786     {
787       int xncol= m_xcols;
788       int i;
789       for (i= 0; i < xncol; i++)
790       {
791         const NdbDictionary::Column* col= m_ex_tab->getColumn(i);
792         const uchar* default_value=  (const uchar*) col->getDefaultValue();
793         DBUG_PRINT("info", ("Checking column %s(%i)%s", col->getName(), i,
794                             (default_value)?", has default value":""));
795         assert(rowPtr != NULL);
796         if (m_data_pos[i] != -1)
797         {
798           const uchar* row_vPtr= NULL;
799           switch (m_column_version[i]) {
800           case DEFAULT:
801             row_vPtr= rowPtr;
802             break;
803           case OLD:
804             if (op_type != WRITE_ROW)
805               row_vPtr= oldRowPtr;
806             break;
807           case NEW:
808             if (op_type != DELETE_ROW)
809               row_vPtr= newRowPtr;
810           }
811           if (row_vPtr == NULL ||
812               (m_col_nullable[m_data_pos[i]] &&
813                NdbDictionary::isNull(dataRecord,
814                                      (const char*) row_vPtr,
815                                      m_data_pos[i])))
816           {
817             DBUG_PRINT("info", ("Column %s is set to NULL because it is NULL", col->getName()));
818             if (ex_op->setValue((Uint32) i, (char*)NULL))
819             {
820               err= ex_op->getNdbError();
821               break;
822             }
823           }
824           else if (write_set != NULL && bitmap_is_set(write_set, m_data_pos[i]))
825           {
826             DBUG_PRINT("info", ("Column %s is set", col->getName()));
827             const uchar* data=
828               (const uchar*) NdbDictionary::getValuePtr(dataRecord,
829                                                         (const char*) row_vPtr,
830                                                         m_data_pos[i]);
831             if (ex_op->setValue((Uint32) i, (const char*)data) == -1)
832             {
833               err= ex_op->getNdbError();
834               break;
835             }
836           }
837           else if (default_value != NULL)
838           {
839             DBUG_PRINT("info", ("Column %s is not set to NULL because it has a default value", col->getName()));
840             /*
841              * Column has a default value
842              * Since no value was set in write_set
843              * we let the default value be set from
844              * Ndb instead.
845              */
846           }
847           else
848           {
849             DBUG_PRINT("info", ("Column %s is set to NULL because it not in write_set", col->getName()));
850             if (ex_op->setValue((Uint32) i, (char*)NULL))
851             {
852               err= ex_op->getNdbError();
853               break;
854             }
855           }
856         }
857       }
858     }
859   } while (0);
860 
861   if (err.code != 0)
862   {
863     if (err.classification == NdbError::SchemaError)
864     {
865       /*
866        * Something up with Exceptions table schema, forget it.
867        * No further exceptions will be recorded.
868        * Caller will log this and slave will stop.
869        */
870       NdbDictionary::Dictionary* dict= trans->getNdb()->getDictionary();
871       dict->removeTableGlobal(*m_ex_tab, false);
872       m_ex_tab= NULL;
873       DBUG_RETURN(0);
874     }
875     DBUG_RETURN(-1);
876   }
877   DBUG_RETURN(0);
878 }
879 
880 /* HAVE_NDB_BINLOG */
881 #endif
882 
883 /**
884    st_ndb_slave_state constructor
885 
886    Initialise Ndb Slave state object
887 */
st_ndb_slave_state()888 st_ndb_slave_state::st_ndb_slave_state()
889   : current_delete_delete_count(0),
890     current_reflect_op_prepare_count(0),
891     current_reflect_op_discard_count(0),
892     current_refresh_op_count(0),
893     current_master_server_epoch(0),
894     current_master_server_epoch_committed(false),
895     current_max_rep_epoch(0),
896     conflict_flags(0),
897     retry_trans_count(0),
898     current_trans_row_conflict_count(0),
899     current_trans_row_reject_count(0),
900     current_trans_in_conflict_count(0),
901     last_conflicted_epoch(0),
902     last_stable_epoch(0),
903     total_delete_delete_count(0),
904     total_reflect_op_prepare_count(0),
905     total_reflect_op_discard_count(0),
906     total_refresh_op_count(0),
907     max_rep_epoch(0),
908     sql_run_id(~Uint32(0)),
909     trans_row_conflict_count(0),
910     trans_row_reject_count(0),
911     trans_detect_iter_count(0),
912     trans_in_conflict_count(0),
913     trans_conflict_commit_count(0),
914     trans_conflict_apply_state(SAS_NORMAL),
915     trans_dependency_tracker(NULL)
916 {
917   memset(current_violation_count, 0, sizeof(current_violation_count));
918   memset(total_violation_count, 0, sizeof(total_violation_count));
919 
920   /* Init conflict handling state memroot */
921   const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
922   init_alloc_root(PSI_INSTRUMENT_ME,
923                   &conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
924 }
925 
~st_ndb_slave_state()926 st_ndb_slave_state::~st_ndb_slave_state()
927 {
928   free_root(&conflict_mem_root, 0);
929 }
930 
931 /**
932    resetPerAttemptCounters
933 
934    Reset the per-epoch-transaction-application-attempt counters
935 */
936 void
resetPerAttemptCounters()937 st_ndb_slave_state::resetPerAttemptCounters()
938 {
939   memset(current_violation_count, 0, sizeof(current_violation_count));
940   current_delete_delete_count = 0;
941   current_reflect_op_prepare_count = 0;
942   current_reflect_op_discard_count = 0;
943   current_refresh_op_count = 0;
944   current_trans_row_conflict_count = 0;
945   current_trans_row_reject_count = 0;
946   current_trans_in_conflict_count = 0;
947 
948   conflict_flags = 0;
949   current_max_rep_epoch = 0;
950 }
951 
952 /**
953    atTransactionAbort()
954 
955    Called by Slave SQL thread during transaction abort.
956 */
957 void
atTransactionAbort()958 st_ndb_slave_state::atTransactionAbort()
959 {
960 #ifdef HAVE_NDB_BINLOG
961   /* Reset any gathered transaction dependency information */
962   atEndTransConflictHandling();
963   trans_conflict_apply_state = SAS_NORMAL;
964 #endif
965 
966   /* Reset current-transaction counters + state */
967   resetPerAttemptCounters();
968 }
969 
970 
971 
972 /**
973    atTransactionCommit()
974 
975    Called by Slave SQL thread after transaction commit
976 */
977 void
atTransactionCommit(Uint64 epoch)978 st_ndb_slave_state::atTransactionCommit(Uint64 epoch)
979 {
980   assert( ((trans_dependency_tracker == NULL) &&
981            (trans_conflict_apply_state == SAS_NORMAL)) ||
982           ((trans_dependency_tracker != NULL) &&
983            (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
984   assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
985 
986   /* Merge committed transaction counters into total state
987    * Then reset current transaction counters
988    */
989   Uint32 total_conflicts = 0;
990   for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
991   {
992     total_conflicts+= current_violation_count[i];
993     total_violation_count[i]+= current_violation_count[i];
994   }
995   total_delete_delete_count+= current_delete_delete_count;
996   total_reflect_op_prepare_count+= current_reflect_op_prepare_count;
997   total_reflect_op_discard_count+= current_reflect_op_discard_count;
998   total_refresh_op_count+= current_refresh_op_count;
999   trans_row_conflict_count+= current_trans_row_conflict_count;
1000   trans_row_reject_count+= current_trans_row_reject_count;
1001   trans_in_conflict_count+= current_trans_in_conflict_count;
1002 
1003   if (current_trans_in_conflict_count)
1004     trans_conflict_commit_count++;
1005 
1006   if (current_max_rep_epoch > max_rep_epoch)
1007   {
1008     DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
1009                         max_rep_epoch,
1010                         current_max_rep_epoch));
1011     max_rep_epoch = current_max_rep_epoch;
1012   }
1013 
1014   {
1015     bool hadConflict = false;
1016     if (total_conflicts > 0)
1017     {
1018       /**
1019        * Conflict detected locally
1020        */
1021       DBUG_PRINT("info", ("Last conflicted epoch increases from %llu to %llu",
1022                           last_conflicted_epoch,
1023                           epoch));
1024       hadConflict = true;
1025     }
1026     else
1027     {
1028       /**
1029        * Update last_conflicted_epoch if we applied reflected or refresh ops
1030        * (Implies Secondary role in asymmetric algorithms)
1031        */
1032       assert(current_reflect_op_prepare_count >= current_reflect_op_discard_count);
1033       Uint32 current_reflect_op_apply_count = current_reflect_op_prepare_count -
1034         current_reflect_op_discard_count;
1035       if (current_reflect_op_apply_count > 0 ||
1036           current_refresh_op_count > 0)
1037       {
1038         DBUG_PRINT("info", ("Reflected (%u) or Refresh (%u) operations applied this "
1039                             "epoch, increasing last conflicted epoch from %llu to %llu.",
1040                             current_reflect_op_apply_count,
1041                             current_refresh_op_count,
1042                             last_conflicted_epoch,
1043                             epoch));
1044         hadConflict = true;
1045       }
1046     }
1047 
1048     /* Update status vars */
1049     if (hadConflict)
1050     {
1051       last_conflicted_epoch = epoch;
1052     }
1053     else
1054     {
1055       if (max_rep_epoch >= last_conflicted_epoch)
1056       {
1057         /**
1058          * This epoch which has looped the circle was stable -
1059          * no new conflicts have been found / corrected since
1060          * it was logged
1061          */
1062         last_stable_epoch = max_rep_epoch;
1063 
1064         /**
1065          * Note that max_rep_epoch >= last_conflicted_epoch
1066          * implies that there are no currently known-about
1067          * conflicts.
1068          * On the primary this is a definitive fact as it
1069          * finds out about all conflicts immediately.
1070          * On the secondary it does not mean that there
1071          * are not committed conflicts, just that they
1072          * have not started being corrected yet.
1073          */
1074       }
1075     }
1076   }
1077 
1078   resetPerAttemptCounters();
1079 
1080   /* Clear per-epoch-transaction retry_trans_count */
1081   retry_trans_count = 0;
1082 
1083   current_master_server_epoch_committed = true;
1084 
1085   DBUG_EXECUTE_IF("ndb_slave_fail_marking_epoch_committed",
1086                   {
1087                     fprintf(stderr,
1088                             "Slave clearing epoch committed flag "
1089                             "for epoch %llu/%llu (%llu)\n",
1090                             current_master_server_epoch >> 32,
1091                             current_master_server_epoch & 0xffffffff,
1092                             current_master_server_epoch);
1093                     current_master_server_epoch_committed = false;
1094                   });
1095 }
1096 
1097 /**
1098    verifyNextEpoch
1099 
1100    Check that a new incoming epoch from the relay log
1101    is expected given the current slave state, previous
1102    epoch etc.
1103    This is checking Generic replication errors, with
1104    a user warning thrown in too.
1105 */
1106 bool
verifyNextEpoch(Uint64 next_epoch,Uint32 master_server_id) const1107 st_ndb_slave_state::verifyNextEpoch(Uint64 next_epoch,
1108                                     Uint32 master_server_id) const
1109 {
1110   DBUG_ENTER("verifyNextEpoch");
1111 #ifdef HAVE_NDB_BINLOG
1112   /**
1113     WRITE_ROW to ndb_apply_status injected by MySQLD
1114     immediately upstream of us.
1115 
1116     Now we do some validation of the incoming epoch transaction's
1117     epoch - to make sure that we are getting a sensible sequence
1118     of epochs.
1119   */
1120   bool first_epoch_since_slave_start = (ndb_mi_get_slave_run_id() != sql_run_id);
1121 
1122   DBUG_PRINT("info", ("ndb_apply_status write from upstream master."
1123                       "ServerId %u, Epoch %llu/%llu (%llu) "
1124                       "Current master server epoch %llu/%llu (%llu)"
1125                       "Current master server epoch committed? %u",
1126                       master_server_id,
1127                       next_epoch >> 32,
1128                       next_epoch & 0xffffffff,
1129                       next_epoch,
1130                       current_master_server_epoch >> 32,
1131                       current_master_server_epoch & 0xffffffff,
1132                       current_master_server_epoch,
1133                       current_master_server_epoch_committed));
1134   DBUG_PRINT("info", ("mi_slave_run_id=%u, ndb_slave_state_run_id=%u",
1135                       ndb_mi_get_slave_run_id(), sql_run_id));
1136   DBUG_PRINT("info", ("First epoch since slave start : %u",
1137                       first_epoch_since_slave_start));
1138 
1139   /* Analysis of nextEpoch generally depends on whether it's the first or not */
1140   if (first_epoch_since_slave_start)
1141   {
1142     /**
1143        First epoch since slave start - might've had a CHANGE MASTER command,
1144        since we were last running, so we are not too strict about epoch
1145        changes, but we will warn.
1146     */
1147     if (next_epoch < current_master_server_epoch)
1148     {
1149       sql_print_warning("NDB Slave : At SQL thread start "
1150                         "applying epoch %llu/%llu "
1151                         "(%llu) from Master ServerId %u which is lower than previously "
1152                         "applied epoch %llu/%llu (%llu).  "
1153                         "Group Master Log : %s  Group Master Log Pos : %llu.  "
1154                         "Check slave positioning.",
1155                         next_epoch >> 32,
1156                         next_epoch & 0xffffffff,
1157                         next_epoch,
1158                         master_server_id,
1159                         current_master_server_epoch >> 32,
1160                         current_master_server_epoch & 0xffffffff,
1161                         current_master_server_epoch,
1162                         ndb_mi_get_group_master_log_name(),
1163                         ndb_mi_get_group_master_log_pos());
1164       /* Slave not stopped */
1165     }
1166     else if (next_epoch == current_master_server_epoch)
1167     {
1168       /**
1169          Could warn that started on already applied epoch,
1170          but this is often harmless.
1171       */
1172     }
1173     else
1174     {
1175       /* next_epoch > current_master_server_epoch - fine. */
1176     }
1177   }
1178   else
1179   {
1180     /**
1181        ! first_epoch_since_slave_start
1182 
1183        Slave has already applied some epoch in this run, so we expect
1184        either :
1185         a) previous epoch committed ok and next epoch is higher
1186                                   or
1187         b) previous epoch not committed and next epoch is the same
1188            (Retry case)
1189     */
1190     if (next_epoch < current_master_server_epoch)
1191     {
1192       /* Should never happen */
1193       sql_print_error("NDB Slave : SQL thread stopped as "
1194                       "applying epoch %llu/%llu "
1195                       "(%llu) from Master ServerId %u which is lower than previously "
1196                       "applied epoch %llu/%llu (%llu).  "
1197                       "Group Master Log : %s  Group Master Log Pos : %llu",
1198                       next_epoch >> 32,
1199                       next_epoch & 0xffffffff,
1200                       next_epoch,
1201                       master_server_id,
1202                       current_master_server_epoch >> 32,
1203                       current_master_server_epoch & 0xffffffff,
1204                       current_master_server_epoch,
1205                       ndb_mi_get_group_master_log_name(),
1206                       ndb_mi_get_group_master_log_pos());
1207       /* Stop the slave */
1208       DBUG_RETURN(false);
1209     }
1210     else if (next_epoch == current_master_server_epoch)
1211     {
1212       /**
1213          This is ok if we are retrying - e.g. the
1214          last epoch was not committed
1215       */
1216       if (current_master_server_epoch_committed)
1217       {
1218         /* This epoch is committed already, why are we replaying it? */
1219         sql_print_error("NDB Slave : SQL thread stopped as attempted "
1220                         "to reapply already committed epoch %llu/%llu (%llu) "
1221                         "from server id %u.  "
1222                         "Group Master Log : %s  Group Master Log Pos : %llu.",
1223                         current_master_server_epoch >> 32,
1224                         current_master_server_epoch & 0xffffffff,
1225                         current_master_server_epoch,
1226                         master_server_id,
1227                         ndb_mi_get_group_master_log_name(),
1228                         ndb_mi_get_group_master_log_pos());
1229         /* Stop the slave */
1230         DBUG_RETURN(false);
1231       }
1232       else
1233       {
1234         /* Probably a retry, no problem. */
1235       }
1236     }
1237     else
1238     {
1239       /**
1240          next_epoch > current_master_server_epoch
1241 
1242          This is the normal case, *unless* the previous epoch
1243          did not commit - in which case it may be a bug in
1244          transaction retry.
1245       */
1246       if (!current_master_server_epoch_committed)
1247       {
1248         /**
1249            We've moved onto a new epoch without committing
1250            the last - probably a bug in transaction retry
1251         */
1252         sql_print_error("NDB Slave : SQL thread stopped as attempting to "
1253                         "apply new epoch %llu/%llu (%llu) while lower "
1254                         "received epoch %llu/%llu (%llu) has not been "
1255                         "committed.  Master server id : %u.  "
1256                         "Group Master Log : %s  Group Master Log Pos : %llu.",
1257                         next_epoch >> 32,
1258                         next_epoch & 0xffffffff,
1259                         next_epoch,
1260                         current_master_server_epoch >> 32,
1261                         current_master_server_epoch & 0xffffffff,
1262                         current_master_server_epoch,
1263                         master_server_id,
1264                         ndb_mi_get_group_master_log_name(),
1265                         ndb_mi_get_group_master_log_pos());
1266         /* Stop the slave */
1267         DBUG_RETURN(false);
1268       }
1269       else
1270       {
1271         /* Normal case of next epoch after committing last */
1272       }
1273     }
1274   }
1275 #endif
1276 
1277   /* Epoch looks ok */
1278   DBUG_RETURN(true);
1279 }
1280 
1281 /**
1282    atApplyStatusWrite
1283 
1284    Called by Slave SQL thread when applying an event to the
1285    ndb_apply_status table
1286 */
1287 int
atApplyStatusWrite(Uint32 master_server_id,Uint32 row_server_id,Uint64 row_epoch,bool is_row_server_id_local)1288 st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
1289                                        Uint32 row_server_id,
1290                                        Uint64 row_epoch,
1291                                        bool is_row_server_id_local)
1292 {
1293   DBUG_ENTER("atApplyStatusWrite");
1294   if (row_server_id == master_server_id)
1295   {
1296     /* This is an apply status write from the immediate master */
1297 
1298     if (!verifyNextEpoch(row_epoch,
1299                          master_server_id))
1300     {
1301       /* Problem with the next epoch, stop the slave SQL thread */
1302       DBUG_RETURN(HA_ERR_ROWS_EVENT_APPLY);
1303     }
1304 
1305     /* Epoch ok, record that we're working on it now... */
1306 
1307     current_master_server_epoch = row_epoch;
1308     current_master_server_epoch_committed = false;
1309     assert(! is_row_server_id_local);
1310   }
1311   else if (is_row_server_id_local)
1312   {
1313     DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
1314                         " which is %s.",
1315                         row_server_id, row_epoch,
1316                         (row_epoch > current_max_rep_epoch)?
1317                         " new highest." : " older than previously applied"));
1318     if (row_epoch > current_max_rep_epoch)
1319     {
1320       /*
1321         Store new highest epoch in thdvar.  If we commit successfully
1322         then this can become the new global max
1323       */
1324       current_max_rep_epoch = row_epoch;
1325     }
1326   }
1327   DBUG_RETURN(0);
1328 }
1329 
1330 /**
1331    atResetSlave()
1332 
1333    Called when RESET SLAVE command issued - in context of command client.
1334 */
1335 void
atResetSlave()1336 st_ndb_slave_state::atResetSlave()
1337 {
1338   /* Reset the Maximum replicated epoch vars
1339    * on slave reset
1340    * No need to touch the sql_run_id as that
1341    * will increment if the slave is started
1342    * again.
1343    */
1344   resetPerAttemptCounters();
1345 
1346   retry_trans_count = 0;
1347   max_rep_epoch = 0;
1348   last_conflicted_epoch = 0;
1349   last_stable_epoch = 0;
1350 
1351   /* Reset current master server epoch
1352    * This avoids warnings when replaying a lower
1353    * epoch number after a RESET SLAVE - in this
1354    * case we assume the user knows best.
1355    */
1356   current_master_server_epoch = 0;
1357   current_master_server_epoch_committed = false;
1358 }
1359 
1360 
1361 /**
1362    atStartSlave()
1363 
1364    Called by Slave SQL thread when first applying a row to Ndb after
1365    a START SLAVE command.
1366 */
1367 void
atStartSlave()1368 st_ndb_slave_state::atStartSlave()
1369 {
1370 #ifdef HAVE_NDB_BINLOG
1371   if (trans_conflict_apply_state != SAS_NORMAL)
1372   {
1373     /*
1374       Remove conflict handling state on a SQL thread
1375       restart
1376     */
1377     atEndTransConflictHandling();
1378     trans_conflict_apply_state = SAS_NORMAL;
1379   }
1380 #endif
1381 }
1382 
1383 bool
checkSlaveConflictRoleChange(enum_slave_conflict_role old_role,enum_slave_conflict_role new_role,const char ** failure_cause)1384 st_ndb_slave_state::checkSlaveConflictRoleChange(enum_slave_conflict_role old_role,
1385                                                  enum_slave_conflict_role new_role,
1386                                                  const char** failure_cause)
1387 {
1388   if (old_role == new_role)
1389     return true;
1390 
1391   /**
1392    * Initial role is SCR_NONE
1393    * Allowed transitions :
1394    *   SCR_NONE -> SCR_PASS
1395    *   SCR_NONE -> SCR_PRIMARY
1396    *   SCR_NONE -> SCR_SECONDARY
1397    *   SCR_PRIMARY -> SCR_NONE
1398    *   SCR_PRIMARY -> SCR_SECONDARY
1399    *   SCR_SECONDARY -> SCR_NONE
1400    *   SCR_SECONDARY -> SCR_PRIMARY
1401    *   SCR_PASS -> SCR_NONE
1402    *
1403    * Disallowed transitions
1404    *   SCR_PASS -> SCR_PRIMARY
1405    *   SCR_PASS -> SCR_SECONDARY
1406    *   SCR_PRIMARY -> SCR_PASS
1407    *   SCR_SECONDARY -> SCR_PASS
1408    */
1409   bool bad_transition = false;
1410   *failure_cause = "Internal error";
1411 
1412   switch (old_role)
1413   {
1414   case SCR_NONE:
1415     break;
1416   case SCR_PRIMARY:
1417   case SCR_SECONDARY:
1418     bad_transition = (new_role == SCR_PASS);
1419     break;
1420   case SCR_PASS:
1421     bad_transition = ((new_role == SCR_PRIMARY) ||
1422                       (new_role == SCR_SECONDARY));
1423     break;
1424   default:
1425     assert(false);
1426     return false;
1427   }
1428 
1429   if (bad_transition)
1430   {
1431     *failure_cause = "Invalid role change.";
1432     return false;
1433   }
1434 
1435 #ifdef HAVE_NDB_BINLOG
1436   /* Check that Slave SQL thread is not running */
1437   if (ndb_mi_get_slave_sql_running())
1438   {
1439     *failure_cause = "Cannot change role while Slave SQL "
1440       "thread is running.  Use STOP SLAVE first.";
1441     return false;
1442   }
1443 #endif
1444 
1445   return true;
1446 }
1447 
1448 
1449 #ifdef HAVE_NDB_BINLOG
1450 
1451 /**
1452    atEndTransConflictHandling
1453 
1454    Called when transactional conflict handling has completed.
1455 */
1456 void
atEndTransConflictHandling()1457 st_ndb_slave_state::atEndTransConflictHandling()
1458 {
1459   DBUG_ENTER("atEndTransConflictHandling");
1460   /* Release any conflict handling state */
1461   if (trans_dependency_tracker)
1462   {
1463     current_trans_in_conflict_count =
1464       trans_dependency_tracker->get_conflict_count();
1465     trans_dependency_tracker = NULL;
1466     free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
1467   }
1468   DBUG_VOID_RETURN;
1469 }
1470 
1471 /**
1472    atBeginTransConflictHandling()
1473 
1474    Called by Slave SQL thread when it determines that Transactional
1475    Conflict handling is required
1476 */
1477 void
atBeginTransConflictHandling()1478 st_ndb_slave_state::atBeginTransConflictHandling()
1479 {
1480   DBUG_ENTER("atBeginTransConflictHandling");
1481   /*
1482      Allocate and initialise Transactional Conflict
1483      Resolution Handling Structures
1484   */
1485   assert(trans_dependency_tracker == NULL);
1486   trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
1487   DBUG_VOID_RETURN;
1488 }
1489 
1490 /**
1491    atPrepareConflictDetection
1492 
1493    Called by Slave SQL thread prior to defining an operation on
1494    a table with conflict detection defined.
1495 */
1496 int
atPrepareConflictDetection(const NdbDictionary::Table * table,const NdbRecord * key_rec,const uchar * row_data,Uint64 transaction_id,bool & handle_conflict_now)1497 st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
1498                                                const NdbRecord* key_rec,
1499                                                const uchar* row_data,
1500                                                Uint64 transaction_id,
1501                                                bool& handle_conflict_now)
1502 {
1503   DBUG_ENTER("atPrepareConflictDetection");
1504   /*
1505     Slave is preparing to apply an operation with conflict detection.
1506     If we're performing Transactional Conflict Resolution, take
1507     extra steps
1508   */
1509   switch( trans_conflict_apply_state )
1510   {
1511   case SAS_NORMAL:
1512     DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
1513     /* No special handling */
1514     break;
1515   case SAS_TRACK_TRANS_DEPENDENCIES:
1516   {
1517     DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
1518     /*
1519       Track this operation and its transaction id, to determine
1520       inter-transaction dependencies by {table, primary key}
1521     */
1522     assert( trans_dependency_tracker );
1523 
1524     int res = trans_dependency_tracker
1525       ->track_operation(table,
1526                         key_rec,
1527                         row_data,
1528                         transaction_id);
1529     if (res != 0)
1530     {
1531       sql_print_error("%s", trans_dependency_tracker->get_error_text());
1532       DBUG_RETURN(res);
1533     }
1534     /* Proceed as normal */
1535     break;
1536   }
1537   case SAS_APPLY_TRANS_DEPENDENCIES:
1538   {
1539     DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
1540     /*
1541        Check if this operation's transaction id is marked in-conflict.
1542        If it is, we tell the caller to perform conflict resolution now instead
1543        of attempting to apply the operation.
1544     */
1545     assert( trans_dependency_tracker );
1546 
1547     if (trans_dependency_tracker->in_conflict(transaction_id))
1548     {
1549       DBUG_PRINT("info", ("Event for transaction %llu is conflicting.  Handling.",
1550                           transaction_id));
1551       current_trans_row_reject_count++;
1552       handle_conflict_now = true;
1553       DBUG_RETURN(0);
1554     }
1555 
1556     /*
1557        This transaction is not marked in-conflict, so continue with normal
1558        processing.
1559        Note that normal processing may subsequently detect a conflict which
1560        didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
1561        In this case, we will rollback and repeat the TRACK_DEPENDENCIES
1562        stage.
1563     */
1564     DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
1565                         transaction_id));
1566     break;
1567   }
1568   }
1569   DBUG_RETURN(0);
1570 }
1571 
1572 /**
1573    atTransConflictDetected
1574 
1575    Called by the Slave SQL thread when a conflict is detected on
1576    an executed operation.
1577 */
1578 int
atTransConflictDetected(Uint64 transaction_id)1579 st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
1580 {
1581   DBUG_ENTER("atTransConflictDetected");
1582 
1583   /*
1584      The Slave has detected a conflict on an operation applied
1585      to a table with Transactional Conflict Resolution defined.
1586      Handle according to current state.
1587   */
1588   conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
1589   current_trans_row_conflict_count++;
1590 
1591   switch (trans_conflict_apply_state)
1592   {
1593   case SAS_NORMAL:
1594   {
1595     DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
1596                         "Requires multi-pass resolution.  Will transition to "
1597                         "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
1598     /*
1599       Conflict on table with transactional conflict resolution
1600       defined.
1601       This is the trigger that we will do transactional conflict
1602       resolution.
1603       Record that we need to do multiple passes to correctly
1604       perform resolution.
1605       TODO : Early exit from applying epoch?
1606     */
1607     break;
1608   }
1609   case SAS_TRACK_TRANS_DEPENDENCIES:
1610   {
1611     DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
1612                         "had conflict",
1613                         transaction_id));
1614     /*
1615        Conflict on table with transactional conflict resolution
1616        defined.
1617        We will mark the operation's transaction_id as in-conflict,
1618        so that any other operations on the transaction are also
1619        considered in-conflict, and any dependent transactions are also
1620        considered in-conflict.
1621     */
1622     assert(trans_dependency_tracker != NULL);
1623     int res = trans_dependency_tracker
1624       ->mark_conflict(transaction_id);
1625 
1626     if (res != 0)
1627     {
1628       sql_print_error("%s", trans_dependency_tracker->get_error_text());
1629       DBUG_RETURN(res);
1630     }
1631     break;
1632   }
1633   case SAS_APPLY_TRANS_DEPENDENCIES:
1634   {
1635     /*
1636        This must be a new conflict, not noticed on the previous
1637        pass.
1638     */
1639     DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected.  "
1640                         "Must be further conflict.  Will return to "
1641                         "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
1642     // TODO : Early exit from applying epoch
1643     break;
1644   }
1645   default:
1646     break;
1647   }
1648 
1649   DBUG_RETURN(0);
1650 }
1651 
1652 /**
1653    atConflictPreCommit
1654 
1655    Called by the Slave SQL thread prior to committing a Slave transaction.
1656    This method can request that the Slave transaction is retried.
1657 
1658 
1659    State transitions :
1660 
1661                        START SLAVE /
1662                        RESET SLAVE /
1663                         STARTUP
1664                             |
1665                             |
1666                             v
1667                     ****************
1668                     *  SAS_NORMAL  *
1669                     ****************
1670                        ^       |
1671     No transactional   |       | Conflict on transactional table
1672        conflicts       |       | (Rollback)
1673        (Commit)        |       |
1674                        |       v
1675             **********************************
1676             *  SAS_TRACK_TRANS_DEPENDENCIES  *
1677             **********************************
1678                ^          I              ^
1679      More      I          I Dependencies |
1680     conflicts  I          I determined   | No new conflicts
1681      found     I          I (Rollback)   | (Commit)
1682     (Rollback) I          I              |
1683                I          v              |
1684            **********************************
1685            *  SAS_APPLY_TRANS_DEPENDENCIES  *
1686            **********************************
1687 
1688 
1689    Operation
1690      The initial state is SAS_NORMAL.
1691 
     On detecting a conflict on a transactional conflict detecting table,
1693      SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
1694      rolled back and reapplied.
1695 
1696      In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
1697      conflicts are tracked as the epoch transaction is applied.
1698 
1699      Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
1700      the epoch transaction is rolled back and reapplied.
1701 
1702      In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
1703      marked as in-conflict are not applied.
1704 
1705      If this results in no new conflicts, the epoch transaction is committed,
1706      and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
     the next replicated epoch transaction.
     If it results in new conflicts, the epoch transaction is rolled back, and
1709      the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
1710      the new set of dependencies.
1711 
1712      If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
1713      the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
1714      state.
1715 
1716 
1717    Properties
1718      1) Normally, there is no transaction dependency tracking overhead paid by
1719         the slave.
1720 
1721      2) On first detecting a transactional conflict, the epoch transaction must be
1722         applied at least three times, with two rollbacks.
1723 
1724      3) Transactional conflicts detected in subsequent epochs require the epoch
1725         transaction to be applied two times, with one rollback.
1726 
1727      4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
1728         DEPENDENCIES occurs when further transactional conflicts are discovered
        in SAS_APPLY_TRANS_DEPENDENCIES state.  This implies that the conflicts
        discovered in the SAS_TRACK_TRANS_DEPENDENCIES state were incomplete,
1731         so we revisit that state to get a more complete picture.
1732 
1733      5) The number of iterations of this loop is fixed to a hard coded limit, after
1734         which the Slave will stop with an error.  This should be an unlikely
1735         occurrence, as it requires not just n conflicts, but at least 1 new conflict
1736         appearing between the transactions in the epoch transaction and the
1737         database between the two states, n times in a row.
1738 
1739      6) Where conflicts are occasional, as expected, the post-commit transition to
1740         SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
1741         transaction having its transaction dependencies needlessly tracked.
1742 
1743 */
1744 int
atConflictPreCommit(bool & retry_slave_trans)1745 st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
1746 {
1747   DBUG_ENTER("atConflictPreCommit");
1748 
1749   /*
1750     Prior to committing a Slave transaction, we check whether
1751     Transactional conflicts have been detected which require
1752     us to retry the slave transaction
1753   */
1754   retry_slave_trans = false;
1755   switch(trans_conflict_apply_state)
1756   {
1757   case SAS_NORMAL:
1758   {
1759     DBUG_PRINT("info", ("SAS_NORMAL"));
1760     /*
1761        Normal case.  Only if we defined conflict detection on a table
1762        with transactional conflict detection, and saw conflicts (on any table)
1763        do we go to another state
1764      */
1765     if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
1766     {
1767       DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
1768                           "SAS_TRACK_TRANS_DEPENDENCIES."));
1769       assert(conflict_flags & SCS_OPS_DEFINED);
1770       /* Transactional conflict resolution required, switch state */
1771       atBeginTransConflictHandling();
1772       resetPerAttemptCounters();
1773       trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
1774       retry_slave_trans = true;
1775     }
1776     break;
1777   }
1778   case SAS_TRACK_TRANS_DEPENDENCIES:
1779   {
1780     DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
1781 
1782     if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
1783     {
1784       /*
1785          Conflict on table with transactional detection
1786          this pass, we have collected the details and
1787          dependencies, now transition to
1788          SAS_APPLY_TRANS_DEPENDENCIES and
1789          reapply the epoch transaction without the
1790          conflicting transactions.
1791       */
1792       assert(conflict_flags & SCS_OPS_DEFINED);
1793       DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
1794                           "SAS_APPLY_TRANS_DEPENDENCIES"));
1795 
1796       trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
1797       trans_detect_iter_count++;
1798       retry_slave_trans = true;
1799       break;
1800     }
1801     else
1802     {
1803       /*
1804          No transactional conflicts detected this pass, lets
1805          return to SAS_NORMAL state after commit for more efficient
1806          application of epoch transactions
1807       */
1808       DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
1809                           "SAS_NORMAL"));
1810       atEndTransConflictHandling();
1811       trans_conflict_apply_state = SAS_NORMAL;
1812       break;
1813     }
1814   }
1815   case SAS_APPLY_TRANS_DEPENDENCIES:
1816   {
1817     DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
1818     assert(conflict_flags & SCS_OPS_DEFINED);
1819     /*
1820        We've applied the Slave epoch transaction subject to the
1821        conflict detection.  If any further transactional
1822        conflicts have been observed, then we must repeat the
1823        process.
1824     */
1825     atEndTransConflictHandling();
1826     atBeginTransConflictHandling();
1827     trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
1828 
1829     if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
1830     {
1831       DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
1832                           "TRACK_TRANS_DEPENDENCIES pass"));
1833       /*
1834          Further conflict observed when applying, need
1835          to re-determine dependencies
1836       */
1837       resetPerAttemptCounters();
1838       retry_slave_trans = true;
1839       break;
1840     }
1841 
1842 
1843     DBUG_PRINT("info", ("No further conflicts detected, committing and "
1844                         "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
1845     /*
1846        With dependencies taken into account, no further
1847        conflicts detected, can now proceed to commit
1848     */
1849     break;
1850   }
1851   }
1852 
1853   /*
1854     Clear conflict flags, to ensure that we detect any new conflicts
1855   */
1856   conflict_flags = 0;
1857 
1858   if (retry_slave_trans)
1859   {
1860     DBUG_PRINT("info", ("Requesting transaction restart"));
1861     DBUG_RETURN(1);
1862   }
1863 
1864   DBUG_PRINT("info", ("Allowing commit to proceed"));
1865   DBUG_RETURN(0);
1866 }
1867 
1868 
1869 
1870 /**
1871  * Conflict function interpreted programs
1872  */
1873 
1874 
1875 /**
1876   CFT_NDB_OLD
1877 
1878   To perform conflict detection, an interpreted program is used to read
1879   the timestamp stored locally and compare to what was on the master.
1880   If timestamp is not equal, an error for this operation (9998) will be raised,
1881   and new row will not be applied. The error codes for the operations will
  be checked on return.  For this to work it is vital that the operation
1883   is run with ignore error option.
1884 
1885   As an independent feature, phase 2 also saves the
1886   conflicts into the table's exceptions table.
1887 */
1888 static int
row_conflict_fn_old(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)1889 row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
1890                     enum_conflicting_op_type op_type,
1891                     const NdbRecord* data_record,
1892                     const uchar* old_data,
1893                     const uchar* new_data,
1894                     const MY_BITMAP* bi_cols,
1895                     const MY_BITMAP* ai_cols,
1896                     NdbInterpretedCode* code)
1897 {
1898   DBUG_ENTER("row_conflict_fn_old");
1899   uint32 resolve_column= cfn_share->m_resolve_column;
1900   uint32 resolve_size= cfn_share->m_resolve_size;
1901   const uchar* field_ptr = (const uchar*)
1902     NdbDictionary::getValuePtr(data_record,
1903                                (const char*) old_data,
1904                                cfn_share->m_resolve_column);
1905 
1906   assert((resolve_size == 4) || (resolve_size == 8));
1907 
1908   if (unlikely(!bitmap_is_set(bi_cols, resolve_column)))
1909   {
1910     sql_print_information("NDB Slave: missing data for %s "
1911                           "timestamp column %u.",
1912                           cfn_share->m_conflict_fn->name,
1913                           resolve_column);
1914     DBUG_RETURN(1);
1915   }
1916 
1917   const uint label_0= 0;
1918   const Uint32 RegOldValue= 1, RegCurrentValue= 2;
1919   int r;
1920 
1921   DBUG_PRINT("info",
1922              ("Adding interpreted filter, existing value must eq event old value"));
1923   /*
1924    * read old value from record
1925    */
1926   union {
1927     uint32 old_value_32;
1928     uint64 old_value_64;
1929   };
1930   {
1931     if (resolve_size == 4)
1932     {
1933       memcpy(&old_value_32, field_ptr, resolve_size);
1934       DBUG_PRINT("info", ("  old_value_32: %u", old_value_32));
1935     }
1936     else
1937     {
1938       memcpy(&old_value_64, field_ptr, resolve_size);
1939       DBUG_PRINT("info", ("  old_value_64: %llu",
1940                           (unsigned long long) old_value_64));
1941     }
1942   }
1943 
1944   /*
1945    * Load registers RegOldValue and RegCurrentValue
1946    */
1947   if (resolve_size == 4)
1948     r= code->load_const_u32(RegOldValue, old_value_32);
1949   else
1950     r= code->load_const_u64(RegOldValue, old_value_64);
1951   assert(r == 0);
1952   r= code->read_attr(RegCurrentValue, resolve_column);
1953   assert(r == 0);
1954   /*
1955    * if RegOldValue == RegCurrentValue goto label_0
1956    * else raise error for this row
1957    */
1958   r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
1959   assert(r == 0);
1960   r= code->interpret_exit_nok(error_conflict_fn_violation);
1961   assert(r == 0);
1962   r= code->def_label(label_0);
1963   assert(r == 0);
1964   r= code->interpret_exit_ok();
1965   assert(r == 0);
1966   r= code->finalise();
1967   assert(r == 0);
1968   DBUG_RETURN(r);
1969 }
1970 
1971 static int
row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)1972 row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
1973                                 enum_conflicting_op_type op_type,
1974                                 const NdbRecord* data_record,
1975                                 const uchar* old_data,
1976                                 const uchar* new_data,
1977                                 const MY_BITMAP* bi_cols,
1978                                 const MY_BITMAP* ai_cols,
1979                                 NdbInterpretedCode* code)
1980 {
1981   DBUG_ENTER("row_conflict_fn_max_update_only");
1982   uint32 resolve_column= cfn_share->m_resolve_column;
1983   uint32 resolve_size= cfn_share->m_resolve_size;
1984   const uchar* field_ptr = (const uchar*)
1985     NdbDictionary::getValuePtr(data_record,
1986                                (const char*) new_data,
1987                                cfn_share->m_resolve_column);
1988 
1989   assert((resolve_size == 4) || (resolve_size == 8));
1990 
1991   if (unlikely(!bitmap_is_set(ai_cols, resolve_column)))
1992   {
1993     sql_print_information("NDB Slave: missing data for %s "
1994                           "timestamp column %u.",
1995                           cfn_share->m_conflict_fn->name,
1996                           resolve_column);
1997     DBUG_RETURN(1);
1998   }
1999 
2000   const uint label_0= 0;
2001   const Uint32 RegNewValue= 1, RegCurrentValue= 2;
2002   int r;
2003 
2004   DBUG_PRINT("info",
2005              ("Adding interpreted filter, existing value must be lt event new"));
2006   /*
2007    * read new value from record
2008    */
2009   union {
2010     uint32 new_value_32;
2011     uint64 new_value_64;
2012   };
2013   {
2014     if (resolve_size == 4)
2015     {
2016       memcpy(&new_value_32, field_ptr, resolve_size);
2017       DBUG_PRINT("info", ("  new_value_32: %u", new_value_32));
2018     }
2019     else
2020     {
2021       memcpy(&new_value_64, field_ptr, resolve_size);
2022       DBUG_PRINT("info", ("  new_value_64: %llu",
2023                           (unsigned long long) new_value_64));
2024     }
2025   }
2026   /*
2027    * Load registers RegNewValue and RegCurrentValue
2028    */
2029   if (resolve_size == 4)
2030     r= code->load_const_u32(RegNewValue, new_value_32);
2031   else
2032     r= code->load_const_u64(RegNewValue, new_value_64);
2033   assert(r == 0);
2034   r= code->read_attr(RegCurrentValue, resolve_column);
2035   assert(r == 0);
2036   /*
2037    * if RegNewValue > RegCurrentValue goto label_0
2038    * else raise error for this row
2039    */
2040   r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
2041   assert(r == 0);
2042   r= code->interpret_exit_nok(error_conflict_fn_violation);
2043   assert(r == 0);
2044   r= code->def_label(label_0);
2045   assert(r == 0);
2046   r= code->interpret_exit_ok();
2047   assert(r == 0);
2048   r= code->finalise();
2049   assert(r == 0);
2050   DBUG_RETURN(r);
2051 }
2052 
2053 /**
2054   CFT_NDB_MAX
2055 
2056   To perform conflict resolution, an interpreted program is used to read
2057   the timestamp stored locally and compare to what is going to be applied.
2058   If timestamp is lower, an error for this operation (9999) will be raised,
2059   and new row will not be applied. The error codes for the operations will
  be checked on return.  For this to work it is vital that the operation
2061   is run with ignore error option.
2062 
2063   Note that for delete, this algorithm reverts to the OLD algorithm.
2064 */
2065 static int
row_conflict_fn_max(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2066 row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
2067                     enum_conflicting_op_type op_type,
2068                     const NdbRecord* data_record,
2069                     const uchar* old_data,
2070                     const uchar* new_data,
2071                     const MY_BITMAP* bi_cols,
2072                     const MY_BITMAP* ai_cols,
2073                     NdbInterpretedCode* code)
2074 {
2075   switch(op_type)
2076   {
2077   case WRITE_ROW:
2078     abort();
2079     return 1;
2080   case UPDATE_ROW:
2081     return row_conflict_fn_max_update_only(cfn_share,
2082                                            op_type,
2083                                            data_record,
2084                                            old_data,
2085                                            new_data,
2086                                            bi_cols,
2087                                            ai_cols,
2088                                            code);
2089   case DELETE_ROW:
2090     /* Can't use max of new image, as there's no new image
2091      * for DELETE
2092      * Use OLD instead
2093      */
2094     return row_conflict_fn_old(cfn_share,
2095                                op_type,
2096                                data_record,
2097                                old_data,
2098                                new_data,
2099                                bi_cols,
2100                                ai_cols,
2101                                code);
2102   default:
2103     abort();
2104     return 1;
2105   }
2106 }
2107 
2108 
2109 /**
2110   CFT_NDB_MAX_DEL_WIN
2111 
2112   To perform conflict resolution, an interpreted program is used to read
2113   the timestamp stored locally and compare to what is going to be applied.
2114   If timestamp is lower, an error for this operation (9999) will be raised,
2115   and new row will not be applied. The error codes for the operations will
2116   be checked on return.  For this to work is is vital that the operation
2117   is run with ignore error option.
2118 
  In this variant, replicated DELETEs always succeed - no filter is added
2120   to them.
2121 */
2122 
2123 static int
row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2124 row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
2125                             enum_conflicting_op_type op_type,
2126                             const NdbRecord* data_record,
2127                             const uchar* old_data,
2128                             const uchar* new_data,
2129                             const MY_BITMAP* bi_cols,
2130                             const MY_BITMAP* ai_cols,
2131                             NdbInterpretedCode* code)
2132 {
2133   switch(op_type)
2134   {
2135   case WRITE_ROW:
2136     abort();
2137     return 1;
2138   case UPDATE_ROW:
2139     return row_conflict_fn_max_update_only(cfn_share,
2140                                            op_type,
2141                                            data_record,
2142                                            old_data,
2143                                            new_data,
2144                                            bi_cols,
2145                                            ai_cols,
2146                                            code);
2147   case DELETE_ROW:
2148     /* This variant always lets a received DELETE_ROW
2149      * succeed.
2150      */
2151     return 0;
2152   default:
2153     abort();
2154     return 1;
2155   }
2156 }
2157 
2158 
2159 /**
2160   CFT_NDB_EPOCH
2161 
2162 */
2163 
2164 static int
row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2165 row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
2166                       enum_conflicting_op_type op_type,
2167                       const NdbRecord* data_record,
2168                       const uchar* old_data,
2169                       const uchar* new_data,
2170                       const MY_BITMAP* bi_cols,
2171                       const MY_BITMAP* ai_cols,
2172                       NdbInterpretedCode* code)
2173 {
2174   DBUG_ENTER("row_conflict_fn_epoch");
2175   switch(op_type)
2176   {
2177   case WRITE_ROW:
2178     abort();
2179     DBUG_RETURN(1);
2180   case UPDATE_ROW:
2181   case DELETE_ROW:
2182   case READ_ROW: /* Read tracking */
2183   {
2184     const uint label_0= 0;
2185     const Uint32
2186       RegAuthor= 1, RegZero= 2,
2187       RegMaxRepEpoch= 1, RegRowEpoch= 2;
2188     int r;
2189 
2190     r= code->load_const_u32(RegZero, 0);
2191     assert(r == 0);
2192     r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
2193     assert(r == 0);
2194     /* If last author was not local, assume no conflict */
2195     r= code->branch_ne(RegZero, RegAuthor, label_0);
2196     assert(r == 0);
2197 
2198     /*
2199      * Load registers RegMaxRepEpoch and RegRowEpoch
2200      */
2201     r= code->load_const_u64(RegMaxRepEpoch, g_ndb_slave_state.max_rep_epoch);
2202     assert(r == 0);
2203     r= code->read_attr(RegRowEpoch, NdbDictionary::Column::ROW_GCI64);
2204     assert(r == 0);
2205 
2206     /*
2207      * if RegRowEpoch <= RegMaxRepEpoch goto label_0
2208      * else raise error for this row
2209      */
2210     r= code->branch_le(RegRowEpoch, RegMaxRepEpoch, label_0);
2211     assert(r == 0);
2212     r= code->interpret_exit_nok(error_conflict_fn_violation);
2213     assert(r == 0);
2214     r= code->def_label(label_0);
2215     assert(r == 0);
2216     r= code->interpret_exit_ok();
2217     assert(r == 0);
2218     r= code->finalise();
2219     assert(r == 0);
2220     DBUG_RETURN(r);
2221   }
2222   default:
2223     abort();
2224     DBUG_RETURN(1);
2225   }
2226 }
2227 
2228 /**
2229  * CFT_NDB_EPOCH2
2230  */
2231 
2232 static int
row_conflict_fn_epoch2_primary(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2233 row_conflict_fn_epoch2_primary(NDB_CONFLICT_FN_SHARE* cfn_share,
2234                                enum_conflicting_op_type op_type,
2235                                const NdbRecord* data_record,
2236                                const uchar* old_data,
2237                                const uchar* new_data,
2238                                const MY_BITMAP* bi_cols,
2239                                const MY_BITMAP* ai_cols,
2240                                NdbInterpretedCode* code)
2241 {
2242   DBUG_ENTER("row_conflict_fn_epoch2_primary");
2243 
2244   /* We use the normal NDB$EPOCH detection function */
2245   DBUG_RETURN(row_conflict_fn_epoch(cfn_share,
2246                                     op_type,
2247                                     data_record,
2248                                     old_data,
2249                                     new_data,
2250                                     bi_cols,
2251                                     ai_cols,
2252                                     code));
2253 }
2254 
2255 static int
row_conflict_fn_epoch2_secondary(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2256 row_conflict_fn_epoch2_secondary(NDB_CONFLICT_FN_SHARE* cfn_share,
2257                                  enum_conflicting_op_type op_type,
2258                                  const NdbRecord* data_record,
2259                                  const uchar* old_data,
2260                                  const uchar* new_data,
2261                                  const MY_BITMAP* bi_cols,
2262                                  const MY_BITMAP* ai_cols,
2263                                  NdbInterpretedCode* code)
2264 {
2265   DBUG_ENTER("row_conflict_fn_epoch2_secondary");
2266 
2267   /* Only called for reflected update and delete operations
2268    * on the secondary.
2269    * These are returning operations which should only be
2270    * applied if the row in the database was last written
2271    * remotely (by the Primary)
2272    */
2273 
2274   switch(op_type)
2275   {
2276   case WRITE_ROW:
2277     abort();
2278     DBUG_RETURN(1);
2279   case UPDATE_ROW:
2280   case DELETE_ROW:
2281   {
2282     const uint label_0= 0;
2283     const Uint32
2284       RegAuthor= 1, RegZero= 2;
2285     int r;
2286 
2287     r= code->load_const_u32(RegZero, 0);
2288     assert(r == 0);
2289     r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
2290     assert(r == 0);
2291     r= code->branch_eq(RegZero, RegAuthor, label_0);
2292     assert(r == 0);
2293     /* Last author was not local, no conflict, apply */
2294     r= code->interpret_exit_ok();
2295     assert(r == 0);
2296     r= code->def_label(label_0);
2297     assert(r == 0);
2298     /* Last author was secondary-local, conflict, do not apply */
2299     r= code->interpret_exit_nok(error_conflict_fn_violation);
2300     assert(r == 0);
2301 
2302 
2303     r= code->finalise();
2304     assert(r == 0);
2305     DBUG_RETURN(r);
2306   }
2307   default:
2308     abort();
2309     DBUG_RETURN(1);
2310   }
2311 }
2312 
2313 static int
row_conflict_fn_epoch2(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const NdbRecord * data_record,const uchar * old_data,const uchar * new_data,const MY_BITMAP * bi_cols,const MY_BITMAP * ai_cols,NdbInterpretedCode * code)2314 row_conflict_fn_epoch2(NDB_CONFLICT_FN_SHARE* cfn_share,
2315                        enum_conflicting_op_type op_type,
2316                        const NdbRecord* data_record,
2317                        const uchar* old_data,
2318                        const uchar* new_data,
2319                        const MY_BITMAP* bi_cols,
2320                        const MY_BITMAP* ai_cols,
2321                        NdbInterpretedCode* code)
2322 {
2323   DBUG_ENTER("row_conflict_fn_epoch2");
2324 
2325   /**
2326    * NdbEpoch2 behaviour depends on the Slave conflict role variable
2327    *
2328    */
2329   switch(opt_ndb_slave_conflict_role)
2330   {
2331   case SCR_NONE:
2332     /* This is a problem */
2333     DBUG_RETURN(1);
2334   case SCR_PRIMARY:
2335     DBUG_RETURN(row_conflict_fn_epoch2_primary(cfn_share,
2336                                                op_type,
2337                                                data_record,
2338                                                old_data,
2339                                                new_data,
2340                                                bi_cols,
2341                                                ai_cols,
2342                                                code));
2343   case SCR_SECONDARY:
2344     DBUG_RETURN(row_conflict_fn_epoch2_secondary(cfn_share,
2345                                                  op_type,
2346                                                  data_record,
2347                                                  old_data,
2348                                                  new_data,
2349                                                  bi_cols,
2350                                                  ai_cols,
2351                                                  code));
2352   case SCR_PASS:
2353     /* Do nothing */
2354     DBUG_RETURN(0);
2355 
2356   default:
2357     break;
2358   }
2359 
2360   abort();
2361 
2362   DBUG_RETURN(1);
2363 }
2364 
2365 /**
2366  * Conflict function setup infrastructure
2367  */
2368 
/*
  Argument list for the conflict functions that take one mandatory resolve
  column name (NDB$MAX_DELETE_WIN, NDB$MAX and NDB$OLD in conflict_fns).
  Like every argument list, it is terminated by a CFAT_END entry.
*/
static const st_conflict_fn_arg_def resolve_col_args[]=
{
  /* Arg type              Optional */
  { CFAT_COLUMN_NAME,      false },
  { CFAT_END,              false }
};

/*
  Argument list for the NDB$EPOCH family: a single optional "extra GCI
  bits" argument (range-checked to <= 31 in parse_conflict_fn_spec).
*/
static const st_conflict_fn_arg_def epoch_fn_args[]=
{
  /* Arg type              Optional */
  { CFAT_EXTRA_GCI_BITS,   true  },
  { CFAT_END,              false }
};

/*
  Table of supported conflict functions.  Each entry gives the function
  name, its conflict type, its argument list, the row program builder and
  its flags.

  ORDER MATTERS: parse_conflict_fn_spec() walks this table in order and
  accepts an entry as soon as the spec starts with its name (strncmp over
  strlen(name)).  A name must therefore be listed before any shorter name
  that is a prefix of it.  For example, NDB$MAX_DELETE_WIN comes before
  NDB$MAX, and NDB$EPOCH2_TRANS, NDB$EPOCH2 and NDB$EPOCH_TRANS come
  before NDB$EPOCH.
*/
static const st_conflict_fn_def conflict_fns[]=
{
  { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
    &resolve_col_args[0], row_conflict_fn_max_del_win, 0 },
  { "NDB$MAX",            CFT_NDB_MAX,
    &resolve_col_args[0], row_conflict_fn_max,         0 },
  { "NDB$OLD",            CFT_NDB_OLD,
    &resolve_col_args[0], row_conflict_fn_old,         0 },
  { "NDB$EPOCH2_TRANS",   CFT_NDB_EPOCH2_TRANS,
    &epoch_fn_args[0],    row_conflict_fn_epoch2,
    CF_REFLECT_SEC_OPS | CF_USE_ROLE_VAR | CF_TRANSACTIONAL},
  { "NDB$EPOCH2",         CFT_NDB_EPOCH2,
    &epoch_fn_args[0],    row_conflict_fn_epoch2,
    CF_REFLECT_SEC_OPS | CF_USE_ROLE_VAR
  },
  { "NDB$EPOCH_TRANS",    CFT_NDB_EPOCH_TRANS,
    &epoch_fn_args[0],    row_conflict_fn_epoch,       CF_TRANSACTIONAL},
  { "NDB$EPOCH",          CFT_NDB_EPOCH,
    &epoch_fn_args[0],    row_conflict_fn_epoch,       0 }
};

/* Number of entries in conflict_fns */
static unsigned n_conflict_fns=
  sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
2406 
2407 
2408 int
parse_conflict_fn_spec(const char * conflict_fn_spec,const st_conflict_fn_def ** conflict_fn,st_conflict_fn_arg * args,Uint32 * max_args,char * msg,uint msg_len)2409 parse_conflict_fn_spec(const char* conflict_fn_spec,
2410                        const st_conflict_fn_def** conflict_fn,
2411                        st_conflict_fn_arg* args,
2412                        Uint32* max_args,
2413                        char *msg, uint msg_len)
2414 {
2415   DBUG_ENTER("parse_conflict_fn_spec");
2416 
2417   Uint32 no_args = 0;
2418   const char *ptr= conflict_fn_spec;
2419   const char *error_str= "unknown conflict resolution function";
2420   /* remove whitespace */
2421   while (*ptr == ' ' && *ptr != '\0') ptr++;
2422 
2423   DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
2424 
2425   for (unsigned i= 0; i < n_conflict_fns; i++)
2426   {
2427     const st_conflict_fn_def &fn= conflict_fns[i];
2428 
2429     uint len= (uint)strlen(fn.name);
2430     if (strncmp(ptr, fn.name, len))
2431       continue;
2432 
2433     DBUG_PRINT("info", ("found function %s", fn.name));
2434 
2435     /* skip function name */
2436     ptr+= len;
2437 
2438     /* remove whitespace */
2439     while (*ptr == ' ' && *ptr != '\0') ptr++;
2440 
2441     /* next '(' */
2442     if (*ptr != '(')
2443     {
2444       error_str= "missing '('";
2445       DBUG_PRINT("info", ("parse error %s", error_str));
2446       break;
2447     }
2448     ptr++;
2449 
2450     /* find all arguments */
2451     for (;;)
2452     {
2453       if (no_args >= *max_args)
2454       {
2455         error_str= "too many arguments";
2456         DBUG_PRINT("info", ("parse error %s", error_str));
2457         break;
2458       }
2459 
2460       /* expected type */
2461       enum enum_conflict_fn_arg_type type=
2462         conflict_fns[i].arg_defs[no_args].arg_type;
2463 
2464       /* remove whitespace */
2465       while (*ptr == ' ' && *ptr != '\0') ptr++;
2466 
2467       if (type == CFAT_END)
2468       {
2469         args[no_args].type= type;
2470         error_str= NULL;
2471         break;
2472       }
2473 
2474       /* arg */
2475       /* Todo : Should support comma as an arg separator? */
2476       const char *start_arg= ptr;
2477       while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
2478       const char *end_arg= ptr;
2479 
2480       bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
2481       /* any arg given? */
2482       if (start_arg == end_arg)
2483       {
2484         if (!optional_arg)
2485         {
2486           error_str= "missing function argument";
2487           DBUG_PRINT("info", ("parse error %s", error_str));
2488           break;
2489         }
2490         else
2491         {
2492           /* Arg was optional, and not present
2493            * Must be at end of args, finish parsing
2494            */
2495           args[no_args].type= CFAT_END;
2496           error_str= NULL;
2497           break;
2498         }
2499       }
2500 
2501       uint len= (uint)(end_arg - start_arg);
2502       args[no_args].type=    type;
2503 
2504       DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
2505 
2506       bool arg_processing_error = false;
2507       switch (type)
2508       {
2509       case CFAT_COLUMN_NAME:
2510       {
2511         /* Copy column name out into argument's buffer */
2512         char* dest= &args[no_args].resolveColNameBuff[0];
2513 
2514         memcpy(dest, start_arg, (len < (uint) NAME_CHAR_LEN ?
2515                                  len :
2516                                  NAME_CHAR_LEN));
2517         dest[len]= '\0';
2518         break;
2519       }
2520       case CFAT_EXTRA_GCI_BITS:
2521       {
2522         /* Map string to number and check it's in range etc */
2523         char* end_of_arg = (char*) end_arg;
2524         Uint32 bits = strtoul(start_arg, &end_of_arg, 0);
2525         DBUG_PRINT("info", ("Using %u as the number of extra bits", bits));
2526 
2527         if (bits > 31)
2528         {
2529           arg_processing_error= true;
2530           error_str= "Too many extra Gci bits";
2531           DBUG_PRINT("info", ("%s", error_str));
2532           break;
2533         }
2534         /* Num bits seems ok */
2535         args[no_args].extraGciBits = bits;
2536         break;
2537       }
2538       case CFAT_END:
2539         abort();
2540       }
2541 
2542       if (arg_processing_error)
2543         break;
2544       no_args++;
2545     }
2546 
2547     if (error_str)
2548       break;
2549 
2550     /* remove whitespace */
2551     while (*ptr == ' ' && *ptr != '\0') ptr++;
2552 
2553     /* next ')' */
2554     if (*ptr != ')')
2555     {
2556       error_str= "missing ')'";
2557       break;
2558     }
2559     ptr++;
2560 
2561     /* remove whitespace */
2562     while (*ptr == ' ' && *ptr != '\0') ptr++;
2563 
2564     /* garbage in the end? */
2565     if (*ptr != '\0')
2566     {
2567       error_str= "garbage in the end";
2568       break;
2569     }
2570 
2571     /* Update ptrs to conflict fn + # of args */
2572     *conflict_fn = &conflict_fns[i];
2573     *max_args = no_args;
2574 
2575     DBUG_RETURN(0);
2576   }
2577   /* parse error */
2578   my_snprintf(msg, msg_len, "%s, %s at '%s'",
2579               conflict_fn_spec, error_str, ptr);
2580   DBUG_PRINT("info", ("%s", msg));
2581   DBUG_RETURN(-1);
2582 }
2583 
2584 static uint
slave_check_resolve_col_type(const NDBTAB * ndbtab,uint field_index)2585 slave_check_resolve_col_type(const NDBTAB *ndbtab,
2586                              uint field_index)
2587 {
2588   DBUG_ENTER("slave_check_resolve_col_type");
2589   const NDBCOL *c= ndbtab->getColumn(field_index);
2590   uint sz= 0;
2591   switch (c->getType())
2592   {
2593   case  NDBCOL::Unsigned:
2594     sz= sizeof(Uint32);
2595     DBUG_PRINT("info", ("resolve column Uint32 %u",
2596                         field_index));
2597     break;
2598   case  NDBCOL::Bigunsigned:
2599     sz= sizeof(Uint64);
2600     DBUG_PRINT("info", ("resolve column Uint64 %u",
2601                         field_index));
2602     break;
2603   default:
2604     DBUG_PRINT("info", ("resolve column %u has wrong type",
2605                         field_index));
2606     break;
2607   }
2608   DBUG_RETURN(sz);
2609 }
2610 
2611 static int
slave_set_resolve_fn(Ndb * ndb,NDB_CONFLICT_FN_SHARE ** ppcfn_share,const char * dbName,const char * tabName,const NDBTAB * ndbtab,uint field_index,uint resolve_col_sz,const st_conflict_fn_def * conflict_fn,uint8 flags)2612 slave_set_resolve_fn(Ndb* ndb,
2613                      NDB_CONFLICT_FN_SHARE** ppcfn_share,
2614                      const char* dbName,
2615                      const char* tabName,
2616                      const NDBTAB *ndbtab, uint field_index,
2617                      uint resolve_col_sz,
2618                      const st_conflict_fn_def* conflict_fn,
2619                      uint8 flags)
2620 {
2621   DBUG_ENTER("slave_set_resolve_fn");
2622 
2623   NdbDictionary::Dictionary *dict= ndb->getDictionary();
2624   NDB_CONFLICT_FN_SHARE *cfn_share= *ppcfn_share;
2625   const char *ex_suffix= (char *)NDB_EXCEPTIONS_TABLE_SUFFIX;
2626   if (cfn_share == NULL)
2627   {
2628     *ppcfn_share= cfn_share=
2629         (NDB_CONFLICT_FN_SHARE*) my_malloc(PSI_INSTRUMENT_ME,
2630                                            sizeof(NDB_CONFLICT_FN_SHARE),
2631                                            MYF(MY_WME | ME_FATALERROR));
2632     slave_reset_conflict_fn(cfn_share);
2633   }
2634   cfn_share->m_conflict_fn= conflict_fn;
2635 
2636   /* Calculate resolve col stuff (if relevant) */
2637   cfn_share->m_resolve_size= resolve_col_sz;
2638   cfn_share->m_resolve_column= field_index;
2639   cfn_share->m_flags = flags;
2640 
2641   /* Init Exceptions Table Writer */
2642   new (&cfn_share->m_ex_tab_writer) ExceptionsTableWriter();
2643   /* Check for '$EX' or '$ex' suffix in table name */
2644   for (int tries= 2;
2645        tries-- > 0;
2646        ex_suffix=
2647          (tries == 1)
2648          ? (const char *)NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER
2649          : NullS)
2650   {
2651     /* get exceptions table */
2652     char ex_tab_name[FN_REFLEN];
2653     strxnmov(ex_tab_name, sizeof(ex_tab_name), tabName,
2654              ex_suffix, NullS);
2655     ndb->setDatabaseName(dbName);
2656     Ndb_table_guard ndbtab_g(dict, ex_tab_name);
2657     const NDBTAB *ex_tab= ndbtab_g.get_table();
2658     if (ex_tab)
2659     {
2660       char msgBuf[ FN_REFLEN ];
2661       const char* msg = NULL;
2662       if (cfn_share->m_ex_tab_writer.init(ndbtab,
2663                                           ex_tab,
2664                                           msgBuf,
2665                                           sizeof(msgBuf),
2666                                           &msg) == 0)
2667       {
2668         /* Ok */
2669         /* Hold our table reference outside the table_guard scope */
2670         ndbtab_g.release();
2671 
2672         /* Table looked suspicious, warn user */
2673         if (msg)
2674           sql_print_warning("%s", msg);
2675 
2676         if (opt_ndb_extra_logging)
2677         {
2678           sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
2679                                 dbName,
2680                                 tabName,
2681                                 dbName,
2682                                 ex_tab_name);
2683         }
2684       }
2685       else
2686       {
2687         sql_print_warning("%s", msg);
2688       }
2689       break;
2690     } /* if (ex_tab) */
2691   }
2692   DBUG_RETURN(0);
2693 }
2694 
2695 
2696 bool
is_exceptions_table(const char * table_name)2697 is_exceptions_table(const char *table_name)
2698 {
2699   size_t len = strlen(table_name);
2700   size_t suffixlen = strlen(NDB_EXCEPTIONS_TABLE_SUFFIX);
2701   if(len > suffixlen &&
2702      (strcmp(table_name + len - suffixlen,
2703              lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
2704                                       NDB_EXCEPTIONS_TABLE_SUFFIX) == 0))
2705   {
2706      return true;
2707   }
2708   return false;
2709 }
2710 
2711 int
setup_conflict_fn(Ndb * ndb,NDB_CONFLICT_FN_SHARE ** ppcfn_share,const char * dbName,const char * tabName,bool tableUsesBlobs,bool tableBinlogUseUpdate,const NDBTAB * ndbtab,char * msg,uint msg_len,const st_conflict_fn_def * conflict_fn,const st_conflict_fn_arg * args,const Uint32 num_args)2712 setup_conflict_fn(Ndb* ndb,
2713                   NDB_CONFLICT_FN_SHARE** ppcfn_share,
2714                   const char* dbName,
2715                   const char* tabName,
2716                   bool tableUsesBlobs,
2717                   bool tableBinlogUseUpdate,
2718                   const NDBTAB *ndbtab,
2719                   char *msg, uint msg_len,
2720                   const st_conflict_fn_def* conflict_fn,
2721                   const st_conflict_fn_arg* args,
2722                   const Uint32 num_args)
2723 {
2724   DBUG_ENTER("setup_conflict_fn");
2725 
2726   if(is_exceptions_table(tabName))
2727   {
2728     my_snprintf(msg, msg_len,
2729                 "Ndb Slave: Table %s.%s is exceptions table: not using conflict function %s",
2730                 dbName,
2731                 tabName,
2732                 conflict_fn->name);
2733     DBUG_PRINT("info", ("%s", msg));
2734     DBUG_RETURN(0);
2735   }
2736 
2737   /* setup the function */
2738   switch (conflict_fn->type)
2739   {
2740   case CFT_NDB_MAX:
2741   case CFT_NDB_OLD:
2742   case CFT_NDB_MAX_DEL_WIN:
2743   {
2744     if (num_args != 1)
2745     {
2746       my_snprintf(msg, msg_len,
2747                   "Incorrect arguments to conflict function");
2748       DBUG_PRINT("info", ("%s", msg));
2749       DBUG_RETURN(-1);
2750     }
2751 
2752     /* Now try to find the column in the table */
2753     int colNum = -1;
2754     const char* resolveColName = args[0].resolveColNameBuff;
2755     int resolveColNameLen = (int)strlen(resolveColName);
2756 
2757     for (int j=0; j< ndbtab->getNoOfColumns(); j++)
2758     {
2759       const char* colName = ndbtab->getColumn(j)->getName();
2760 
2761       if (strncmp(colName,
2762                   resolveColName,
2763                   resolveColNameLen) == 0 &&
2764           colName[resolveColNameLen] == '\0')
2765       {
2766         colNum = j;
2767         break;
2768       }
2769     }
2770     if (colNum == -1)
2771     {
2772       my_snprintf(msg, msg_len,
2773                   "Could not find resolve column %s.",
2774                   resolveColName);
2775       DBUG_PRINT("info", ("%s", msg));
2776       DBUG_RETURN(-1);
2777     }
2778 
2779     const uint resolve_col_sz= slave_check_resolve_col_type(ndbtab, colNum);
2780     if (resolve_col_sz == 0)
2781     {
2782       /* wrong data type */
2783       slave_reset_conflict_fn(*ppcfn_share);
2784       my_snprintf(msg, msg_len,
2785                   "Column '%s' has wrong datatype",
2786                   resolveColName);
2787       DBUG_PRINT("info", ("%s", msg));
2788       DBUG_RETURN(-1);
2789     }
2790 
2791     if (slave_set_resolve_fn(ndb,
2792                              ppcfn_share,
2793                              dbName,
2794                              tabName,
2795                              ndbtab,
2796                              colNum, resolve_col_sz,
2797                              conflict_fn, CFF_NONE))
2798     {
2799       my_snprintf(msg, msg_len,
2800                   "Unable to setup conflict resolution using column '%s'",
2801                   resolveColName);
2802       DBUG_PRINT("info", ("%s", msg));
2803       DBUG_RETURN(-1);
2804     }
2805 
2806     /* Success, update message */
2807     my_snprintf(msg, msg_len,
2808                 "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
2809                 dbName,
2810                 tabName,
2811                 conflict_fn->name,
2812                 resolveColName);
2813     break;
2814   }
2815   case CFT_NDB_EPOCH2:
2816   case CFT_NDB_EPOCH2_TRANS:
2817   {
2818     /* Check how updates will be logged... */
2819     bool log_update_as_write = (!tableBinlogUseUpdate);
2820 
2821     if (log_update_as_write)
2822     {
2823       my_snprintf(msg, msg_len,
2824                   "Table %s.%s configured to log updates as writes.  "
2825                   "Not suitable for %s.",
2826                   dbName,
2827                   tabName,
2828                   conflict_fn->name);
2829       DBUG_PRINT("info", ("%s", msg));
2830       DBUG_RETURN(-1);
2831     }
2832 
2833     /* Fall through for the rest of the EPOCH* processing... */
2834   }
2835   case CFT_NDB_EPOCH:
2836   case CFT_NDB_EPOCH_TRANS:
2837   {
2838     if (num_args > 1)
2839     {
2840       my_snprintf(msg, msg_len,
2841                   "Too many arguments to conflict function");
2842       DBUG_PRINT("info", ("%s", msg));
2843       DBUG_RETURN(-1);
2844     }
2845 
2846     /* Check that table doesn't have Blobs as we don't support that */
2847     if (tableUsesBlobs)
2848     {
2849       my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for %s.",
2850                   conflict_fn->name);
2851       DBUG_PRINT("info", ("%s", msg));
2852       DBUG_RETURN(-1);
2853     }
2854 
2855     /* Check that table has required extra meta-columns */
2856     /* Todo : Could warn if extra gcibits is insufficient to
2857      * represent SavePeriod/EpochPeriod
2858      */
2859     if (ndbtab->getExtraRowGciBits() == 0)
2860       sql_print_information("NDB Slave: Table %s.%s : %s, low epoch resolution",
2861                             dbName,
2862                             tabName,
2863                             conflict_fn->name);
2864 
2865     if (ndbtab->getExtraRowAuthorBits() == 0)
2866     {
2867       my_snprintf(msg, msg_len, "No extra row author bits in table.");
2868       DBUG_PRINT("info", ("%s", msg));
2869       DBUG_RETURN(-1);
2870     }
2871 
2872     if (slave_set_resolve_fn(ndb,
2873                              ppcfn_share,
2874                              dbName,
2875                              tabName,
2876                              ndbtab,
2877                              0, // field_no
2878                              0, // resolve_col_sz
2879                              conflict_fn, CFF_REFRESH_ROWS))
2880     {
2881       my_snprintf(msg, msg_len,
2882                   "unable to setup conflict resolution");
2883       DBUG_PRINT("info", ("%s", msg));
2884       DBUG_RETURN(-1);
2885     }
2886     /* Success, update message */
2887     my_snprintf(msg, msg_len,
2888                 "NDB Slave: Table %s.%s using conflict_fn %s.",
2889                 dbName,
2890                 tabName,
2891                 conflict_fn->name);
2892 
2893     break;
2894   }
2895   case CFT_NUMBER_OF_CFTS:
2896   case CFT_NDB_UNDEF:
2897     abort();
2898   }
2899   DBUG_RETURN(0);
2900 }
2901 
2902 
2903 void
teardown_conflict_fn(Ndb * ndb,NDB_CONFLICT_FN_SHARE * cfn_share)2904 teardown_conflict_fn(Ndb* ndb, NDB_CONFLICT_FN_SHARE* cfn_share)
2905 {
2906   if (cfn_share &&
2907       cfn_share->m_ex_tab_writer.hasTable() &&
2908       ndb)
2909   {
2910     cfn_share->m_ex_tab_writer.mem_free(ndb);
2911   }
2912 
2913   // Release the NDB_CONFLICT_FN_SHARE which was allocated
2914   // in setup_conflict_fn()
2915   my_free(cfn_share);
2916 }
2917 
2918 
slave_reset_conflict_fn(NDB_CONFLICT_FN_SHARE * cfn_share)2919 void slave_reset_conflict_fn(NDB_CONFLICT_FN_SHARE *cfn_share)
2920 {
2921   if (cfn_share)
2922   {
2923     memset(cfn_share, 0, sizeof(*cfn_share));
2924   }
2925 }
2926 
2927 #endif
2928 
2929 
2930 /**
2931  * Variables related to conflict handling
2932  * All prefixed 'ndb_conflict'
2933  */
2934 
2935 SHOW_VAR ndb_status_conflict_variables[]= {
2936   {"fn_max",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2937   {"fn_old",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2938   {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2939   {"fn_epoch",     (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2940   {"fn_epoch_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH_TRANS], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2941   {"fn_epoch2",    (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH2], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2942   {"fn_epoch2_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH2_TRANS], SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2943   {"trans_row_conflict_count", (char*) &g_ndb_slave_state.trans_row_conflict_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2944   {"trans_row_reject_count",   (char*) &g_ndb_slave_state.trans_row_reject_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2945   {"trans_reject_count",       (char*) &g_ndb_slave_state.trans_in_conflict_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2946   {"trans_detect_iter_count",  (char*) &g_ndb_slave_state.trans_detect_iter_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2947   {"trans_conflict_commit_count",
2948                                (char*) &g_ndb_slave_state.trans_conflict_commit_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2949   {"epoch_delete_delete_count", (char*) &g_ndb_slave_state.total_delete_delete_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2950   {"reflected_op_prepare_count", (char*) &g_ndb_slave_state.total_reflect_op_prepare_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2951   {"reflected_op_discard_count", (char*) &g_ndb_slave_state.total_reflect_op_discard_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2952   {"refresh_op_count", (char*) &g_ndb_slave_state.total_refresh_op_count, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2953   {"last_conflict_epoch",    (char*) &g_ndb_slave_state.last_conflicted_epoch, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2954   {"last_stable_epoch",    (char*) &g_ndb_slave_state.last_stable_epoch, SHOW_LONGLONG, SHOW_SCOPE_GLOBAL},
2955   {NullS, NullS, SHOW_LONG, SHOW_SCOPE_GLOBAL}
2956 };
2957 
2958 int
show_ndb_conflict_status_vars(THD * thd,struct st_mysql_show_var * var,char * buff)2959 show_ndb_conflict_status_vars(THD *thd, struct st_mysql_show_var *var, char *buff)
2960 {
2961   /* Just a function to allow moving array into this file */
2962   var->type = SHOW_ARRAY;
2963   var->value = (char*) &ndb_status_conflict_variables;
2964   return 0;
2965 }
2966 
2967